Files
pip/libs/main/io_devices/pipeer.cpp
peri4 caa7880cc4 get rid of piForeach
apply some code analyzer recommendations
ICU flag now check if libicu exists
prepare for more accurate growth of containers (limited PoT, then constantly increase size)
2024-11-20 20:01:47 +03:00

1176 lines
30 KiB
C++

/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pipeer.h"
#include "piconfig.h"
#include "pidatatransfer.h"
#include "piliterals_time.h"
#include "pipropertystorage.h"
#include "pitime.h"
#define _PIPEER_MSG_SIZE 4000
#define _PIPEER_MSG_TTL 100
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313 + 32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
#define _PIPEER_BROADCAST_PORT 13361
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
#define _PIPEER_PING_TIMEOUT 5.0
class PIPeer::PeerData: public PIObject {
PIOBJECT_SUBCLASS(PeerData, PIObject);
public:
PeerData(const PIString & n);
~PeerData();
EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {
data.push_front(uchar(2));
sendRequest(name(), data);
}
EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {
data.push_front(uchar(3));
sendRequest(name(), data);
}
EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {
if (ok) received(name(), dt_in.data());
}
EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {
if (ok) received(name(), dt_out.data());
}
EVENT_HANDLER(void, dtThread);
EVENT2(received, const PIString &, from, const PIByteArray &, data);
EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data);
bool send(const PIByteArray & d);
void receivedPacket(uchar type, const PIByteArray & d);
void setDist(int dist);
PIByteArray data;
PIThread t;
PIDataTransfer dt_in, dt_out;
};
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
dt_in.setPacketSize(_PIPEER_MSG_SIZE);
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
dt_in.setCRCEnabled(false);
dt_out.setCRCEnabled(false);
CONNECT1(void, PIByteArray &, &dt_in, sendRequest, this, dtSendRequestIn);
CONNECT1(void, PIByteArray &, &dt_out, sendRequest, this, dtSendRequestOut);
CONNECT1(void, bool, &dt_in, receiveFinished, this, dtReceiveFinishedIn);
CONNECT1(void, bool, &dt_out, receiveFinished, this, dtReceiveFinishedOut);
CONNECT0(void, &t, started, this, dtThread);
}
PIPeer::PeerData::~PeerData() {
dt_in.stop();
dt_out.stop();
t.stop();
if (!t.waitForFinish(1_s)) t.terminate();
}
void PIPeer::PeerData::dtThread() {
dt_out.send(data);
// piCoutObj << "send DT done";
}
bool PIPeer::PeerData::send(const PIByteArray & d) {
// piCout << "send ..." << t.isRunning();
if (t.isRunning()) return false;
data = d;
t.startOnce();
return true;
}
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
PIDataTransfer * dt = 0;
if (type == 3) dt = &dt_in;
if (type == 2) dt = &dt_out;
// piCoutObj << "DT received" << int(type) << d.size_s() << "...";
if (dt) dt->received(d);
// piCoutObj << "DT received" << int(type) << d.size_s() << "done";
}
void PIPeer::PeerData::setDist(int dist) {
dt_in.setTimeout(10 * dist);
}
PIPeer::PeerInfo::PeerAddress::PeerAddress(const PINetworkAddress & a, const PINetworkAddress & m): address(a), netmask(m) {
ping = -1.;
wait_ping = false;
last_ping = PISystemTime::current(true);
}
int PIPeer::PeerInfo::ping() const {
int ret = -1;
for (const auto & a: addresses)
if (a.ping > 0.) {
if (ret < 0)
ret = piRoundd(a.ping);
else
ret = piMini(ret, piRoundd(a.ping));
}
return ret;
}
void PIPeer::PeerInfo::init() {
if (_data == 0) _data = new PeerData(name);
}
void PIPeer::PeerInfo::destroy() {
if (_data) delete _data;
_data = 0;
}
PINetworkAddress PIPeer::PeerInfo::fastestAddress() const {
double mp = -1.;
PINetworkAddress ret;
for (const auto & a: addresses) {
if (a.ping <= 0.) continue;
if ((mp < 0) || (mp > a.ping)) {
mp = a.ping;
ret = a.address;
}
}
return ret;
}
void PIPeer::PeerInfo::addNeighbour(const PIString & n) {
if (!neighbours.contains(n)) neighbours << n;
}
void PIPeer::PeerInfo::addNeighbours(const PIStringList & l) {
for (const auto & n: l)
if (!neighbours.contains(n)) neighbours << n;
}
void PIPeer::PeerInfo::removeNeighbour(const PIString & n) {
neighbours.removeAll(n);
}
void PIPeer::PeerInfo::resetPing() {
for (auto & a: addresses)
a.ping = -1;
}
REGISTER_DEVICE(PIPeer)
PIPeer::PIPeer(const PIString & n)
: PIIODevice()
, inited__(false)
, eth_tcp_srv(PIEthernet::TCP_Server)
, eth_tcp_cli(PIEthernet::TCP_Client)
, diag_s(false)
, diag_d(false) {
sync_timer.setName("_S.PIPeer.sync");
// piCout << " PIPeer" << uint(this);
destroyed = false;
setDebug(false);
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
changeName(n);
setReopenTimeout(100_ms);
read_buffer_size = 128;
self_info.dist = 0;
self_info.time = PISystemTime::current();
randomize();
CONNECT1(void, int, &sync_timer, tickEvent, this, timerEvent);
prev_ifaces = PIEthernet::interfaces();
no_timer = false;
sync_timer.addDelimiter(5);
}
PIPeer::~PIPeer() {
// piCout << "~PIPeer" << uint(this);
stop();
if (destroyed) return;
destroyed = true;
destroy();
}
void PIPeer::timerEvent(int delim) {
// piCoutObj << "timerEvent" << delim;
if (no_timer) return;
switch (delim) {
case 1: // every 1 s
syncPeers();
piMSleep(100);
pingNeighbours();
// piCoutObj << "isOpened" << isOpened();
break;
case 5: // every 5 s
checkNetwork();
break;
}
}
void PIPeer::initEths(PIStringList al) {
// piCoutObj << "initEths start";
PIEthernet * ce;
const PIEthernet::Interface * cint = 0;
for (const auto & a: al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.traf_rec_" + a);
ce->setParameters(0);
bool ok = false;
for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
ce->setReadAddress(a, p);
if (ce->open()) {
eths_traffic << ce;
cint = prev_ifaces.getByAddress(a);
self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, dataRead);
ce->startThreadedRead();
// piCoutObj << "dc binded to" << ce->path();
// piCoutObj << "add eth" << a;
ok = true;
break;
}
}
if (!ok) delete ce;
}
eth_send.setDebug(false);
eth_send.setName("_S.PIPeer.traf_send");
eth_send.setParameters(0);
// piCoutObj << "initEths ok";
}
void PIPeer::initMBcasts(PIStringList al) {
PIEthernet * ce;
const PIEthernet::Interface * cint;
PIString nm;
al << _PIPEER_MULTICAST_IP;
// piCoutObj << "initMBcasts start" << al;
for (const auto & a: al) {
// piCout << "mcast try" << a;
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.mcast_" + a);
ce->setParameters(0);
ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
if (ce->open()) {
eths_mcast << ce;
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mcast bind to" << a << ce->sendIP();
} else {
delete ce;
// piCoutObj << "invalid address for mcast" << a;
}
}
al.removeAll(_PIPEER_MULTICAST_IP);
for (const auto & a: al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("_S.PIPeer.bcast_" + a);
ce->setParameters(PIEthernet::Broadcast);
cint = prev_ifaces.getByAddress(a);
nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
if (ce->open()) {
eths_bcast << ce;
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mc BC try" << a << nm << ce->sendIP();
// piCout << "bcast bind to" << a << nm;
} else {
delete ce;
// piCoutObj << "invalid address for bcast" << a;
}
}
eth_lo.setName("_S.PIPeer.lo");
eth_lo.setParameters(PIEthernet::SeparateSockets);
eth_lo.init();
cint = prev_ifaces.getByAddress("127.0.0.1");
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setReadAddress("127.0.0.1", p);
if (eth_lo.open()) {
eth_lo.setSendIP("127.0.0.1");
CONNECT2(void, const uchar *, ssize_t, &eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
// piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
// piCout << "add eth" << ta;
break;
}
}
eth_tcp_srv.setName("_S.PIPeer.TCP_Server");
eth_tcp_srv.init();
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
eth_tcp_srv.setDebug(false);
CONNECT1(void, PIEthernet *, &eth_tcp_srv, newConnection, this, newTcpClient);
eth_tcp_srv.startThreadedRead();
eth_tcp_cli.setName("_S.PIPeer.TCP_Client");
eth_tcp_cli.init();
eth_tcp_cli.setDebug(false);
tcpClientReconnect();
CONNECT2(void, const uchar *, ssize_t, &eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
eth_tcp_cli.startThreadedRead();
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened())
piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with "
"multicasting enabled!";
// piCoutObj << "initMBcasts ok";
}
void PIPeer::destroy() {
sync_timer.stopAndWait();
diag_s.stopAndWait();
diag_d.stopAndWait();
PIMutexLocker ml(peers_mutex);
for (auto & p: peers)
if (p._data) {
p._data->dt_in.stop();
p._data->dt_out.stop();
p._data->t.stopAndWait();
}
destroyEths();
for (auto * i: eths_mcast) {
if (!i) continue;
i->stopAndWait();
}
for (auto * i: eths_bcast) {
if (!i) continue;
i->stopAndWait();
}
eth_lo.stopAndWait();
eth_tcp_srv.stopAndWait();
eth_tcp_cli.stopAndWait();
sendSelfRemove();
eth_lo.close();
eth_tcp_srv.close();
eth_tcp_cli.close();
destroyMBcasts();
eth_send.close();
for (auto & p: peers)
p.destroy();
peers.clear();
destroyed = true;
}
void PIPeer::destroyEths() {
for (auto * i: eths_traffic) {
if (!i) continue;
i->stopAndWait();
i->close();
}
piDeleteAllAndClear(eths_traffic);
}
void PIPeer::destroyMBcasts() {
for (auto * i: eths_mcast) {
if (!i) continue;
i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
i->stopAndWait();
i->close();
}
for (auto * i: eths_bcast) {
if (!i) continue;
i->stopAndWait();
i->close();
}
piDeleteAllAndClear(eths_mcast);
piDeleteAllAndClear(eths_bcast);
eth_lo.stopAndWait();
eth_lo.close();
eth_tcp_srv.stopAndWait();
}
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
if (!peers_map.contains(to)) return 0;
// piCout << "*** search quickest peer" << to;
PIVector<PeerInfo *> tp = addresses_map.value(to);
if (tp.isEmpty()) return 0;
return tp.back();
}
bool PIPeer::send(const PIString & to, const void * data, int size) {
PIByteArray ba(data, size);
// piCoutObj << "send" << ba.size_s() << "bytes" << _PIPEER_MSG_SIZE;
if (ba.size_s() <= _PIPEER_MSG_SIZE) {
ba.insert(0, uchar(1));
return sendInternal(to, ba);
} else {
// ba.insert(0, uchar(2));
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = const_cast<PeerInfo *>(getPeerByName(to));
if (!dp) return false;
return dp->_data->send(ba);
}
return true;
}
bool PIPeer::send(const PeerInfo * to, const PIByteArray & data) {
if (!to) return false;
return send(to->name, data.data(), data.size_s());
}
bool PIPeer::send(const PeerInfo * to, const PIString & data) {
if (!to) return false;
return send(to->name, data.data(), data.size_s());
}
bool PIPeer::send(const PeerInfo * to, const void * data, int size) {
if (!to) return false;
return send(to->name, data, size);
}
void PIPeer::sendToAll(const PIByteArray & data) {
for (const auto & i: peers)
send(i.name, data.data(), data.size_s());
}
void PIPeer::sendToAll(const PIString & data) {
for (const auto & i: peers)
send(i.name, data.data(), data.size_s());
}
void PIPeer::sendToAll(const void * data, int size) {
for (const auto & i: peers)
send(i.name, data, size);
}
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
// piCoutObj << "Can`t find peer \"" << to << "\"!";
return false;
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << data;
// piCoutObj << "sendInternal to" << to << data.size_s() << int(data.front());
if (!sendToNeighbour(dp, ba)) {
// piCoutObj << "send error";
return false;
}
return true;
}
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
PIByteArray ba = data;
dataReceived(from, data);
dataReceivedEvent(from, data);
if (trust_peer.isEmpty() || trust_peer == from) {
read_buffer_mutex.lock();
if (read_buffer.size_s() < read_buffer_size) read_buffer.enqueue(ba);
read_buffer_mutex.unlock();
}
}
bool PIPeer::dataRead(const uchar * readed, ssize_t size) {
if (destroyed) {
// piCout << "[PIPeer] SegFault";
return true;
}
if (size < 16) return true;
PIByteArray ba(readed, size), sba, pba;
int type, cnt;
PIString from, to;
ba >> type;
eth_mutex.lock();
// piCout << "dataRead lock";
if (type == 5) { // ping request
PINetworkAddress addr;
PISystemTime time;
ba >> to >> from >> addr >> time;
// piCout << "ping request" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (from == self_info.name) { // send ping back
const PeerInfo * pi = getPeerByName(to);
if (pi) {
if (pi->isNeighbour()) {
sba << int(6) << to << from << addr << time;
// piCout << " ping from" << from << addr << ", send back to" << pi->name;
send_mutex.lock();
for (const auto & a: pi->addresses) {
if (eth_send.send(a.address, sba)) diag_s.received(sba.size_s());
}
send_mutex.unlock();
}
}
}
eth_mutex.unlock();
return true;
}
if (type == 6) { // ping request
PINetworkAddress addr;
PISystemTime time, ptime, ctime = PISystemTime::current(true);
ba >> to >> from >> addr >> time;
// piCout << "ping reply" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (to == self_info.name) { // ping echo
for (auto & p: peers) {
if (!p.isNeighbour()) continue;
if (p.name != from) continue;
for (auto & a: p.addresses) {
if (a.address != addr) continue;
if (a.last_ping >= time) break;
ptime = ctime - time;
a.last_ping = time;
a.wait_ping = false;
if (a.ping < 0)
a.ping = ptime.toMilliseconds();
else
a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
// piCout << " ping echo" << p.name << a.address << a.ping;
eth_mutex.unlock();
return true;
}
}
}
eth_mutex.unlock();
return true;
}
// piCoutObj << "received data from" << from << "packet" << type;
if (type != 4) {
eth_mutex.unlock();
return true;
}
diag_d.received(size);
ba >> from >> to >> cnt >> pba;
// piCoutObj << "Received packet" << type << from << to << pba.size_s();
if (type == 4) { // data packet
if (to == self_info.name) { // my packet
uchar pt = pba.take_front();
// piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s();
peers_mutex.lock();
PeerInfo * fp = const_cast<PeerInfo *>(getPeerByName(from));
if (fp == 0) {
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
if (pt == 1) {
peers_mutex.unlock();
eth_mutex.unlock();
dtReceived(from, pba);
return true;
}
if (pt == 2 || pt == 3) {
peers_mutex.unlock();
eth_mutex.unlock();
if (fp->_data) fp->_data->receivedPacket(pt, pba);
return true;
}
peers_mutex.unlock();
eth_mutex.unlock();
return true;
}
PIMutexLocker plocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
// piCoutObj << "Can`t find peer \"" << to << "\"!";
eth_mutex.unlock();
return true;
}
cnt++;
if (cnt > _PIPEER_MSG_TTL || from == dp->name) return true;
sba << type << from << to << cnt << pba;
// piCout << "translate packet" << from << "->" << to << ", ttl =" << cnt;
sendToNeighbour(dp, sba);
}
eth_mutex.unlock();
return true;
}
bool PIPeer::mbcastRead(const uchar * data, ssize_t size) {
if (destroyed) {
// piCout << "[PIPeer] SegFault";
return true;
}
if (size < 8) return true;
int type, dist;
PIByteArray ba(data, size);
ba >> type;
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
ba >> pi.name;
// piCout << "received mb from" << pi.name << "packet" << type;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
const PeerInfo * rpi = 0;
bool ch = false;
PIVector<PeerInfo> rpeers;
// piCout << "analyz ...";
switch (type) {
case 1: // new peer
// piCout << "new peer packet ...";
peers_mutex.lock();
if (!hasPeer(pi.name)) {
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
}
pi.resetPing();
addPeer(pi);
buildMap();
// piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
// piCoutObj << mode() << opened_;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
ch = true;
// piCout << "new peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerConnected(pi.name);
peerConnectedEvent(pi.name);
}
break;
case 2: // remove peer
// piCout << "remove peer packet ..." << pi.name;
peers_mutex.lock();
removeNeighbour(pi.name);
rpi = getPeerByName(pi.name);
if (rpi) {
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
// piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0) self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
buildMap();
ch = true;
// piCout << "remove peer packet ok";
}
peers_mutex.unlock();
if (ch) {
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
}
break;
case 3: // sync peers
// piCout << "sync packet ...";
ba >> pi >> rpeers;
rpeers << pi;
// piCoutObj << "rec sync " << rpeers.size_s() << " peers";
peers_mutex.lock();
if (!self_info.neighbours.contains(pi.name)) {
// piCoutObj << "add new nei to me" << pi.name;
self_info.addNeighbour(pi.name);
PeerInfo * np = peers_map.value(pi.name);
if (np) {
np->addNeighbour(self_info.name);
np->dist = 0;
}
ch = true;
}
for (auto & rpeer: rpeers) {
// piCout << " to sync " << rpeer.name;
if (rpeer.name == self_info.name) continue;
bool exist = false;
for (auto & peer: peers) {
if (peer.name == rpeer.name) {
exist = true;
if (isPeerRecent(peer, rpeer)) {
// piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::PeerAddress & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::PeerAddress & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
break;
}
}
}
peer.was_update = true;
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
ch = true;
}
break;
}
}
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
if (rpeer.name == pi.name) rpeer.dist = 0;
rpeer.resetPing();
addPeer(rpeer);
ch = true;
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
}
// piCout << "***";
// piCout << self_info.name << self_info.neighbours;
for (auto & i: peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
i.addNeighbour(self_info.name);
}
// piCout << i.name << i.neighbours;
}
if (ch) buildMap();
peers_mutex.unlock();
// piCoutObj << "after sync " << peers.size_s() << " peers";
break;
}
return true;
}
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
PINetworkAddress addr = peer->fastestAddress();
// piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ...";
send_mutex.lock();
bool ok = eth_send.send(addr, ba);
// piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
if (ok) diag_d.sended(ba.size_s());
send_mutex.unlock();
return ok;
}
void PIPeer::sendMBcast(const PIByteArray & ba) {
send_mc_mutex.lock();
// piCout << "sendMBcast" << ba.size() << "bytes ...";
for (auto * e: eths_mcast) {
if (e->isOpened())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
for (auto * e: eths_bcast) {
if (e->isOpened())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setSendPort(p);
if (eth_lo.send(ba)) diag_s.sended(ba.size_s());
}
PIVector<PIEthernet *> cl = eth_tcp_srv.clients();
for (auto * e: cl) {
if (e->isOpened() && e->isConnected())
if (e->send(ba)) diag_s.sended(ba.size_s());
}
if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
if (eth_tcp_cli.send(ba)) diag_s.sended(ba.size_s());
}
// piCout << "sendMBcast ok";
send_mc_mutex.unlock();
}
void PIPeer::removeNeighbour(const PIString & name) {
for (auto & p: peers)
p.neighbours.removeOne(name);
self_info.removeNeighbour(name);
}
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
peers << pd;
PeerInfo & p(peers.back());
p.init();
CONNECT2(void, const PIString &, const PIByteArray &, p._data, sendRequest, this, sendInternal)
CONNECT2(void, const PIString &, const PIByteArray &, p._data, received, this, dtReceived)
}
bool PIPeer::removePeer(const PIString & name) {
for (int i = 0; i < peers.size_s(); ++i)
if (peers[i].name == name) {
peers[i].destroy();
peers.remove(i);
return true;
}
return false;
}
void PIPeer::sendPeerInfo(const PeerInfo & info) {
PIByteArray ba;
ba << int(1) << info.name << info;
sendMBcast(ba);
}
void PIPeer::sendPeerRemove(const PIString & peer) {
// piCout << name() << "sendPeerRemove" << peer;
PIByteArray ba;
ba << int(2) << peer;
sendMBcast(ba);
}
void PIPeer::pingNeighbours() {
PIMutexLocker ml(peers_mutex);
PIByteArray ba, sba;
ba << int(5) << self_info.name;
// piCoutObj << "*** pingNeighbours" << peers.size() << "...";
for (auto & p: peers) {
if (!p.isNeighbour()) continue;
// piCout << " ping neighbour" << p.name << p.ping();
send_mutex.lock();
for (auto & a: p.addresses) {
// piCout << " address" << a.address << a.wait_ping;
if (a.wait_ping) {
if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= _PIPEER_PING_TIMEOUT) continue;
a.ping = -1.;
}
a.wait_ping = true;
sba = ba;
sba << p.name << a.address << PISystemTime::current(true);
// piCout << "ping" << p.name << a.address << a.last_ping;
if (eth_send.send(a.address, sba)) diag_s.sended(sba.size_s());
}
send_mutex.unlock();
}
// piCout << "*** pingNeighbours" << peers.size() << "done";
}
bool PIPeer::openDevice() {
PIConfig conf(
#ifndef WINDOWS
"/etc/pip.conf"
#else
"pip.conf"
#endif
,
PIIODevice::ReadOnly);
server_ip = conf.getValue("peer_server_ip", "").toString();
reinit();
diag_d.reset();
diag_s.reset();
// piCoutObj << "open...";
return true;
}
bool PIPeer::closeDevice() {
return false;
}
void PIPeer::syncPeers() {
// piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex);
// piCout << "syncPeers lock";
PIString pn;
bool change = false;
PIStringList dpeers;
peers_mutex.lock();
for (int i = 0; i < peers.size_s(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3) {
pn = cp.name;
// piCout << "sync: remove " << pn;
cp.destroy();
addToRemoved(cp);
peers.remove(i);
sendPeerRemove(pn);
--i;
removeNeighbour(pn);
dpeers << pn;
change = true;
continue;
}
if (cp.was_update)
cp.sync = 0;
else
cp.sync++;
if (cp._data) cp._data->setDist(cp.dist + 1);
cp.was_update = false;
}
if (change) buildMap();
self_info.cnt++;
self_info.time = PISystemTime::current();
PIByteArray ba;
ba << int(3) << self_info.name << self_info << peers;
peers_mutex.unlock();
sendMBcast(ba);
for (const auto & p: dpeers) {
peerDisconnected(p);
peerDisconnectedEvent(p);
}
}
void PIPeer::checkNetwork() {
PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
if (prev_ifaces == ifaces) return;
prev_ifaces = ifaces;
reinit();
}
void PIPeer::reinit() {
no_timer = true;
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
// piCout << "reinit lock";
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
initNetwork();
sendSelfInfo();
no_timer = false;
if (!sync_timer.isRunning()) sync_timer.start(1_Hz);
}
void PIPeer::changeName(const PIString & new_name) {
PIString name_ = new_name;
if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(randomi() % 1000);
setName(name_);
self_info.name = name_;
diag_d.setName(name_ + "_data");
diag_s.setName(name_ + "_service");
}
void PIPeer::setTcpServerIP(const PIString & ip) {
server_ip = ip;
tcpClientReconnect();
}
ssize_t PIPeer::bytesAvailable() const {
ssize_t ret = 0;
read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) ret = read_buffer.back().size();
read_buffer_mutex.unlock();
return ret;
}
ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) {
iterrupted = false;
read_buffer_mutex.lock();
bool empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
while (empty) {
read_buffer_mutex.lock();
empty = read_buffer.isEmpty();
read_buffer_mutex.unlock();
if (iterrupted) {
return 0;
}
piMSleep(10);
}
read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) {
PIByteArray ba = read_buffer.dequeue();
read_buffer_mutex.unlock();
ssize_t sz = piMini(ba.size_s(), max_size);
memcpy(read_to, ba.data(), sz);
if (iterrupted) {
return 0;
}
return sz;
}
read_buffer_mutex.unlock();
return 0;
}
ssize_t PIPeer::writeDevice(const void * data, ssize_t size) {
if (trust_peer.isEmpty()) {
sendToAll(data, size);
return size;
}
if (send(trust_peer, data, size))
return size;
else
return -1;
}
void PIPeer::interrupt() {
iterrupted = true;
}
void PIPeer::newTcpClient(PIEthernet * client) {
client->setName("_S.PIPeer.TCP_ServerClient" + client->path());
piCoutObj << "client" << client->path();
CONNECT2(void, const uchar *, ssize_t, client, threadedReadEvent, this, mbcastRead);
client->startThreadedRead();
}
PIString PIPeer::constructFullPathDevice() const {
PIString ret;
ret += self_info.name + ":" + trustPeerName();
return ret;
}
void PIPeer::configureFromFullPathDevice(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
PIString p(pl[i]);
switch (i) {
case 0: changeName(p); break;
case 1: setTrustPeerName(p); break;
}
}
}
PIPropertyStorage PIPeer::constructVariantDevice() const {
PIPropertyStorage ret;
ret.addProperty("name", self_info.name);
ret.addProperty("trust peer", trustPeerName());
return ret;
}
void PIPeer::configureFromVariantDevice(const PIPropertyStorage & d) {
changeName(d.propertyValueByName("name").toString());
setTrustPeerName(d.propertyValueByName("trust peer").toString());
}
void PIPeer::initNetwork() {
// piCoutObj << "initNetwork ...";
eth_send.init();
destroyEths();
destroyMBcasts();
piMSleep(100);
// piCoutObj << self_info.addresses.size();
self_info.addresses.clear();
PIVector<PINetworkAddress> al = PIEthernet::allAddresses();
PIStringList sl;
for (const PINetworkAddress & a: al) {
sl << a.ipString();
}
initEths(sl);
// piCoutObj << sl << self_info.addresses.size();
sl.removeAll("127.0.0.1");
initMBcasts(sl);
diag_d.start();
diag_s.start();
// piCoutObj << "initNetwork done";
}
void PIPeer::buildMap() {
// piCout << "[PIPeer \"" + name_ + "\"] buildMap";
peers_map.clear();
addresses_map.clear();
for (auto & i: peers) {
i.trace = -1;
peers_map[i.name] = &i;
}
PIVector<PeerInfo *> cwave, nwave;
int cwi = 0;
self_info.trace = 0;
cwave << &self_info;
while (!cwave.isEmpty()) {
nwave.clear();
++cwi;
for (const auto * p: cwave) {
for (const auto & nn: p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace >= 0) continue;
np->trace = cwi;
nwave << np;
}
}
cwave = nwave;
}
PIVector<PeerInfo *> cpath;
for (auto & c: peers) {
cpath.clear();
cpath << &c;
cwave << &c;
for (int w = c.trace - 1; w >= 0; --w) {
nwave.clear();
for (const auto * p: cwave) {
for (const auto & nn: p->neighbours) {
PeerInfo * np = peers_map.value(nn);
if (!np) continue;
if (np->trace != w) continue;
cpath << np;
nwave << np;
}
}
cwave = nwave;
}
addresses_map[c.name] = cpath;
// piCout << "map" << c.name << "=" << cpath;
}
}
void PIPeer::tcpClientReconnect() {
eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
}
bool PIPeer::hasPeer(const PIString & name) {
for (const auto & i: peers)
if (i.name == name) return true;
return false;
}