/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "pipeer.h"
#include "piconfig.h"
#include "pidatatransfer.h"
#include "piliterals_time.h"
#include "pipropertystorage.h"
#include "pitime.h"
#define _PIPEER_MSG_SIZE 4000
#define _PIPEER_MSG_TTL 100
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313 + 32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
#define _PIPEER_BROADCAST_PORT 13361
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
#define _PIPEER_PING_TIMEOUT 5.0
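// Protocol overview (summary derived from the handlers below):
//  - Service packets (multicast/broadcast/loopback/TCP, see mbcastRead):
//      1 = new peer announce, 2 = peer removed, 3 = periodic peers sync.
//  - Traffic packets (unicast UDP on the traffic ports, see dataRead):
//      4 = routed data, 5 = ping request, 6 = ping reply.
//  - The payload of a type-4 packet starts with a sub-type byte:
//      1 = short message (fits into _PIPEER_MSG_SIZE),
//      2/3 = chunks of a large transfer handled by PIDataTransfer inside PeerData.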
class PIPeer::PeerData: public PIObject {
PIOBJECT_SUBCLASS(PeerData, PIObject);
public:
PeerData(const PIString & n);
~PeerData();
EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {
data.push_front(uchar(2));
sendRequest(name(), data);
}
EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {
data.push_front(uchar(3));
sendRequest(name(), data);
}
EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {
if (ok) received(name(), dt_in.data());
}
EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {
if (ok) received(name(), dt_out.data());
}
EVENT_HANDLER(void, dtThread);
EVENT2(received, const PIString &, from, const PIByteArray &, data);
EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data);
bool send(const PIByteArray & d);
void receivedPacket(uchar type, const PIByteArray & d);
void setDist(int dist);
PIByteArray data;
PIThread t;
PIDataTransfer dt_in, dt_out;
};
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
dt_in.setPacketSize(_PIPEER_MSG_SIZE);
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
dt_in.setCRCEnabled(false);
dt_out.setCRCEnabled(false);
CONNECT1(void, PIByteArray &, &dt_in, sendRequest, this, dtSendRequestIn);
CONNECT1(void, PIByteArray &, &dt_out, sendRequest, this, dtSendRequestOut);
CONNECT1(void, bool, &dt_in, receiveFinished, this, dtReceiveFinishedIn);
CONNECT1(void, bool, &dt_out, receiveFinished, this, dtReceiveFinishedOut);
CONNECT0(void, &t, started, this, dtThread);
}
PIPeer::PeerData::~PeerData() {
dt_in.stop();
dt_out.stop();
t.stop();
if (!t.waitForFinish(1_s)) t.terminate();
}
void PIPeer::PeerData::dtThread() {
dt_out.send(data);
// piCoutObj << "send DT done";
}
bool PIPeer::PeerData::send(const PIByteArray & d) {
// piCout << "send ..." << t.isRunning();
if (t.isRunning()) return false;
data = d;
t.startOnce();
return true;
}
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
PIDataTransfer * dt = 0;
if (type == 3) dt = &dt_in;
if (type == 2) dt = &dt_out;
// piCoutObj << "DT received" << int(type) << d.size_s() << "...";
if (dt) dt->received(d);
// piCoutObj << "DT received" << int(type) << d.size_s() << "done";
}
void PIPeer::PeerData::setDist(int dist) {
dt_in.setTimeout(10 * dist);
}
PIPeer::PeerInfo::PeerAddress::PeerAddress(const PINetworkAddress & a, const PINetworkAddress & m): address(a), netmask(m) {
ping = -1.;
wait_ping = false;
last_ping = PISystemTime::current(true);
}
int PIPeer::PeerInfo::ping() const {
int ret = -1;
for (const auto & a: addresses)
if (a.ping > 0.) {
if (ret < 0)
ret = piRoundd(a.ping);
else
ret = piMini(ret, piRoundd(a.ping));
}
return ret;
}
void PIPeer::PeerInfo::init() {
if (_data == 0) _data = new PeerData(name);
}
void PIPeer::PeerInfo::destroy() {
if (_data) delete _data;
_data = 0;
}
PINetworkAddress PIPeer::PeerInfo::fastestAddress() const {
double mp = -1.;
PINetworkAddress ret;
for (const auto & a: addresses) {
if (a.ping <= 0.) continue;
if ((mp < 0) || (mp > a.ping)) {
mp = a.ping;
ret = a.address;
}
}
return ret;
}
void PIPeer::PeerInfo::addNeighbour(const PIString & n) {
if (!neighbours.contains(n)) neighbours << n;
}
void PIPeer::PeerInfo::addNeighbours(const PIStringList & l) {
for (const auto & n: l)
if (!neighbours.contains(n)) neighbours << n;
}
void PIPeer::PeerInfo::removeNeighbour(const PIString & n) {
neighbours.removeAll(n);
}
void PIPeer::PeerInfo::resetPing() {
for (auto & a: addresses)
a.ping = -1;
}
REGISTER_DEVICE(PIPeer)
PIPeer::PIPeer(const PIString & n)
: PIIODevice()
, inited__(false)
, eth_tcp_srv(PIEthernet::TCP_Server)
, eth_tcp_cli(PIEthernet::TCP_Client)
, diag_s(false)
, diag_d(false) {
sync_timer.setName("_S.PIPeer.sync");
// piCout << " PIPeer" << uint(this);
destroyed = false;
setDebug(false);
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
changeName(n);
setReopenTimeout(100_ms);
read_buffer_size = 128;
self_info.dist = 0;
self_info.time = PISystemTime::current();
randomize();
CONNECT1(void, int, &sync_timer, tickEvent, this, timerEvent);
prev_ifaces = PIEthernet::interfaces();
no_timer = false;
sync_timer.addDelimiter(5);
}
PIPeer::~PIPeer() {
// piCout << "~PIPeer" << uint(this);
stop();
if (destroyed) return;
destroyed = true;
destroy();
}
void PIPeer::timerEvent(int delim) {
// piCoutObj << "timerEvent" << delim;
if (no_timer) return;
switch (delim) {
case 1: // every 1 s
syncPeers();
piMSleep(100);
pingNeighbours();
// piCoutObj << "isOpened" << isOpened();
break;
case 5: // every 5 s
checkNetwork();
break;
}
}
void PIPeer::initEths(PIStringList al) {
// piCoutObj << "initEths start";
PIEthernet * ce;
const PIEthernet::Interface * cint = 0;
for (const auto & a: al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.traf_rec_" + a);
ce->setParameters(0);
bool ok = false;
for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
ce->setReadAddress(a, p);
if (ce->open()) {
eths_traffic << ce;
cint = prev_ifaces.getByAddress(a);
self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, dataRead);
ce->startThreadedRead();
// piCoutObj << "dc binded to" << ce->path();
// piCoutObj << "add eth" << a;
ok = true;
break;
}
}
if (!ok) delete ce;
}
eth_send.setDebug(false);
eth_send.setName("_S.PIPeer.traf_send");
eth_send.setParameters(0);
// piCoutObj << "initEths ok";
}
void PIPeer::initMBcasts(PIStringList al) {
PIEthernet * ce;
const PIEthernet::Interface * cint;
PIString nm;
al << _PIPEER_MULTICAST_IP;
// piCoutObj << "initMBcasts start" << al;
for (const auto & a: al) {
// piCout << "mcast try" << a;
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.mcast_" + a);
ce->setParameters(0);
ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
if (ce->open()) {
eths_mcast << ce;
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mcast bind to" << a << ce->sendIP();
} else {
delete ce;
// piCoutObj << "invalid address for mcast" << a;
}
}
al.removeAll(_PIPEER_MULTICAST_IP);
for (const auto & a: al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.bcast_" + a);
ce->setParameters(PIEthernet::Broadcast);
cint = prev_ifaces.getByAddress(a);
nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
if (ce->open()) {
eths_bcast << ce;
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mc BC try" << a << nm << ce->sendIP();
// piCout << "bcast bind to" << a << nm;
} else {
delete ce;
// piCoutObj << "invalid address for bcast" << a;
}
}
eth_lo.setName("_S.PIPeer.lo");
eth_lo.setParameters(PIEthernet::SeparateSockets);
eth_lo.init();
cint = prev_ifaces.getByAddress("127.0.0.1");
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setReadAddress("127.0.0.1", p);
if (eth_lo.open()) {
eth_lo.setSendIP("127.0.0.1");
CONNECT2(void, const uchar *, ssize_t, &eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
// piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
// piCout << "add eth" << ta;
break;
}
}
eth_tcp_srv.setName("_S.PIPeer.TCP_Server");
eth_tcp_srv.init();
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
eth_tcp_srv.setDebug(false);
CONNECT1(void, PIEthernet *, &eth_tcp_srv, newConnection, this, newTcpClient);
eth_tcp_srv.startThreadedRead();
eth_tcp_cli.setName("_S.PIPeer.TCP_Client");
eth_tcp_cli.init();
eth_tcp_cli.setDebug(false);
tcpClientReconnect();
CONNECT2(void, const uchar *, ssize_t, &eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
eth_tcp_cli.startThreadedRead();
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened())
piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with "
"multicasting enabled!";
// piCoutObj << "initMBcasts ok";
}
void PIPeer::destroy() {
sync_timer.stopAndWait();
diag_s.stopAndWait();
diag_d.stopAndWait();
PIMutexLocker ml(peers_mutex);
for (auto & p: peers)
if (p._data) {
p._data->dt_in.stop();
p._data->dt_out.stop();
p._data->t.stopAndWait();
}
destroyEths();
for (auto * i: eths_mcast) {
if (!i) continue;
i->stopAndWait();
}
for (auto * i: eths_bcast) {
if (!i) continue;
i->stopAndWait();
}
eth_lo.stopAndWait();
eth_tcp_srv.stopAndWait();
eth_tcp_cli.stopAndWait();
sendSelfRemove();
eth_lo.close();
eth_tcp_srv.close();
eth_tcp_cli.close();
destroyMBcasts();
eth_send.close();
for (auto & p: peers)
p.destroy();
peers.clear();
destroyed = true;
}
void PIPeer::destroyEths() {
for (auto * i: eths_traffic) {
if (!i) continue;
i->stopAndWait();
i->close();
}
piDeleteAllAndClear(eths_traffic);
}
void PIPeer::destroyMBcasts() {
for (auto * i: eths_mcast) {
if (!i) continue;
i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
i->stopAndWait();
i->close();
}
for (auto * i: eths_bcast) {
if (!i) continue;
i->stopAndWait();
i->close();
}
piDeleteAllAndClear(eths_mcast);
piDeleteAllAndClear(eths_bcast);
eth_lo.stopAndWait();
eth_lo.close();
eth_tcp_srv.stopAndWait();
}
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
if (!peers_map.contains(to)) return 0;
// piCout << "*** search quickest peer" << to;
PIVector<PeerInfo *> tp = addresses_map.value(to);
if (tp.isEmpty()) return 0;
return tp.back();
}
bool PIPeer::send(const PIString & to, const void * data, int size) {
PIByteArray ba(data, size);
// piCoutObj << "send" << ba.size_s() << "bytes" << _PIPEER_MSG_SIZE;
if (ba.size_s() <= _PIPEER_MSG_SIZE) {
ba.insert(0, uchar(1));
return sendInternal(to, ba);
} else {
// ba.insert(0, uchar(2));
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = const_cast<PeerInfo *>(getPeerByName(to));
if (!dp) return false;
return dp->_data->send(ba);
}
return true;
}
bool PIPeer::send(const PeerInfo * to, const PIByteArray & data) {
if (!to) return false;
return send(to->name, data.data(), data.size_s());
}
bool PIPeer::send(const PeerInfo * to, const PIString & data) {
if (!to) return false;
return send(to->name, data.data(), data.size_s());
}
bool PIPeer::send(const PeerInfo * to, const void * data, int size) {
if (!to) return false;
return send(to->name, data, size);
}
void PIPeer::sendToAll(const PIByteArray & data) {
for (const auto & i: peers)
send(i.name, data.data(), data.size_s());
}
void PIPeer::sendToAll(const PIString & data) {
for (const auto & i: peers)
send(i.name, data.data(), data.size_s());
}
void PIPeer::sendToAll(const void * data, int size) {
for (const auto & i: peers)
send(i.name, data, size);
}
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
// piCoutObj << "Can`t find peer \"" << to << "\"!";
return false;
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << data;
// piCoutObj << "sendInternal to" << to << data.size_s() << int(data.front());
if (!sendToNeighbour(dp, ba)) {
// piCoutObj << "send error";
return false;
}
return true;
}
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
PIByteArray ba = data;
dataReceived(from, data);
dataReceivedEvent(from, data);
if (trust_peer.isEmpty() || trust_peer == from) {
read_buffer_mutex.lock();
if (read_buffer.size_s() < read_buffer_size) read_buffer.enqueue(ba);
read_buffer_mutex.unlock();
}
}
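// dataRead() handles unicast traffic arriving on the per-interface traffic sockets:
// a type-5 ping request addressed to us is answered with a type-6 reply, a type-6 reply
// updates the smoothed per-address ping (EMA: 0.6 * old + 0.4 * measured), and a type-4
// data packet is either delivered locally or forwarded towards its destination while the
// hop counter stays below _PIPEER_MSG_TTL.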
bool PIPeer::dataRead(const uchar * readed, ssize_t size) {
if (destroyed) {
// piCout << "[PIPeer] SegFault";
return true;
}
if (size < 16) return true;
PIByteArray ba(readed, size), sba, pba;
int type, cnt;
PIString from, to;
ba >> type;
eth_mutex.lock();
// piCout << "dataRead lock";
if (type == 5) { // ping request
PINetworkAddress addr;
PISystemTime time;
ba >> to >> from >> addr >> time;
// piCout << "ping request" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (from == self_info.name) { // send ping back
const PeerInfo * pi = getPeerByName(to);
if (pi) {
if (pi->isNeighbour()) {
sba << int(6) << to << from << addr << time;
// piCout << " ping from" << from << addr << ", send back to" << pi->name;
send_mutex.lock();
for (const auto & a: pi->addresses) {
if (eth_send.send(a.address, sba)) diag_s.received(sba.size_s());
}
send_mutex.unlock();
}
}
}
eth_mutex.unlock();
return true;
}
if (type == 6) { // ping reply
PINetworkAddress addr;
PISystemTime time, ptime, ctime = PISystemTime::current(true);
ba >> to >> from >> addr >> time;
// piCout << "ping reply" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (to == self_info.name) { // ping echo
for (auto & p: peers) {
if (!p.isNeighbour()) continue;
if (p.name != from) continue;
for (auto & a: p.addresses) {
if (a.address != addr) continue;
if (a.last_ping >= time) break;
ptime = ctime - time;
a.last_ping = time;
a.wait_ping = false;
if (a.ping < 0)
a.ping = ptime.toMilliseconds();
else
a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
// piCout << " ping echo" << p.name << a.address << a.ping;
eth_mutex.unlock();
return true;
}
}
}
eth_mutex.unlock();
return true;
}
// piCoutObj << "received data from" << from << "packet" << type;
if (type != 4) {
eth_mutex.unlock();
return true;
}
diag_d.received(size);
ba >> from >> to >> cnt >> pba;
// piCoutObj << "Received packet" << type << from << to << pba.size_s();
if (type == 4) { // data packet
if (to == self_info.name) { // my packet
uchar pt = pba.take_front();
// piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s();
peers_mutex.lock();
PeerInfo * fp = const_cast<PeerInfo *>(getPeerByName(from));
if (fp == 0) {
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
if (pt == 1) {
peers_mutex.unlock();
eth_mutex.unlock();
dtReceived(from, pba);
return true;
}
if (pt == 2 || pt == 3) {
peers_mutex.unlock();
eth_mutex.unlock();
if (fp->_data) fp->_data->receivedPacket(pt, pba);
return true;
}
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
PIMutexLocker plocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
// piCoutObj << "Can`t find peer \"" << to << "\"!";
eth_mutex.unlock();
return true;
}
cnt++;
if (cnt > _PIPEER_MSG_TTL || from == dp->name) {
eth_mutex.unlock();
return true;
}
sba << type << from << to << cnt << pba;
// piCout << "translate packet" << from << "->" << to << ", ttl =" << cnt;
sendToNeighbour(dp, sba);
}
eth_mutex.unlock();
return true;
}
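// mbcastRead() handles service packets arriving over multicast, broadcast, loopback or
// TCP: type 1 registers a newly announced peer, type 2 removes a peer, and type 3 merges
// a neighbour's periodic sync (its own info plus its known peer list) into the local
// peer table, rebuilding the routing map whenever something changed.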
bool PIPeer::mbcastRead(const uchar * data, ssize_t size) {
if (destroyed) {
// piCout << "[PIPeer] SegFault";
return true;
}
if (size < 8) return true;
int type, dist;
PIByteArray ba(data, size);
ba >> type;
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
ba >> pi.name;
// piCout << "received mb from" << pi.name << "packet" << type;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
const PeerInfo * rpi = 0;
bool ch = false;
PIVector<PeerInfo> rpeers;
// piCout << "analyz ...";
switch (type) {
case 1: // new peer
// piCout << "new peer packet ...";
peers_mutex.lock();
if (!hasPeer(pi.name)) {
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
}
pi.resetPing();
addPeer(pi);
buildMap();
// piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
// piCoutObj << mode() << opened_;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
ch = true;
// piCout << "new peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerConnected(pi.name);
peerConnectedEvent(pi.name);
}
break;
case 2: // remove peer
// piCout << "remove peer packet ..." << pi.name;
peers_mutex.lock();
removeNeighbour(pi.name);
rpi = getPeerByName(pi.name);
if (rpi) {
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
// piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0) self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
buildMap();
ch = true;
// piCout << "remove peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
}
break;
case 3: // sync peers
// piCout << "sync packet ...";
ba >> pi >> rpeers;
rpeers << pi;
// piCoutObj << "rec sync " << rpeers.size_s() << " peers";
peers_mutex.lock();
if (!self_info.neighbours.contains(pi.name)) {
// piCoutObj << "add new nei to me" << pi.name;
self_info.addNeighbour(pi.name);
PeerInfo * np = peers_map.value(pi.name);
if (np) {
np->addNeighbour(self_info.name);
np->dist = 0;
}
ch = true;
}
for (auto & rpeer: rpeers) {
// piCout << " to sync " << rpeer.name;
if (rpeer.name == self_info.name) continue;
bool exist = false;
for (auto & peer: peers) {
if (peer.name == rpeer.name) {
exist = true;
if (isPeerRecent(peer, rpeer)) {
// piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::PeerAddress & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::PeerAddress & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
break;
}
}
}
peer.was_update = true;
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
ch = true;
}
break;
}
}
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
if (rpeer.name == pi.name) rpeer.dist = 0;
rpeer.resetPing();
addPeer(rpeer);
ch = true;
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
}
// piCout << "***";
// piCout << self_info.name << self_info.neighbours;
for (auto & i: peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
i.addNeighbour(self_info.name);
}
// piCout << i.name << i.neighbours;
}
if (ch) buildMap();
peers_mutex.unlock();
// piCoutObj << "after sync " << peers.size_s() << " peers";
break;
}
return true;
}
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
PINetworkAddress addr = peer->fastestAddress();
// piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ...";
send_mutex.lock();
bool ok = eth_send.send(addr, ba);
// piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
if (ok) diag_d.sended(ba.size_s());
send_mutex.unlock();
return ok;
}
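// sendMBcast() fans a service packet out over every available channel: all multicast and
// broadcast sockets, the whole loopback port range (so peers living in other processes on
// the same host are reached), every client accepted by the TCP server, and the outgoing
// TCP client link if one is connected.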
void PIPeer::sendMBcast(const PIByteArray & ba) {
send_mc_mutex.lock();
// piCout << "sendMBcast" << ba.size() << "bytes ...";
for (auto * e: eths_mcast) {
if (e->isOpened())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
for (auto * e: eths_bcast) {
if (e->isOpened())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setSendPort(p);
if (eth_lo.send(ba)) diag_s.sended(ba.size_s());
}
PIVector<PIEthernet *> cl = eth_tcp_srv.clients();
for (auto * e: cl) {
if (e->isOpened() && e->isConnected())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
if (eth_tcp_cli.send(ba)) diag_s.sended(ba.size_s());
}
// piCout << "sendMBcast ok";
send_mc_mutex.unlock();
}
void PIPeer::removeNeighbour(const PIString & name) {
for (auto & p: peers)
p.neighbours.removeOne(name);
self_info.removeNeighbour(name);
}
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
peers << pd;
PeerInfo & p(peers.back());
p.init();
CONNECT2(void, const PIString &, const PIByteArray &, p._data, sendRequest, this, sendInternal);
CONNECT2(void, const PIString &, const PIByteArray &, p._data, received, this, dtReceived);
}
bool PIPeer::removePeer(const PIString & name) {
for (int i = 0; i < peers.size_s(); ++i)
if (peers[i].name == name) {
peers[i].destroy();
peers.remove(i);
return true;
}
return false;
}
void PIPeer::sendPeerInfo(const PeerInfo & info) {
PIByteArray ba;
ba << int(1) << info.name << info;
sendMBcast(ba);
}
void PIPeer::sendPeerRemove(const PIString & peer) {
// piCout << name() << "sendPeerRemove" << peer;
PIByteArray ba;
ba << int(2) << peer;
sendMBcast(ba);
}
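// pingNeighbours() sends a type-5 probe to every address of every direct neighbour once a
// second; an address that stays unanswered for more than _PIPEER_PING_TIMEOUT seconds has
// its ping reset to -1 so fastestAddress() stops preferring it.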
void PIPeer::pingNeighbours() {
PIMutexLocker ml(peers_mutex);
PIByteArray ba, sba;
ba << int(5) << self_info.name;
// piCoutObj << "*** pingNeighbours" << peers.size() << "...";
for (auto & p: peers) {
if (!p.isNeighbour()) continue;
// piCout << " ping neighbour" << p.name << p.ping();
send_mutex.lock();
for (auto & a: p.addresses) {
// piCout << " address" << a.address << a.wait_ping;
if (a.wait_ping) {
if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= _PIPEER_PING_TIMEOUT) continue;
a.ping = -1.;
}
a.wait_ping = true;
sba = ba;
sba << p.name << a.address << PISystemTime::current(true);
// piCout << "ping" << p.name << a.address << a.last_ping;
if (eth_send.send(a.address, sba)) diag_s.sended(sba.size_s());
}
send_mutex.unlock();
}
// piCout << "*** pingNeighbours" << peers.size() << "done";
}
bool PIPeer::openDevice() {
PIConfig conf(
#ifndef WINDOWS
"/etc/pip.conf"
#else
"pip.conf"
#endif
,
PIIODevice::ReadOnly);
server_ip = conf.getValue("peer_server_ip", "").toString();
reinit();
diag_d.reset();
diag_s.reset();
// piCoutObj << "open...";
return true;
}
bool PIPeer::closeDevice() {
return false;
}
void PIPeer::syncPeers() {
// piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex);
// piCout << "syncPeers lock";
PIString pn;
bool change = false;
PIStringList dpeers;
peers_mutex.lock();
for (int i = 0; i < peers.size_s(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3) {
pn = cp.name;
// piCout << "sync: remove " << pn;
cp.destroy();
addToRemoved(cp);
peers.remove(i);
sendPeerRemove(pn);
--i;
removeNeighbour(pn);
dpeers << pn;
change = true;
continue;
}
if (cp.was_update)
cp.sync = 0;
else
cp.sync++;
if (cp._data) cp._data->setDist(cp.dist + 1);
cp.was_update = false;
}
if (change) buildMap();
self_info.cnt++;
self_info.time = PISystemTime::current();
PIByteArray ba;
ba << int(3) << self_info.name << self_info << peers;
peers_mutex.unlock();
sendMBcast(ba);
for (const auto & p: dpeers) {
peerDisconnected(p);
peerDisconnectedEvent(p);
}
}
void PIPeer::checkNetwork() {
PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
if (prev_ifaces == ifaces) return;
prev_ifaces = ifaces;
reinit();
}
void PIPeer::reinit() {
no_timer = true;
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
// piCout << "reinit lock";
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
initNetwork();
sendSelfInfo();
no_timer = false;
if (!sync_timer.isRunning()) sync_timer.start(1_Hz);
}
void PIPeer::changeName(const PIString & new_name) {
PIString name_ = new_name;
if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(randomi() % 1000);
setName(name_);
self_info.name = name_;
diag_d.setName(name_ + "_data");
diag_s.setName(name_ + "_service");
}
void PIPeer::setTcpServerIP(const PIString & ip) {
server_ip = ip;
tcpClientReconnect();
}
ssize_t PIPeer::bytesAvailable() const {
ssize_t ret = 0;
read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) ret = read_buffer.back().size();
read_buffer_mutex.unlock();
return ret;
}
ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) {
iterrupted = false;
read_buffer_mutex.lock();
bool empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
while (empty) {
read_buffer_mutex.lock();
empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
if (iterrupted) {
return 0;
}
piMSleep(10);
}
read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) {
PIByteArray ba = read_buffer.dequeue();
read_buffer_mutex.unlock();
ssize_t sz = piMini(ba.size_s(), max_size);
memcpy(read_to, ba.data(), sz);
if (iterrupted) {
return 0;
}
return sz;
}
read_buffer_mutex.unlock();
return 0;
}
ssize_t PIPeer::writeDevice(const void * data, ssize_t size) {
if (trust_peer.isEmpty()) {
sendToAll(data, size);
return size;
}
if (send(trust_peer, data, size))
return size;
else
return -1;
}
void PIPeer::interrupt() {
iterrupted = true;
}
void PIPeer::newTcpClient(PIEthernet * client) {
client->setName("_S.PIPeer.TCP_ServerClient" + client->path());
piCoutObj << "client" << client->path();
CONNECT2(void, const uchar *, ssize_t, client, threadedReadEvent, this, mbcastRead);
client->startThreadedRead();
}
PIString PIPeer::constructFullPathDevice() const {
PIString ret;
ret += self_info.name + ":" + trustPeerName();
return ret;
}
void PIPeer::configureFromFullPathDevice(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
PIString p(pl[i]);
switch (i) {
case 0: changeName(p); break;
case 1: setTrustPeerName(p); break;
}
}
}
PIPropertyStorage PIPeer::constructVariantDevice() const {
PIPropertyStorage ret;
ret.addProperty("name", self_info.name);
ret.addProperty("trust peer", trustPeerName());
return ret;
}
void PIPeer::configureFromVariantDevice(const PIPropertyStorage & d) {
changeName(d.propertyValueByName("name").toString());
setTrustPeerName(d.propertyValueByName("trust peer").toString());
}
void PIPeer::initNetwork() {
// piCoutObj << "initNetwork ...";
eth_send.init();
destroyEths();
destroyMBcasts();
piMSleep(100);
// piCoutObj << self_info.addresses.size();
self_info.addresses.clear();
PIVector<PINetworkAddress> al = PIEthernet::allAddresses();
PIStringList sl;
for (const PINetworkAddress & a: al) {
sl << a.ipString();
}
initEths(sl);
// piCoutObj << sl << self_info.addresses.size();
sl.removeAll("127.0.0.1");
initMBcasts(sl);
diag_d.start();
diag_s.start();
// piCoutObj << "initNetwork done";
}
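// buildMap() recomputes routing after the peer list changes. First pass: a breadth-first
// wave starting from self_info assigns each reachable peer its hop distance in "trace".
// Second pass: for every peer the path is collected backwards from the target towards
// self, so the last elements of addresses_map[name] are direct neighbours (trace 1) and
// quickestPeer() can take back() as the next hop.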
void PIPeer::buildMap() {
// piCout << "[PIPeer \"" + name_ + "\"] buildMap";
peers_map.clear();
addresses_map.clear();
for (auto & i: peers) {
i.trace = -1;
peers_map[i.name] = &i;
}
PIVector<PeerInfo *> cwave, nwave;
int cwi = 0;
self_info.trace = 0;
cwave << &self_info;
while (!cwave.isEmpty()) {
nwave.clear();
++cwi;
for (const auto * p: cwave) {
for (const auto & nn: p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace >= 0) continue;
np->trace = cwi;
nwave << np;
}
}
cwave = nwave;
}
PIVector<PeerInfo *> cpath;
for (auto & c: peers) {
cpath.clear();
cpath << &c;
cwave << &c;
for (int w = c.trace - 1; w >= 0; --w) {
nwave.clear();
for (const auto * p: cwave) {
for (const auto & nn: p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace != w) continue;
cpath << np;
nwave << np;
}
}
cwave = nwave;
}
addresses_map[c.name] = cpath;
// piCout << "map" << c.name << "=" << cpath;
}
}
void PIPeer::tcpClientReconnect() {
eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
}
bool PIPeer::hasPeer(const PIString & name) {
for (const auto & i: peers)
if (i.name == name) return true;
return false;
}
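/* Usage sketch (illustrative only; assumes the usual PIIODevice open()/write() wrappers
   around openDevice()/writeDevice() and the setTrustPeerName() setter used above):

	PIPeer peer("node_A");
	peer.open();                      // binds traffic, multicast/broadcast and loopback sockets
	peer.setTrustPeerName("node_B");  // readDevice()/writeDevice() now talk only to node_B
	peer.write("hello", 5);           // short payloads go as a type-4/1 packet, large ones via PIDataTransfer
*/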