/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Copyright (C) 2019 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "pipeer.h"
#include "piconfig.h"
#include "pidatatransfer.h"
#include "pipropertystorage.h"
#define _PIPEER_MSG_SIZE 4000
#define _PIPEER_MSG_TTL 100
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313+32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
#define _PIPEER_BROADCAST_PORT 13361
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
#define _PIPEER_PING_TIMEOUT 5.0
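// Wire protocol overview (derived from the handlers below):
//  mbcastRead() - discovery packets received via multicast/broadcast/loopback/TCP:
//    1 = new peer announcement, 2 = peer removal, 3 = periodic peer-list sync.
//  dataRead() - unicast traffic packets exchanged between neighbours:
//    4 = data (forwarded hop by hop, hop limit _PIPEER_MSG_TTL),
//    5 = ping request, 6 = ping reply.
//  Inside a type-4 packet the payload is prefixed with a sub-type:
//    1 = small message (<= _PIPEER_MSG_SIZE), 2/3 = PIDataTransfer chunks of a large message.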
class PIPeer::PeerData: public PIObject {
PIOBJECT_SUBCLASS(PeerData, PIObject)
public:
PeerData(const PIString & n);
~PeerData();
EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {data.push_front(uchar(2)); sendRequest(name(), data);}
EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {data.push_front(uchar(3)); sendRequest(name(), data);}
EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {if (ok) received(name(), dt_in.data());}
EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {if (ok) received(name(), dt_out.data());}
EVENT_HANDLER(void, dtThread);
EVENT2(received, const PIString &, from, const PIByteArray &, data)
EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data)
bool send(const PIByteArray & d);
void receivedPacket(uchar type, const PIByteArray & d);
void setDist(int dist);
PIByteArray data;
PIThread t;
PIDataTransfer dt_in, dt_out;
};
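// PeerData owns a pair of PIDataTransfer objects: dt_out splits a large outgoing
// message into packets (pushed back to the owning PIPeer through the sendRequest
// event), dt_in reassembles incoming ones and raises received() when a transfer
// completes. dt_out.send() runs on the private thread t, so PeerData::send()
// only stores the data and returns immediately (false if a transfer is pending).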
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
dt_in.setPacketSize(_PIPEER_MSG_SIZE);
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
dt_in.setCRCEnabled(false);
dt_out.setCRCEnabled(false);
CONNECTU(&dt_in, sendRequest, this, dtSendRequestIn)
CONNECTU(&dt_out, sendRequest, this, dtSendRequestOut)
CONNECTU(&dt_in, receiveFinished, this, dtReceiveFinishedIn)
CONNECTU(&dt_out, receiveFinished, this, dtReceiveFinishedOut)
CONNECTU(&t, started, this, dtThread)
}
PIPeer::PeerData::~PeerData() {
dt_in.stop();
dt_out.stop();
t.stop();
if (!t.waitForFinish(1000))
t.terminate();
}
void PIPeer::PeerData::dtThread() {
// << "send DT ...";
dt_out.send(data);
//piCoutObj << "send DT done";
}
bool PIPeer::PeerData::send(const PIByteArray & d) {
//piCout << "send ..." << t.isRunning();
if (t.isRunning()) return false;
data = d;
t.startOnce();
return true;
}
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
PIDataTransfer * dt = 0;
if (type == 3)
dt = &dt_in;
if (type == 2)
dt = &dt_out;
//piCoutObj << "DT received" << int(type) << d.size_s() << "...";
if (dt) dt->received(d);
//piCoutObj << "DT received" << int(type) << d.size_s() << "done";
}
void PIPeer::PeerData::setDist(int dist) {
dt_in.setTimeout(10 * dist);
}
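// Per-address ping bookkeeping: ping is the smoothed round-trip time in
// milliseconds (-1 while unknown), wait_ping marks an outstanding request and
// last_ping stores when it was sent (see pingNeighbours() and the type-6 branch
// of dataRead()).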
PIPeer::PeerInfo::PeerAddress::PeerAddress(const PIEthernet::Address & a, const PIEthernet::Address & m): address(a), netmask(m) {
ping = -1.;
wait_ping = false;
last_ping = PISystemTime::current(true);
}
int PIPeer::PeerInfo::ping() const {
int ret = -1;
piForeachC (PeerAddress & a, addresses)
if (a.ping > 0.) {
if (ret < 0) ret = piRoundd(a.ping);
else ret = piMini(ret, piRoundd(a.ping));
}
return ret;
}
void PIPeer::PeerInfo::init() {
if (_data == 0) _data = new PeerData(name);
}
void PIPeer::PeerInfo::destroy() {
if (_data) delete _data;
_data = 0;
}
PIEthernet::Address PIPeer::PeerInfo::fastestAddress() const {
double mp = -1.;
PIEthernet::Address ret;
piForeachC (PeerAddress & a, addresses) {
if (a.ping <= 0.) continue;
if ((mp < 0) || (mp > a.ping)) {
mp = a.ping;
ret = a.address;
}
}
return ret;
}
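// PIPeer is a PIIODevice that addresses remote nodes by name: data written to
// the device goes to the configured "trust peer" (or to every peer when none is
// set) and received messages are queued for readDevice(). A minimal usage
// sketch (open()/write() are assumed to come from the PIIODevice base, the
// exact calls may differ):
//
//   PIPeer peer("node_a");
//   peer.setTrustPeerName("node_b"); // optional: restrict I/O to one peer
//   peer.open();
//   const char msg[] = "hello";
//   peer.write(msg, sizeof(msg));    // routed to node_b through the mesh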
REGISTER_DEVICE(PIPeer)
PIPeer::PIPeer(const PIString & n): PIIODevice(), inited__(false), eth_tcp_srv(PIEthernet::TCP_Server), eth_tcp_cli(PIEthernet::TCP_Client), diag_s(false), diag_d(false) {
//piCout << " PIPeer" << uint(this);
destroyed = false;
setDebug(false);
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
changeName(n);
setReopenTimeout(100);
read_buffer_size = 128;
self_info.dist = 0;
self_info.time = PISystemTime::current();
//joinMulticastGroup("239.240.241.242");
randomize();
//id_ = self_info.name + "_" + PIString::fromNumber(randomi());
CONNECTU(&sync_timer, tickEvent, this, timerEvent);
prev_ifaces = PIEthernet::interfaces();
no_timer = false;
// initNetwork();
sync_timer.addDelimiter(5);
}
PIPeer::~PIPeer() {
//piCout << "~PIPeer" << uint(this);
if (destroyed) return;
destroyed = true;
PIMutexLocker ml(peers_mutex);
piForeach (PeerInfo & p, peers)
if (p._data) {
p._data->dt_in.stop();
p._data->dt_out.stop();
p._data->t.stop(true);
}
sync_timer.stop();
diag_s.stop();
diag_d.stop();
destroyEths();
piForeach (PIEthernet * i, eths_mcast) {
if (!i) continue;
i->stopThreadedRead();
}
piForeach (PIEthernet * i, eths_bcast) {
if (!i) continue;
i->stopThreadedRead();
}
eth_lo.stopThreadedRead();
eth_tcp_srv.stopThreadedRead();
eth_tcp_cli.stopThreadedRead();
sendSelfRemove();
destroyMBcasts();
eth_send.close();
piForeach (PeerInfo & p, peers)
p.destroy();
}
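// sync_timer runs with a 1 s base period (started in reinit()); delimiter 1
// fires every second (peer sync + neighbour pings), delimiter 5 every 5 s
// (network interface check).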
void PIPeer::timerEvent(void * data, int delim) {
// piCoutObj << "timerEvent" << delim;
if (no_timer) return;
switch (delim) {
case 1: // every 1 s
syncPeers();
piMSleep(100);
pingNeighbours();
//piCoutObj << "isOpened" << isOpened();
break;
case 5: // every 5 s
checkNetwork();
break;
}
//send("broadcast", 9);
}
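// For every local address, bind a dedicated UDP socket for peer-to-peer traffic
// on the first free port in [_PIPEER_TRAFFIC_PORT_S, _PIPEER_TRAFFIC_PORT_E);
// the bound address is advertised in self_info.addresses. eth_send is the
// shared sending socket.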
void PIPeer::initEths(PIStringList al) {
// piCoutObj << "initEths start";
PIEthernet * ce;
const PIEthernet::Interface * cint = 0;
piForeachC (PIString & a, al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_traffic_eth_rec_" + a);
ce->setParameters(0);
bool ok = false;
for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
ce->setReadAddress(a, p);
if (ce->open()) {
eths_traffic << ce;
cint = prev_ifaces.getByAddress(a);
self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
CONNECTU(ce, threadedReadEvent, this, dataRead);
ce->startThreadedRead();
// piCoutObj << "dc binded to" << ce->path();
// piCoutObj << "add eth" << a;
ok = true;
break;
}
}
if (!ok) delete ce;
}
eth_send.setDebug(false);
eth_send.setName("__S__PIPeer_traffic_eth_send");
eth_send.setParameters(0);
// piCoutObj << "initEths ok";
}
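// Discovery transports: one multicast receiver per local address joined to
// _PIPEER_MULTICAST_IP, one broadcast receiver per local address (directed
// broadcast computed from the interface netmask), a loopback socket bound to
// the first free port in the loopback range, and a TCP server/client pair
// (the client connects to the optional "peer_server_ip" from the config file).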
void PIPeer::initMBcasts(PIStringList al) {
// destroyMBcasts();
PIEthernet * ce;
const PIEthernet::Interface * cint;
PIString nm;
al << _PIPEER_MULTICAST_IP;
// piCoutObj << "initMBcasts start" << al;
piForeachC (PIString & a, al) {
//piCout << "mcast try" << a;
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_mcast_eth_" + a);
ce->setParameters(0);
ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
if (ce->open()) {
eths_mcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mcast bind to" << a << ce->sendIP();
} else {
delete ce;
//piCoutObj << "invalid address for mcast" << a;
}
}
al.removeAll(_PIPEER_MULTICAST_IP);
piForeachC (PIString & a, al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_bcast_eth_" + a);
ce->setParameters(PIEthernet::Broadcast);
cint = prev_ifaces.getByAddress(a);
nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
if (ce->open()) {
eths_bcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mc BC try" << a << nm << ce->sendIP();
// piCout << "bcast bind to" << a << nm;
} else {
delete ce;
//piCoutObj << "invalid address for bcast" << a;
}
}
// eth_lo.setDebug(false);
eth_lo.setName("__S__PIPeer_eth_loopback");
eth_lo.setParameters(PIEthernet::SeparateSockets);
eth_lo.init();
cint = prev_ifaces.getByAddress("127.0.0.1");
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setReadAddress("127.0.0.1", p);
if (eth_lo.open()) {
eth_lo.setSendIP("127.0.0.1");
CONNECTU(&eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
// piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
//piCout << "add eth" << ta;
break;
}
}
eth_tcp_srv.setName("__S__PIPeer_eth_TCP_Server");
eth_tcp_srv.init();
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
eth_tcp_srv.setDebug(false);
CONNECTU(&eth_tcp_srv, newConnection, this, newTcpClient);
eth_tcp_srv.startThreadedRead();
eth_tcp_cli.setName("__S__PIPeer_eth_TCP_Client");
eth_tcp_cli.init();
eth_tcp_cli.setDebug(false);
tcpClientReconnect();
CONNECTU(&eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
eth_tcp_cli.startThreadedRead();
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened()) piCoutObj << "Warning! Can't find a suitable network interface for multicast receive; check that at least one interface with multicasting enabled exists!";
// piCoutObj << "initMBcasts ok";
}
void PIPeer::destroyEths() {
piForeach (PIEthernet * i, eths_traffic) {
if (!i) continue;
((PIThread*)i)->stop();
((PIThread*)i)->waitForFinish(100);
i->stopThreadedRead();
i->close();
delete i;
i = 0;
}
eths_traffic.clear();
}
void PIPeer::destroyMBcasts() {
piForeach (PIEthernet * i, eths_mcast) {
if (!i) continue;
((PIThread*)i)->stop();
((PIThread*)i)->waitForFinish(100);
i->stopThreadedRead();
i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
i->close();
delete i;
i = 0;
}
eths_mcast.clear();
piForeach (PIEthernet * i, eths_bcast) {
if (!i) continue;
((PIThread*)i)->stop();
((PIThread*)i)->waitForFinish(100);
i->stopThreadedRead();
i->close();
delete i;
i = 0;
}
eths_bcast.clear();
((PIThread*)&eth_lo)->stop();
((PIThread*)&eth_lo)->waitForFinish(100);
eth_lo.stopThreadedRead();
eth_lo.close();
eth_tcp_srv.stop();
}
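// Returns the next hop towards "to": addresses_map (built by buildMap()) stores,
// for every known peer, the path from that peer back towards this node, so the
// last element is always a direct neighbour.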
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
if (!peers_map.contains(to)) return 0;
//piCout << "*** search quickest peer" << to;
PIVector<PeerInfo*> tp = addresses_map.value(to);
/*PeerInfo * dp = 0;
int mping = 0x7FFFFFFF;
for (int i = 0; i < tp.size_s(); ++i) {
int p = tp[i]->ping();
if (mping > p && p > 0) {
mping = p;
dp = tp[i];
}
}
//piCout << "*** search quickest peer: found" << (dp ? dp->name : "0");
return dp;*/
if (tp.isEmpty()) return 0;
return tp.back();
}
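// Public send: payloads that fit into one packet are sent directly as a type-4
// packet with sub-type 1; larger payloads are handed to the destination peer's
// PIDataTransfer (PeerData::send), which streams them as sub-type 2/3 chunks.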
bool PIPeer::send(const PIString & to, const void * data, int size) {
PIByteArray ba(data, size);
// piCoutObj << "send" << ba.size_s() << "bytes" << _PIPEER_MSG_SIZE;
if (ba.size_s() <= _PIPEER_MSG_SIZE) {
ba.insert(0, uchar(1));
return sendInternal(to, ba);
} else {
//ba.insert(0, uchar(2));
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = const_cast<PeerInfo*>(getPeerByName(to));
if (!dp) return false;
return dp->_data->send(ba);
}
return true;
}
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
return false;
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << data;
// piCoutObj << "sendInternal to" << to << data.size_s() << int(data.front());
if (!sendToNeighbour(dp, ba)) {
//piCoutObj << "send error";
return false;
}
return true;
}
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
PIByteArray ba = data;
dataReceived(from, data);
dataReceivedEvent(from, data);
if (trust_peer.isEmpty() || trust_peer == from) {
read_buffer_mutex.lock();
if (read_buffer.size_s() < read_buffer_size) read_buffer.enqueue(ba);
read_buffer_mutex.unlock();
}
}
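// Handler for unicast traffic sockets. Types 5/6 implement the ping exchange,
// type 4 carries user data: packets addressed to this node are dispatched by
// sub-type, anything else is forwarded to the next hop with an incremented hop
// counter and dropped once it exceeds _PIPEER_MSG_TTL.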
bool PIPeer::dataRead(uchar * readed, int size) {
if (destroyed) {
//piCout << "[PIPeer] SegFault";
return true;
}
if (size < 16) return true;
PIByteArray ba(readed, size), sba, pba;
int type, cnt;
PIString from, to;
ba >> type;
// PIMutexLocker locker(eth_mutex);
eth_mutex.lock();
// piCout << "dataRead lock";
if (type == 5) { // ping request
PIEthernet::Address addr;
PISystemTime time;
ba >> to >> from >> addr >> time;
// piCout << "ping request" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (from == self_info.name) { // send ping back
const PeerInfo * pi = getPeerByName(to);
if (pi) {
if (pi->isNeighbour()) {
sba << int(6) << to << from << addr << time;
// piCout << " ping from" << from << addr << ", send back to" << pi->name;
send_mutex.lock();
piForeachC (PeerInfo::PeerAddress & a, pi->addresses) {
if (eth_send.send(a.address, sba))
diag_s.received(sba.size_s());
}
send_mutex.unlock();
}
}
}
eth_mutex.unlock();
return true;
}
if (type == 6) { // ping reply
PIEthernet::Address addr;
PISystemTime time, ptime, ctime = PISystemTime::current(true);
ba >> to >> from >> addr >> time;
// piCout << "ping reply" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (to == self_info.name) { // ping echo
piForeach (PeerInfo & p, peers) {
if (!p.isNeighbour()) continue;
if (p.name != from) continue;
piForeach (PeerInfo::PeerAddress & a, p.addresses) {
if (a.address != addr) continue;
if (a.last_ping >= time) piBreak;
ptime = ctime - time;
a.last_ping = time;
a.wait_ping = false;
if (a.ping < 0) a.ping = ptime.toMilliseconds();
else a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
// piCout << " ping echo" << p.name << a.address << a.ping;
eth_mutex.unlock();
return true;
}
}
}
eth_mutex.unlock();
return true;
}
// piCoutObj << "received data from" << from << "packet" << type;
if (type != 4) {
eth_mutex.unlock();
return true;
}
diag_d.received(size);
ba >> from >> to >> cnt >> pba;
//piCoutObj << "Received packet" << type << from << to << pba.size_s();
if (type == 4) { // data packet
if (to == self_info.name) { // my packet
uchar pt = pba.take_front();
//piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s();
peers_mutex.lock();
PeerInfo * fp = const_cast<PeerInfo*>(getPeerByName(from));
if (fp == 0) {
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
if (pt == 1) {
peers_mutex.unlock();
eth_mutex.unlock();
dtReceived(from, pba);
return true;
}
if (pt == 2 || pt == 3) {
peers_mutex.unlock();
eth_mutex.unlock();
if (fp->_data)
fp->_data->receivedPacket(pt, pba);
return true;
}
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
PIMutexLocker plocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
eth_mutex.unlock();
return true;
}
cnt++;
if (cnt > _PIPEER_MSG_TTL || from == dp->name) {
eth_mutex.unlock(); // drop the packet: hop limit exceeded or it would bounce straight back
return true;
}
sba << type << from << to << cnt << pba;
//piCout << "translate packet" << from << "->" << to << ", ttl =" << cnt;
sendToNeighbour(dp, sba);
}
eth_mutex.unlock();
return true;
}
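// Handler for discovery sockets. Type 1 registers a newly announced peer,
// type 2 removes one, type 3 merges the sender's peer list with the local one
// (keeping the more recent PeerInfo, see isPeerRecent) and refreshes the
// neighbour relations before rebuilding the route map.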
bool PIPeer::mbcastRead(uchar * data, int size) {
if (destroyed) {
//piCout << "[PIPeer] SegFault";
return true;
}
if (size < 8) return true;
int type, dist;
PIByteArray ba(data, size);
ba >> type;
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
ba >> pi.name;
// piCoutObj << "received mb from" << pi.name << "packet" << type;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
const PeerInfo * rpi = 0;
bool ch = false;
PIVector<PeerInfo> rpeers;
//piCout << "analyz ...";
switch (type) {
case 1: // new peer
//piCout << "new peer packet ...";
peers_mutex.lock();
if (!hasPeer(pi.name)) {
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
}
pi.resetPing();
addPeer(pi);
buildMap();
// piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
// piCoutObj << mode() << opened_;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
ch = true;
//piCout << "new peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerConnected(pi.name);
peerConnectedEvent(pi.name);
}
break;
case 2: // remove peer
//piCout << "remove peer packet ..." << pi.name;
peers_mutex.lock();
removeNeighbour(pi.name);
rpi = getPeerByName(pi.name);
if (rpi) {
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
//piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0)
self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
buildMap();
ch = true;
//piCout << "remove peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
}
break;
case 3: // sync peers
//piCout << "sync packet ...";
ba >> pi >> rpeers;
rpeers << pi;
//piCoutObj << "rec sync " << rpeers.size_s() << " peers";
peers_mutex.lock();
if (!self_info.neighbours.contains(pi.name)) {
//piCoutObj << "add new nei to me" << pi.name;
self_info.addNeighbour(pi.name);
PeerInfo * np = peers_map.value(pi.name);
if (np) {
np->addNeighbour(self_info.name);
np->dist = 0;
}
ch = true;
}
piForeach (PeerInfo & rpeer, rpeers) {
//piCout << " to sync " << rpeer.name;
if (rpeer.name == self_info.name) continue;
bool exist = false;
piForeach (PeerInfo & peer, peers) {
if (peer.name == rpeer.name) {
exist = true;
if (isPeerRecent(peer, rpeer)) {
//piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::PeerAddress & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::PeerAddress & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
break;
}
}
}
peer.was_update = true;
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
ch = true;
}
piBreak;
}
}
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
if (rpeer.name == pi.name) rpeer.dist = 0;
rpeer.resetPing();
addPeer(rpeer);
ch = true;
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
}
//piCout << "***";
//piCout << self_info.name << self_info.neighbours;
piForeach (PeerInfo & i, peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
i.addNeighbour(self_info.name);
}
//piCout << i.name << i.neighbours;
}
if (ch)
buildMap();
peers_mutex.unlock();
//piCoutObj << "after sync " << peers.size_s() << " peers";
break;
}
return true;
}
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
//if (peer->_neth == 0) return false;
PIEthernet::Address addr = peer->fastestAddress();
//piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ...";
//bool ok = peer->_neth->send(peer->_naddress, ba.data(), ba.size_s());
send_mutex.lock();
bool ok = eth_send.send(addr, ba);
//piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
if (ok) diag_d.sended(ba.size_s());
send_mutex.unlock();
return ok;
}
void PIPeer::sendMBcast(const PIByteArray & ba) {
send_mc_mutex.lock();
// piCout << "sendMBcast" << ba.size() << "bytes ...";
piForeach (PIEthernet * e, eths_mcast) {
//errorClear();
//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
//piCout << PIEthernet::ethErrorString();
if (e->isOpened())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
piForeach (PIEthernet * e, eths_bcast) {
//errorClear();
//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
//piCout << PIEthernet::ethErrorString();
if (e->isOpened())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setSendPort(p);
if (eth_lo.send(ba))
diag_s.sended(ba.size_s());
}
PIVector<PIEthernet*> cl = eth_tcp_srv.clients();
piForeach (PIEthernet * e, cl) {
if (e->isOpened() && e->isConnected())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
if (eth_tcp_cli.send(ba))
diag_s.sended(ba.size_s());
}
// piCout << "sendMBcast ok";
send_mc_mutex.unlock();
}
void PIPeer::removeNeighbour(const PIString & name) {
piForeach (PeerInfo & p, peers)
p.neighbours.removeOne(name);
self_info.removeNeighbour(name);
}
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
peers << pd;
PeerInfo & p(peers.back());
p.init();
CONNECTU(p._data, sendRequest, this, sendInternal)
CONNECTU(p._data, received, this, dtReceived)
}
bool PIPeer::removePeer(const PIString & name) {
for (int i = 0; i < peers.size_s(); ++i)
if (peers[i].name == name) {
peers[i].destroy();
peers.remove(i);
return true;
}
return false;
}
void PIPeer::sendPeerInfo(const PeerInfo & info) {
PIByteArray ba;
ba << int(1) << info.name << info;
sendMBcast(ba);
}
void PIPeer::sendPeerRemove(const PIString & peer) {
PIByteArray ba;
ba << int(2) << peer;
sendMBcast(ba);
}
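// Sends a type-5 ping to every address of every neighbour once per second.
// An address that does not answer within _PIPEER_PING_TIMEOUT seconds has its
// ping reset to -1; replies update the smoothed RTT in dataRead() (type 6):
// ping = 0.6 * old + 0.4 * measured.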
void PIPeer::pingNeighbours() {
PIMutexLocker ml(peers_mutex);
PIByteArray ba, sba;
ba << int(5) << self_info.name;
// piCoutObj << "*** pingNeighbours" << peers.size() << "...";
piForeach (PeerInfo & p, peers) {
if (!p.isNeighbour()) continue;
//piCout << " ping neighbour" << p.name << p.ping();
send_mutex.lock();
piForeach (PeerInfo::PeerAddress & a, p.addresses) {
// piCout << " address" << a.address << a.wait_ping;
if (a.wait_ping) {
if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= _PIPEER_PING_TIMEOUT)
continue;
a.ping = -1.;
}
a.wait_ping = true;
sba = ba;
sba << p.name << a.address << PISystemTime::current(true);
// piCout << "ping" << p.name << a.address << a.last_ping;
if (eth_send.send(a.address, sba))
diag_s.sended(sba.size_s());
}
send_mutex.unlock();
}
//piCout << "*** pingNeighbours" << peers.size() << "done";
}
bool PIPeer::openDevice() {
PIConfig conf(
#ifndef WINDOWS
"/etc/pip.conf"
#else
"pip.conf"
#endif
, PIIODevice::ReadOnly);
server_ip = conf.getValue("peer_server_ip", "");
reinit();
diag_d.reset();
diag_s.reset();
//piCoutObj << "open...";
return true;
// PIMutexLocker ml(peers_mutex);
// if (trust_peer.isEmpty())
// return !peers.isEmpty();
// return hasPeer(trust_peer);
}
bool PIPeer::closeDevice() {
return false;
}
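// Periodic peer-list broadcast: peers that were not refreshed by a sync packet
// for more than 3 cycles are dropped (and announced as removed), then the full
// local peer list is sent to the network as a type-3 packet.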
void PIPeer::syncPeers() {
//piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex);
// piCout << "syncPeers lock";
PIString pn;
bool change = false;
PIStringList dpeers;
peers_mutex.lock();
for (int i = 0; i < peers.size_s(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3) {
pn = cp.name;
//piCoutObj << "sync: remove " << pn;
cp.destroy();
addToRemoved(cp);
peers.remove(i);
sendPeerRemove(pn);
--i;
removeNeighbour(pn);
dpeers << pn;
change = true;
continue;
}
if (cp.was_update)
cp.sync = 0;
else
cp.sync++;
if (cp._data)
cp._data->setDist(cp.dist + 1);
cp.was_update = false;
}
if (change) buildMap();
self_info.cnt++;
self_info.time = PISystemTime::current();
PIByteArray ba;
ba << int(3) << self_info.name << self_info << peers;
peers_mutex.unlock();
sendMBcast(ba);
piForeachC (PIString & p, dpeers) {
peerDisconnected(p);
peerDisconnectedEvent(p);
}
}
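// Every 5 s the interface list is compared with the previous snapshot; any
// change triggers a full re-initialisation of the network sockets.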
void PIPeer::checkNetwork() {
PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
if (prev_ifaces == ifaces) return;
prev_ifaces = ifaces;
reinit();
}
void PIPeer::reinit() {
no_timer = true;
// timer.stop();
// timer.clearDelimiters();
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
// piCout << "reinit lock";
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
initNetwork();
sendSelfInfo();
// eth_send.close();
// eth_lo.stopThreadedRead();
// eth_lo.close();
// eth_send.init();
// eth_send.open();
// eth_lo.startThreadedRead();
// timer.addDelimiter(5);
// timer.start(1000);
no_timer = false;
if (!sync_timer.isRunning()) sync_timer.start(1000);
}
void PIPeer::changeName(const PIString &new_name) {
PIString name_ = new_name;
if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(randomi() % 1000);
setName(name_);
self_info.name = name_;
diag_d.setName(name_+"_data");
diag_s.setName(name_+"_service");
}
int PIPeer::readDevice(void *read_to, int max_size) {
read_buffer_mutex.lock();
bool empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
while (empty) {
read_buffer_mutex.lock();
empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
piMSleep(10);
}
read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) {
PIByteArray ba = read_buffer.dequeue();
read_buffer_mutex.unlock();
int sz = piMini(ba.size_s(), max_size);
memcpy(read_to, ba.data(), sz);
return sz;
}
read_buffer_mutex.unlock();
return 0;
}
int PIPeer::writeDevice(const void *data, int size) {
if (trust_peer.isEmpty()) {
sendToAll(data, size);
return size;
}
if (send(trust_peer, data, size))
return size;
else return -1;
}
void PIPeer::newTcpClient(PIEthernet *client) {
client->setName("__S__PIPeer_eth_TCP_ServerClient" + client->path());
piCoutObj << "client" << client->path();
CONNECTU(client, threadedReadEvent, this, mbcastRead);
client->startThreadedRead();
}
PIString PIPeer::constructFullPathDevice() const {
PIString ret;
ret << self_info.name << ":" << trustPeerName();
return ret;
}
void PIPeer::configureFromFullPathDevice(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
PIString p(pl[i]);
switch (i) {
case 0: changeName(p); break;
case 1: setTrustPeerName(p); break;
}
}
}
PIPropertyStorage PIPeer::constructVariantDevice() const {
PIPropertyStorage ret;
ret.addProperty("name", self_info.name);
ret.addProperty("trust peer", trustPeerName());
return ret;
}
void PIPeer::configureFromVariantDevice(const PIPropertyStorage & d) {
changeName(d.propertyValueByName("name").toString());
setTrustPeerName(d.propertyValueByName("trust peer").toString());
}
void PIPeer::initNetwork() {
// piCoutObj << "initNetwork ...";
eth_send.init();
destroyEths();
destroyMBcasts();
piMSleep(100);
// piCoutObj << self_info.addresses.size();
self_info.addresses.clear();
PIVector<PIEthernet::Address> al = PIEthernet::allAddresses();
PIStringList sl;
piForeachC (PIEthernet::Address & a, al)
sl << a.ipString();
initEths(sl);
// piCoutObj << sl << self_info.addresses.size();
sl.removeAll("127.0.0.1");
initMBcasts(sl);
diag_d.start();
diag_s.start();
// piCoutObj << "initNetwork done";
}
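// Rebuilds the routing tables with a breadth-first search over the neighbour
// graph: "trace" becomes the hop distance from this node, peers_map maps names
// to PeerInfo and addresses_map maps each peer name to a path ending at a
// direct neighbour (used by quickestPeer()).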
void PIPeer::buildMap() {
//piCout << "[PIPeer \"" + name_ + "\"] buildMap";
peers_map.clear();
addresses_map.clear();
piForeach (PeerInfo & i, peers) {
i.trace = -1;
peers_map[i.name] = &i;
}
PIVector<PeerInfo*> cwave, nwave;
int cwi = 0;
self_info.trace = 0;
cwave << &self_info;
while (!cwave.isEmpty()) {
nwave.clear();
++cwi;
piForeachC (PeerInfo * p, cwave) {
piForeachC (PIString & nn, p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace >= 0) continue;
np->trace = cwi;
nwave << np;
}
}
cwave = nwave;
}
PIVector<PeerInfo*> cpath;
piForeach (PeerInfo & c, peers) {
cpath.clear();
cpath << &c;
cwave << &c;
for (int w = c.trace - 1; w >= 0; --w) {
nwave.clear();
piForeachC (PeerInfo * p, cwave) {
piForeachC (PIString & nn, p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace != w) continue;
cpath << np;
nwave << np;
}
}
cwave = nwave;
}
addresses_map[c.name] = cpath;
//piCout << "map" << c.name << "=" << cpath;
}
}
void PIPeer::tcpClientReconnect() {
eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
}