1176 lines
30 KiB
C++
1176 lines
30 KiB
C++
/*
|
|
PIP - Platform Independent Primitives
|
|
Peer - named I/O ethernet node
|
|
Ivan Pelipenko peri4ko@yandex.ru
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Lesser General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "pipeer.h"
|
|
|
|
#include "piconfig.h"
|
|
#include "pidatatransfer.h"
|
|
#include "piliterals_time.h"
|
|
#include "pipropertystorage.h"
|
|
#include "pitime.h"
|
|
|
|
#define _PIPEER_MSG_SIZE 4000
|
|
#define _PIPEER_MSG_TTL 100
|
|
#define _PIPEER_MULTICAST_TTL 4
|
|
#define _PIPEER_MULTICAST_IP "232.13.3.12"
|
|
#define _PIPEER_LOOPBACK_PORT_S 13313
|
|
#define _PIPEER_LOOPBACK_PORT_E (13313 + 32)
|
|
#define _PIPEER_MULTICAST_PORT 13360
|
|
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
|
|
#define _PIPEER_BROADCAST_PORT 13361
|
|
#define _PIPEER_TRAFFIC_PORT_S 13400
|
|
#define _PIPEER_TRAFFIC_PORT_E 14000
|
|
#define _PIPEER_PING_TIMEOUT 5.0
|
|
|
|
class PIPeer::PeerData: public PIObject {
|
|
PIOBJECT_SUBCLASS(PeerData, PIObject);
|
|
|
|
public:
|
|
PeerData(const PIString & n);
|
|
~PeerData();
|
|
EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {
|
|
data.push_front(uchar(2));
|
|
sendRequest(name(), data);
|
|
}
|
|
EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {
|
|
data.push_front(uchar(3));
|
|
sendRequest(name(), data);
|
|
}
|
|
EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {
|
|
if (ok) received(name(), dt_in.data());
|
|
}
|
|
EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {
|
|
if (ok) received(name(), dt_out.data());
|
|
}
|
|
EVENT_HANDLER(void, dtThread);
|
|
EVENT2(received, const PIString &, from, const PIByteArray &, data);
|
|
EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data);
|
|
bool send(const PIByteArray & d);
|
|
void receivedPacket(uchar type, const PIByteArray & d);
|
|
void setDist(int dist);
|
|
PIByteArray data;
|
|
PIThread t;
|
|
PIDataTransfer dt_in, dt_out;
|
|
};
|
|
|
|
|
|
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
|
|
dt_in.setPacketSize(_PIPEER_MSG_SIZE);
|
|
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
|
|
dt_in.setCRCEnabled(false);
|
|
dt_out.setCRCEnabled(false);
|
|
CONNECT1(void, PIByteArray &, &dt_in, sendRequest, this, dtSendRequestIn);
|
|
CONNECT1(void, PIByteArray &, &dt_out, sendRequest, this, dtSendRequestOut);
|
|
CONNECT1(void, bool, &dt_in, receiveFinished, this, dtReceiveFinishedIn);
|
|
CONNECT1(void, bool, &dt_out, receiveFinished, this, dtReceiveFinishedOut);
|
|
CONNECT0(void, &t, started, this, dtThread);
|
|
}
|
|
|
|
|
|
PIPeer::PeerData::~PeerData() {
|
|
dt_in.stop();
|
|
dt_out.stop();
|
|
t.stop();
|
|
if (!t.waitForFinish(1_s)) t.terminate();
|
|
}
|
|
|
|
|
|
void PIPeer::PeerData::dtThread() {
|
|
dt_out.send(data);
|
|
// piCoutObj << "send DT done";
|
|
}
|
|
|
|
|
|
bool PIPeer::PeerData::send(const PIByteArray & d) {
|
|
// piCout << "send ..." << t.isRunning();
|
|
if (t.isRunning()) return false;
|
|
data = d;
|
|
t.startOnce();
|
|
return true;
|
|
}
|
|
|
|
|
|
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
|
|
PIDataTransfer * dt = 0;
|
|
if (type == 3) dt = &dt_in;
|
|
if (type == 2) dt = &dt_out;
|
|
// piCoutObj << "DT received" << int(type) << d.size_s() << "...";
|
|
if (dt) dt->received(d);
|
|
// piCoutObj << "DT received" << int(type) << d.size_s() << "done";
|
|
}
|
|
|
|
|
|
void PIPeer::PeerData::setDist(int dist) {
|
|
dt_in.setTimeout(10 * dist);
|
|
}
|
|
|
|
|
|
PIPeer::PeerInfo::PeerAddress::PeerAddress(const PINetworkAddress & a, const PINetworkAddress & m): address(a), netmask(m) {
|
|
ping = -1.;
|
|
wait_ping = false;
|
|
last_ping = PISystemTime::current(true);
|
|
}
|
|
|
|
|
|
int PIPeer::PeerInfo::ping() const {
|
|
int ret = -1;
|
|
for (const auto & a: addresses)
|
|
if (a.ping > 0.) {
|
|
if (ret < 0)
|
|
ret = piRoundd(a.ping);
|
|
else
|
|
ret = piMini(ret, piRoundd(a.ping));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
void PIPeer::PeerInfo::init() {
|
|
if (_data == 0) _data = new PeerData(name);
|
|
}
|
|
|
|
|
|
void PIPeer::PeerInfo::destroy() {
|
|
if (_data) delete _data;
|
|
_data = 0;
|
|
}
|
|
|
|
|
|
PINetworkAddress PIPeer::PeerInfo::fastestAddress() const {
|
|
double mp = -1.;
|
|
PINetworkAddress ret;
|
|
for (const auto & a: addresses) {
|
|
if (a.ping <= 0.) continue;
|
|
if ((mp < 0) || (mp > a.ping)) {
|
|
mp = a.ping;
|
|
ret = a.address;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
void PIPeer::PeerInfo::addNeighbour(const PIString & n) {
|
|
if (!neighbours.contains(n)) neighbours << n;
|
|
}
|
|
|
|
|
|
void PIPeer::PeerInfo::addNeighbours(const PIStringList & l) {
|
|
for (const auto & n: l)
|
|
if (!neighbours.contains(n)) neighbours << n;
|
|
}
|
|
|
|
void PIPeer::PeerInfo::removeNeighbour(const PIString & n) {
|
|
neighbours.removeAll(n);
|
|
}
|
|
|
|
|
|
void PIPeer::PeerInfo::resetPing() {
|
|
for (auto & a: addresses)
|
|
a.ping = -1;
|
|
}
|
|
|
|
|
|
REGISTER_DEVICE(PIPeer)
|
|
|
|
PIPeer::PIPeer(const PIString & n)
|
|
: PIIODevice()
|
|
, inited__(false)
|
|
, eth_tcp_srv(PIEthernet::TCP_Server)
|
|
, eth_tcp_cli(PIEthernet::TCP_Client)
|
|
, diag_s(false)
|
|
, diag_d(false) {
|
|
sync_timer.setName("_S.PIPeer.sync");
|
|
// piCout << " PIPeer" << uint(this);
|
|
destroyed = false;
|
|
setDebug(false);
|
|
PIMutexLocker mbl(mc_mutex);
|
|
PIMutexLocker ethl(eth_mutex);
|
|
PIMutexLocker pl(peers_mutex);
|
|
PIMutexLocker sl(send_mutex);
|
|
changeName(n);
|
|
setReopenTimeout(100_ms);
|
|
read_buffer_size = 128;
|
|
self_info.dist = 0;
|
|
self_info.time = PISystemTime::current();
|
|
randomize();
|
|
CONNECT1(void, int, &sync_timer, tickEvent, this, timerEvent);
|
|
prev_ifaces = PIEthernet::interfaces();
|
|
no_timer = false;
|
|
sync_timer.addDelimiter(5);
|
|
}
|
|
|
|
|
|
PIPeer::~PIPeer() {
|
|
// piCout << "~PIPeer" << uint(this);
|
|
stop();
|
|
if (destroyed) return;
|
|
destroyed = true;
|
|
destroy();
|
|
}
|
|
|
|
|
|
void PIPeer::timerEvent(int delim) {
|
|
// piCoutObj << "timerEvent" << delim;
|
|
if (no_timer) return;
|
|
switch (delim) {
|
|
case 1: // every 1 s
|
|
syncPeers();
|
|
piMSleep(100);
|
|
pingNeighbours();
|
|
// piCoutObj << "isOpened" << isOpened();
|
|
break;
|
|
case 5: // every 5 s
|
|
checkNetwork();
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
void PIPeer::initEths(PIStringList al) {
|
|
// piCoutObj << "initEths start";
|
|
PIEthernet * ce;
|
|
const PIEthernet::Interface * cint = 0;
|
|
for (const auto & a: al) {
|
|
ce = new PIEthernet();
|
|
ce->setDebug(false);
|
|
ce->setName("_S.PIPeer.traf_rec_" + a);
|
|
ce->setParameters(0);
|
|
bool ok = false;
|
|
for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
|
|
ce->setReadAddress(a, p);
|
|
if (ce->open()) {
|
|
eths_traffic << ce;
|
|
cint = prev_ifaces.getByAddress(a);
|
|
self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
|
|
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, dataRead);
|
|
ce->startThreadedRead();
|
|
// piCoutObj << "dc binded to" << ce->path();
|
|
// piCoutObj << "add eth" << a;
|
|
ok = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!ok) delete ce;
|
|
}
|
|
eth_send.setDebug(false);
|
|
eth_send.setName("_S.PIPeer.traf_send");
|
|
eth_send.setParameters(0);
|
|
// piCoutObj << "initEths ok";
|
|
}
|
|
|
|
|
|
void PIPeer::initMBcasts(PIStringList al) {
|
|
PIEthernet * ce;
|
|
const PIEthernet::Interface * cint;
|
|
PIString nm;
|
|
al << _PIPEER_MULTICAST_IP;
|
|
// piCoutObj << "initMBcasts start" << al;
|
|
for (const auto & a: al) {
|
|
// piCout << "mcast try" << a;
|
|
ce = new PIEthernet();
|
|
ce->setDebug(false);
|
|
ce->setName("_S.PIPeer.mcast_" + a);
|
|
ce->setParameters(0);
|
|
ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
|
|
ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
|
|
ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
|
|
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
|
|
if (ce->open()) {
|
|
eths_mcast << ce;
|
|
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
|
|
ce->startThreadedRead();
|
|
// piCout << "mcast bind to" << a << ce->sendIP();
|
|
} else {
|
|
delete ce;
|
|
// piCoutObj << "invalid address for mcast" << a;
|
|
}
|
|
}
|
|
al.removeAll(_PIPEER_MULTICAST_IP);
|
|
for (const auto & a: al) {
|
|
ce = new PIEthernet();
|
|
ce->setDebug(false);
|
|
ce->setName("_S.PIPeer.bcast_" + a);
|
|
ce->setParameters(PIEthernet::Broadcast);
|
|
cint = prev_ifaces.getByAddress(a);
|
|
nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
|
|
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
|
|
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
|
|
if (ce->open()) {
|
|
eths_bcast << ce;
|
|
CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
|
|
ce->startThreadedRead();
|
|
// piCout << "mc BC try" << a << nm << ce->sendIP();
|
|
// piCout << "bcast bind to" << a << nm;
|
|
} else {
|
|
delete ce;
|
|
// piCoutObj << "invalid address for bcast" << a;
|
|
}
|
|
}
|
|
eth_lo.setName("_S.PIPeer.lo");
|
|
eth_lo.setParameters(PIEthernet::SeparateSockets);
|
|
eth_lo.init();
|
|
cint = prev_ifaces.getByAddress("127.0.0.1");
|
|
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
|
|
eth_lo.setReadAddress("127.0.0.1", p);
|
|
if (eth_lo.open()) {
|
|
eth_lo.setSendIP("127.0.0.1");
|
|
CONNECT2(void, const uchar *, ssize_t, ð_lo, threadedReadEvent, this, mbcastRead);
|
|
eth_lo.startThreadedRead();
|
|
// piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
|
|
// piCout << "add eth" << ta;
|
|
break;
|
|
}
|
|
}
|
|
eth_tcp_srv.setName("_S.PIPeer.TCP_Server");
|
|
eth_tcp_srv.init();
|
|
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
|
|
eth_tcp_srv.setDebug(false);
|
|
CONNECT1(void, PIEthernet *, ð_tcp_srv, newConnection, this, newTcpClient);
|
|
eth_tcp_srv.startThreadedRead();
|
|
eth_tcp_cli.setName("_S.PIPeer.TCP_Client");
|
|
eth_tcp_cli.init();
|
|
eth_tcp_cli.setDebug(false);
|
|
tcpClientReconnect();
|
|
CONNECT2(void, const uchar *, ssize_t, ð_tcp_cli, threadedReadEvent, this, mbcastRead);
|
|
CONNECTU(ð_tcp_cli, disconnected, this, tcpClientReconnect);
|
|
eth_tcp_cli.startThreadedRead();
|
|
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened())
|
|
piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with "
|
|
"multicasting enabled!";
|
|
// piCoutObj << "initMBcasts ok";
|
|
}
|
|
|
|
|
|
void PIPeer::destroy() {
|
|
sync_timer.stopAndWait();
|
|
diag_s.stopAndWait();
|
|
diag_d.stopAndWait();
|
|
PIMutexLocker ml(peers_mutex);
|
|
for (auto & p: peers)
|
|
if (p._data) {
|
|
p._data->dt_in.stop();
|
|
p._data->dt_out.stop();
|
|
p._data->t.stopAndWait();
|
|
}
|
|
destroyEths();
|
|
for (auto * i: eths_mcast) {
|
|
if (!i) continue;
|
|
i->stopAndWait();
|
|
}
|
|
for (auto * i: eths_bcast) {
|
|
if (!i) continue;
|
|
i->stopAndWait();
|
|
}
|
|
eth_lo.stopAndWait();
|
|
eth_tcp_srv.stopAndWait();
|
|
eth_tcp_cli.stopAndWait();
|
|
sendSelfRemove();
|
|
eth_lo.close();
|
|
eth_tcp_srv.close();
|
|
eth_tcp_cli.close();
|
|
destroyMBcasts();
|
|
eth_send.close();
|
|
for (auto & p: peers)
|
|
p.destroy();
|
|
peers.clear();
|
|
destroyed = true;
|
|
}
|
|
|
|
|
|
void PIPeer::destroyEths() {
|
|
for (auto * i: eths_traffic) {
|
|
if (!i) continue;
|
|
i->stopAndWait();
|
|
i->close();
|
|
}
|
|
piDeleteAllAndClear(eths_traffic);
|
|
}
|
|
|
|
|
|
void PIPeer::destroyMBcasts() {
|
|
for (auto * i: eths_mcast) {
|
|
if (!i) continue;
|
|
i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
|
|
i->stopAndWait();
|
|
i->close();
|
|
}
|
|
for (auto * i: eths_bcast) {
|
|
if (!i) continue;
|
|
i->stopAndWait();
|
|
i->close();
|
|
}
|
|
piDeleteAllAndClear(eths_mcast);
|
|
piDeleteAllAndClear(eths_bcast);
|
|
eth_lo.stopAndWait();
|
|
eth_lo.close();
|
|
eth_tcp_srv.stopAndWait();
|
|
}
|
|
|
|
|
|
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
|
|
if (!peers_map.contains(to)) return 0;
|
|
// piCout << "*** search quickest peer" << to;
|
|
PIVector<PeerInfo *> tp = addresses_map.value(to);
|
|
if (tp.isEmpty()) return 0;
|
|
return tp.back();
|
|
}
|
|
|
|
|
|
bool PIPeer::send(const PIString & to, const void * data, int size) {
|
|
PIByteArray ba(data, size);
|
|
// piCoutObj << "send" << ba.size_s() << "bytes" << _PIPEER_MSG_SIZE;
|
|
if (ba.size_s() <= _PIPEER_MSG_SIZE) {
|
|
ba.insert(0, uchar(1));
|
|
return sendInternal(to, ba);
|
|
} else {
|
|
// ba.insert(0, uchar(2));
|
|
PIMutexLocker mlocker(peers_mutex);
|
|
PeerInfo * dp = const_cast<PeerInfo *>(getPeerByName(to));
|
|
if (!dp) return false;
|
|
return dp->_data->send(ba);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
bool PIPeer::send(const PeerInfo * to, const PIByteArray & data) {
|
|
if (!to) return false;
|
|
return send(to->name, data.data(), data.size_s());
|
|
}
|
|
|
|
|
|
bool PIPeer::send(const PeerInfo * to, const PIString & data) {
|
|
if (!to) return false;
|
|
return send(to->name, data.data(), data.size_s());
|
|
}
|
|
|
|
|
|
bool PIPeer::send(const PeerInfo * to, const void * data, int size) {
|
|
if (!to) return false;
|
|
return send(to->name, data, size);
|
|
}
|
|
|
|
|
|
void PIPeer::sendToAll(const PIByteArray & data) {
|
|
for (const auto & i: peers)
|
|
send(i.name, data.data(), data.size_s());
|
|
}
|
|
|
|
|
|
void PIPeer::sendToAll(const PIString & data) {
|
|
for (const auto & i: peers)
|
|
send(i.name, data.data(), data.size_s());
|
|
}
|
|
|
|
|
|
void PIPeer::sendToAll(const void * data, int size) {
|
|
for (const auto & i: peers)
|
|
send(i.name, data, size);
|
|
}
|
|
|
|
|
|
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
|
|
PIMutexLocker mlocker(peers_mutex);
|
|
PeerInfo * dp = quickestPeer(to);
|
|
if (dp == 0) {
|
|
// piCoutObj << "Can`t find peer \"" << to << "\"!";
|
|
return false;
|
|
}
|
|
PIByteArray ba;
|
|
ba << int(4) << self_info.name << to << int(0) << data;
|
|
// piCoutObj << "sendInternal to" << to << data.size_s() << int(data.front());
|
|
if (!sendToNeighbour(dp, ba)) {
|
|
// piCoutObj << "send error";
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
|
|
PIByteArray ba = data;
|
|
dataReceived(from, data);
|
|
dataReceivedEvent(from, data);
|
|
if (trust_peer.isEmpty() || trust_peer == from) {
|
|
read_buffer_mutex.lock();
|
|
if (read_buffer.size_s() < read_buffer_size) read_buffer.enqueue(ba);
|
|
read_buffer_mutex.unlock();
|
|
}
|
|
}
|
|
|
|
|
|
bool PIPeer::dataRead(const uchar * readed, ssize_t size) {
|
|
if (destroyed) {
|
|
// piCout << "[PIPeer] SegFault";
|
|
return true;
|
|
}
|
|
if (size < 16) return true;
|
|
PIByteArray ba(readed, size), sba, pba;
|
|
int type, cnt;
|
|
PIString from, to;
|
|
ba >> type;
|
|
eth_mutex.lock();
|
|
// piCout << "dataRead lock";
|
|
if (type == 5) { // ping request
|
|
PINetworkAddress addr;
|
|
PISystemTime time;
|
|
ba >> to >> from >> addr >> time;
|
|
// piCout << "ping request" << to << from << addr;
|
|
PIMutexLocker plocker(peers_mutex);
|
|
if (from == self_info.name) { // send ping back
|
|
const PeerInfo * pi = getPeerByName(to);
|
|
if (pi) {
|
|
if (pi->isNeighbour()) {
|
|
sba << int(6) << to << from << addr << time;
|
|
// piCout << " ping from" << from << addr << ", send back to" << pi->name;
|
|
send_mutex.lock();
|
|
for (const auto & a: pi->addresses) {
|
|
if (eth_send.send(a.address, sba)) diag_s.received(sba.size_s());
|
|
}
|
|
send_mutex.unlock();
|
|
}
|
|
}
|
|
}
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
if (type == 6) { // ping request
|
|
PINetworkAddress addr;
|
|
PISystemTime time, ptime, ctime = PISystemTime::current(true);
|
|
ba >> to >> from >> addr >> time;
|
|
// piCout << "ping reply" << to << from << addr;
|
|
PIMutexLocker plocker(peers_mutex);
|
|
if (to == self_info.name) { // ping echo
|
|
for (auto & p: peers) {
|
|
if (!p.isNeighbour()) continue;
|
|
if (p.name != from) continue;
|
|
for (auto & a: p.addresses) {
|
|
if (a.address != addr) continue;
|
|
if (a.last_ping >= time) break;
|
|
ptime = ctime - time;
|
|
a.last_ping = time;
|
|
a.wait_ping = false;
|
|
if (a.ping < 0)
|
|
a.ping = ptime.toMilliseconds();
|
|
else
|
|
a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
|
|
// piCout << " ping echo" << p.name << a.address << a.ping;
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
// piCoutObj << "received data from" << from << "packet" << type;
|
|
if (type != 4) {
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
diag_d.received(size);
|
|
ba >> from >> to >> cnt >> pba;
|
|
// piCoutObj << "Received packet" << type << from << to << pba.size_s();
|
|
if (type == 4) { // data packet
|
|
if (to == self_info.name) { // my packet
|
|
uchar pt = pba.take_front();
|
|
// piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s();
|
|
peers_mutex.lock();
|
|
PeerInfo * fp = const_cast<PeerInfo *>(getPeerByName(from));
|
|
if (fp == 0) {
|
|
peers_mutex.unlock();
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
if (pt == 1) {
|
|
peers_mutex.unlock();
|
|
eth_mutex.unlock();
|
|
dtReceived(from, pba);
|
|
return true;
|
|
}
|
|
if (pt == 2 || pt == 3) {
|
|
peers_mutex.unlock();
|
|
eth_mutex.unlock();
|
|
if (fp->_data) fp->_data->receivedPacket(pt, pba);
|
|
return true;
|
|
}
|
|
peers_mutex.unlock();
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
PIMutexLocker plocker(peers_mutex);
|
|
PeerInfo * dp = quickestPeer(to);
|
|
if (dp == 0) {
|
|
// piCoutObj << "Can`t find peer \"" << to << "\"!";
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
cnt++;
|
|
if (cnt > _PIPEER_MSG_TTL || from == dp->name) return true;
|
|
sba << type << from << to << cnt << pba;
|
|
// piCout << "translate packet" << from << "->" << to << ", ttl =" << cnt;
|
|
sendToNeighbour(dp, sba);
|
|
}
|
|
eth_mutex.unlock();
|
|
return true;
|
|
}
|
|
|
|
|
|
bool PIPeer::mbcastRead(const uchar * data, ssize_t size) {
|
|
if (destroyed) {
|
|
// piCout << "[PIPeer] SegFault";
|
|
return true;
|
|
}
|
|
if (size < 8) return true;
|
|
int type, dist;
|
|
PIByteArray ba(data, size);
|
|
ba >> type;
|
|
if (type <= 0 || type >= 4) return true;
|
|
PeerInfo pi;
|
|
ba >> pi.name;
|
|
// piCout << "received mb from" << pi.name << "packet" << type;
|
|
if (pi.name == self_info.name) return true;
|
|
PIMutexLocker locker(mc_mutex);
|
|
diag_s.received(size);
|
|
const PeerInfo * rpi = 0;
|
|
bool ch = false;
|
|
PIVector<PeerInfo> rpeers;
|
|
// piCout << "analyz ...";
|
|
switch (type) {
|
|
case 1: // new peer
|
|
// piCout << "new peer packet ...";
|
|
peers_mutex.lock();
|
|
if (!hasPeer(pi.name)) {
|
|
ba >> pi;
|
|
pi.sync = 0;
|
|
if (pi.dist == 0) {
|
|
pi.addNeighbour(self_info.name);
|
|
self_info.addNeighbour(pi.name);
|
|
}
|
|
pi.resetPing();
|
|
addPeer(pi);
|
|
buildMap();
|
|
// piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
|
|
// piCoutObj << mode() << opened_;
|
|
pi.dist++;
|
|
sendSelfInfo();
|
|
sendPeerInfo(pi);
|
|
ch = true;
|
|
// piCout << "new peer packet ok";
|
|
}
|
|
peers_mutex.unlock();
|
|
if (ch) {
|
|
peerConnected(pi.name);
|
|
peerConnectedEvent(pi.name);
|
|
}
|
|
break;
|
|
case 2: // remove peer
|
|
// piCout << "remove peer packet ..." << pi.name;
|
|
peers_mutex.lock();
|
|
removeNeighbour(pi.name);
|
|
rpi = getPeerByName(pi.name);
|
|
if (rpi) {
|
|
dist = rpi->dist;
|
|
addToRemoved(*rpi);
|
|
removePeer(pi.name);
|
|
// piCoutObj << "remove peer \"" << pi.name << "\"";
|
|
if (dist == 0) self_info.removeNeighbour(pi.name);
|
|
sendPeerRemove(pi.name);
|
|
buildMap();
|
|
ch = true;
|
|
// piCout << "remove peer packet ok";
|
|
}
|
|
peers_mutex.unlock();
|
|
if (ch) {
|
|
peerDisconnected(pi.name);
|
|
peerDisconnectedEvent(pi.name);
|
|
}
|
|
break;
|
|
case 3: // sync peers
|
|
// piCout << "sync packet ...";
|
|
ba >> pi >> rpeers;
|
|
rpeers << pi;
|
|
// piCoutObj << "rec sync " << rpeers.size_s() << " peers";
|
|
peers_mutex.lock();
|
|
if (!self_info.neighbours.contains(pi.name)) {
|
|
// piCoutObj << "add new nei to me" << pi.name;
|
|
self_info.addNeighbour(pi.name);
|
|
PeerInfo * np = peers_map.value(pi.name);
|
|
if (np) {
|
|
np->addNeighbour(self_info.name);
|
|
np->dist = 0;
|
|
}
|
|
ch = true;
|
|
}
|
|
for (auto & rpeer: rpeers) {
|
|
// piCout << " to sync " << rpeer.name;
|
|
if (rpeer.name == self_info.name) continue;
|
|
bool exist = false;
|
|
for (auto & peer: peers) {
|
|
if (peer.name == rpeer.name) {
|
|
exist = true;
|
|
if (isPeerRecent(peer, rpeer)) {
|
|
// piCout << "synced " << peer.name;
|
|
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
|
|
PeerInfo::PeerAddress & ra(rpeer.addresses[z]);
|
|
for (int k = 0; k < peer.addresses.size_s(); ++k) {
|
|
PeerInfo::PeerAddress & a(peer.addresses[k]);
|
|
if (ra.address == a.address) {
|
|
ra.ping = a.ping;
|
|
ra.wait_ping = a.wait_ping;
|
|
ra.last_ping = a.last_ping;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
peer.was_update = true;
|
|
peer.addresses = rpeer.addresses;
|
|
peer.cnt = rpeer.cnt;
|
|
peer.time = rpeer.time;
|
|
peer.addNeighbours(rpeer.neighbours);
|
|
rpeer.neighbours = peer.neighbours;
|
|
if (peer.name == pi.name) peer.sync = 0;
|
|
ch = true;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
if (exist || isRemoved(rpeer)) continue;
|
|
rpeer.dist++;
|
|
if (rpeer.name == pi.name) rpeer.dist = 0;
|
|
rpeer.resetPing();
|
|
addPeer(rpeer);
|
|
ch = true;
|
|
peerConnected(rpeer.name);
|
|
peerConnectedEvent(rpeer.name);
|
|
}
|
|
// piCout << "***";
|
|
// piCout << self_info.name << self_info.neighbours;
|
|
for (auto & i: peers) {
|
|
if (i.dist == 0) {
|
|
self_info.addNeighbour(i.name);
|
|
i.addNeighbour(self_info.name);
|
|
}
|
|
// piCout << i.name << i.neighbours;
|
|
}
|
|
if (ch) buildMap();
|
|
peers_mutex.unlock();
|
|
// piCoutObj << "after sync " << peers.size_s() << " peers";
|
|
break;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
|
|
PINetworkAddress addr = peer->fastestAddress();
|
|
// piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ...";
|
|
send_mutex.lock();
|
|
bool ok = eth_send.send(addr, ba);
|
|
// piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
|
|
if (ok) diag_d.sended(ba.size_s());
|
|
send_mutex.unlock();
|
|
return ok;
|
|
}
|
|
|
|
|
|
void PIPeer::sendMBcast(const PIByteArray & ba) {
|
|
send_mc_mutex.lock();
|
|
// piCout << "sendMBcast" << ba.size() << "bytes ...";
|
|
for (auto * e: eths_mcast) {
|
|
if (e->isOpened())
|
|
if (e->send(ba)) diag_s.sended(ba.size_s());
|
|
}
|
|
for (auto * e: eths_bcast) {
|
|
if (e->isOpened())
|
|
if (e->send(ba)) diag_s.sended(ba.size_s());
|
|
}
|
|
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
|
|
eth_lo.setSendPort(p);
|
|
if (eth_lo.send(ba)) diag_s.sended(ba.size_s());
|
|
}
|
|
PIVector<PIEthernet *> cl = eth_tcp_srv.clients();
|
|
for (auto * e: cl) {
|
|
if (e->isOpened() && e->isConnected())
|
|
if (e->send(ba)) diag_s.sended(ba.size_s());
|
|
}
|
|
if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
|
|
if (eth_tcp_cli.send(ba)) diag_s.sended(ba.size_s());
|
|
}
|
|
// piCout << "sendMBcast ok";
|
|
send_mc_mutex.unlock();
|
|
}
|
|
|
|
|
|
void PIPeer::removeNeighbour(const PIString & name) {
|
|
for (auto & p: peers)
|
|
p.neighbours.removeOne(name);
|
|
self_info.removeNeighbour(name);
|
|
}
|
|
|
|
|
|
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
|
|
peers << pd;
|
|
PeerInfo & p(peers.back());
|
|
p.init();
|
|
CONNECT2(void, const PIString &, const PIByteArray &, p._data, sendRequest, this, sendInternal)
|
|
CONNECT2(void, const PIString &, const PIByteArray &, p._data, received, this, dtReceived)
|
|
}
|
|
|
|
|
|
bool PIPeer::removePeer(const PIString & name) {
|
|
for (int i = 0; i < peers.size_s(); ++i)
|
|
if (peers[i].name == name) {
|
|
peers[i].destroy();
|
|
peers.remove(i);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
|
|
void PIPeer::sendPeerInfo(const PeerInfo & info) {
|
|
PIByteArray ba;
|
|
ba << int(1) << info.name << info;
|
|
sendMBcast(ba);
|
|
}
|
|
|
|
|
|
void PIPeer::sendPeerRemove(const PIString & peer) {
|
|
// piCout << name() << "sendPeerRemove" << peer;
|
|
PIByteArray ba;
|
|
ba << int(2) << peer;
|
|
sendMBcast(ba);
|
|
}
|
|
|
|
|
|
void PIPeer::pingNeighbours() {
|
|
PIMutexLocker ml(peers_mutex);
|
|
PIByteArray ba, sba;
|
|
ba << int(5) << self_info.name;
|
|
// piCoutObj << "*** pingNeighbours" << peers.size() << "...";
|
|
for (auto & p: peers) {
|
|
if (!p.isNeighbour()) continue;
|
|
// piCout << " ping neighbour" << p.name << p.ping();
|
|
send_mutex.lock();
|
|
for (auto & a: p.addresses) {
|
|
// piCout << " address" << a.address << a.wait_ping;
|
|
if (a.wait_ping) {
|
|
if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= _PIPEER_PING_TIMEOUT) continue;
|
|
a.ping = -1.;
|
|
}
|
|
a.wait_ping = true;
|
|
sba = ba;
|
|
sba << p.name << a.address << PISystemTime::current(true);
|
|
// piCout << "ping" << p.name << a.address << a.last_ping;
|
|
if (eth_send.send(a.address, sba)) diag_s.sended(sba.size_s());
|
|
}
|
|
send_mutex.unlock();
|
|
}
|
|
// piCout << "*** pingNeighbours" << peers.size() << "done";
|
|
}
|
|
|
|
|
|
bool PIPeer::openDevice() {
|
|
PIConfig conf(
|
|
#ifndef WINDOWS
|
|
"/etc/pip.conf"
|
|
#else
|
|
"pip.conf"
|
|
#endif
|
|
,
|
|
PIIODevice::ReadOnly);
|
|
server_ip = conf.getValue("peer_server_ip", "").toString();
|
|
reinit();
|
|
diag_d.reset();
|
|
diag_s.reset();
|
|
// piCoutObj << "open...";
|
|
return true;
|
|
}
|
|
|
|
|
|
bool PIPeer::closeDevice() {
|
|
return false;
|
|
}
|
|
|
|
|
|
void PIPeer::syncPeers() {
|
|
// piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
|
|
PIMutexLocker locker(eth_mutex);
|
|
// piCout << "syncPeers lock";
|
|
PIString pn;
|
|
bool change = false;
|
|
PIStringList dpeers;
|
|
peers_mutex.lock();
|
|
for (int i = 0; i < peers.size_s(); ++i) {
|
|
PeerInfo & cp(peers[i]);
|
|
if (cp.sync > 3) {
|
|
pn = cp.name;
|
|
// piCout << "sync: remove " << pn;
|
|
cp.destroy();
|
|
addToRemoved(cp);
|
|
peers.remove(i);
|
|
sendPeerRemove(pn);
|
|
--i;
|
|
removeNeighbour(pn);
|
|
dpeers << pn;
|
|
change = true;
|
|
continue;
|
|
}
|
|
if (cp.was_update)
|
|
cp.sync = 0;
|
|
else
|
|
cp.sync++;
|
|
if (cp._data) cp._data->setDist(cp.dist + 1);
|
|
cp.was_update = false;
|
|
}
|
|
if (change) buildMap();
|
|
self_info.cnt++;
|
|
self_info.time = PISystemTime::current();
|
|
PIByteArray ba;
|
|
ba << int(3) << self_info.name << self_info << peers;
|
|
peers_mutex.unlock();
|
|
sendMBcast(ba);
|
|
for (const auto & p: dpeers) {
|
|
peerDisconnected(p);
|
|
peerDisconnectedEvent(p);
|
|
}
|
|
}
|
|
|
|
|
|
void PIPeer::checkNetwork() {
|
|
PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
|
|
if (prev_ifaces == ifaces) return;
|
|
prev_ifaces = ifaces;
|
|
reinit();
|
|
}
|
|
|
|
|
|
void PIPeer::reinit() {
|
|
no_timer = true;
|
|
PIMutexLocker mbl(mc_mutex);
|
|
PIMutexLocker ethl(eth_mutex);
|
|
// piCout << "reinit lock";
|
|
PIMutexLocker pl(peers_mutex);
|
|
PIMutexLocker sl(send_mutex);
|
|
initNetwork();
|
|
sendSelfInfo();
|
|
no_timer = false;
|
|
if (!sync_timer.isRunning()) sync_timer.start(1_Hz);
|
|
}
|
|
|
|
|
|
void PIPeer::changeName(const PIString & new_name) {
|
|
PIString name_ = new_name;
|
|
if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(randomi() % 1000);
|
|
setName(name_);
|
|
self_info.name = name_;
|
|
diag_d.setName(name_ + "_data");
|
|
diag_s.setName(name_ + "_service");
|
|
}
|
|
|
|
|
|
void PIPeer::setTcpServerIP(const PIString & ip) {
|
|
server_ip = ip;
|
|
tcpClientReconnect();
|
|
}
|
|
|
|
|
|
ssize_t PIPeer::bytesAvailable() const {
|
|
ssize_t ret = 0;
|
|
read_buffer_mutex.lock();
|
|
if (!read_buffer.isEmpty()) ret = read_buffer.back().size();
|
|
read_buffer_mutex.unlock();
|
|
return ret;
|
|
}
|
|
|
|
|
|
ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) {
|
|
iterrupted = false;
|
|
read_buffer_mutex.lock();
|
|
bool empty = read_buffer.isEmpty();
|
|
read_buffer_mutex.unlock();
|
|
while (empty) {
|
|
read_buffer_mutex.lock();
|
|
empty = read_buffer.isEmpty();
|
|
read_buffer_mutex.unlock();
|
|
if (iterrupted) {
|
|
return 0;
|
|
}
|
|
piMSleep(10);
|
|
}
|
|
read_buffer_mutex.lock();
|
|
if (!read_buffer.isEmpty()) {
|
|
PIByteArray ba = read_buffer.dequeue();
|
|
read_buffer_mutex.unlock();
|
|
ssize_t sz = piMini(ba.size_s(), max_size);
|
|
memcpy(read_to, ba.data(), sz);
|
|
if (iterrupted) {
|
|
return 0;
|
|
}
|
|
return sz;
|
|
}
|
|
read_buffer_mutex.unlock();
|
|
return 0;
|
|
}
|
|
|
|
|
|
ssize_t PIPeer::writeDevice(const void * data, ssize_t size) {
|
|
if (trust_peer.isEmpty()) {
|
|
sendToAll(data, size);
|
|
return size;
|
|
}
|
|
if (send(trust_peer, data, size))
|
|
return size;
|
|
else
|
|
return -1;
|
|
}
|
|
|
|
|
|
void PIPeer::interrupt() {
|
|
iterrupted = true;
|
|
}
|
|
|
|
|
|
void PIPeer::newTcpClient(PIEthernet * client) {
|
|
client->setName("_S.PIPeer.TCP_ServerClient" + client->path());
|
|
piCoutObj << "client" << client->path();
|
|
CONNECT2(void, const uchar *, ssize_t, client, threadedReadEvent, this, mbcastRead);
|
|
client->startThreadedRead();
|
|
}
|
|
|
|
|
|
PIString PIPeer::constructFullPathDevice() const {
|
|
PIString ret;
|
|
ret += self_info.name + ":" + trustPeerName();
|
|
return ret;
|
|
}
|
|
|
|
|
|
void PIPeer::configureFromFullPathDevice(const PIString & full_path) {
|
|
PIStringList pl = full_path.split(":");
|
|
for (int i = 0; i < pl.size_s(); ++i) {
|
|
PIString p(pl[i].trimmed());
|
|
switch (i) {
|
|
case 0: changeName(p); break;
|
|
case 1: setTrustPeerName(p); break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
PIPropertyStorage PIPeer::constructVariantDevice() const {
|
|
PIPropertyStorage ret;
|
|
ret.addProperty("name", self_info.name);
|
|
ret.addProperty("trust peer", trustPeerName());
|
|
return ret;
|
|
}
|
|
|
|
|
|
void PIPeer::configureFromVariantDevice(const PIPropertyStorage & d) {
|
|
changeName(d.propertyValueByName("name").toString());
|
|
setTrustPeerName(d.propertyValueByName("trust peer").toString());
|
|
}
|
|
|
|
|
|
void PIPeer::initNetwork() {
|
|
// piCoutObj << "initNetwork ...";
|
|
eth_send.init();
|
|
destroyEths();
|
|
destroyMBcasts();
|
|
piMSleep(100);
|
|
// piCoutObj << self_info.addresses.size();
|
|
self_info.addresses.clear();
|
|
PIVector<PINetworkAddress> al = PIEthernet::allAddresses();
|
|
PIStringList sl;
|
|
for (const PINetworkAddress & a: al) {
|
|
sl << a.ipString();
|
|
}
|
|
initEths(sl);
|
|
// piCoutObj << sl << self_info.addresses.size();
|
|
sl.removeAll("127.0.0.1");
|
|
initMBcasts(sl);
|
|
diag_d.start();
|
|
diag_s.start();
|
|
// piCoutObj << "initNetwork done";
|
|
}
|
|
|
|
|
|
void PIPeer::buildMap() {
|
|
// piCout << "[PIPeer \"" + name_ + "\"] buildMap";
|
|
peers_map.clear();
|
|
addresses_map.clear();
|
|
for (auto & i: peers) {
|
|
i.trace = -1;
|
|
peers_map[i.name] = &i;
|
|
}
|
|
PIVector<PeerInfo *> cwave, nwave;
|
|
int cwi = 0;
|
|
self_info.trace = 0;
|
|
cwave << &self_info;
|
|
while (!cwave.isEmpty()) {
|
|
nwave.clear();
|
|
++cwi;
|
|
for (const auto * p: cwave) {
|
|
for (const auto & nn: p->neighbours) {
|
|
PeerInfo * np = peers_map.value(nn);
|
|
if (!np) continue;
|
|
if (np->trace >= 0) continue;
|
|
np->trace = cwi;
|
|
nwave << np;
|
|
}
|
|
}
|
|
cwave = nwave;
|
|
}
|
|
PIVector<PeerInfo *> cpath;
|
|
for (auto & c: peers) {
|
|
cpath.clear();
|
|
cpath << &c;
|
|
cwave << &c;
|
|
for (int w = c.trace - 1; w >= 0; --w) {
|
|
nwave.clear();
|
|
for (const auto * p: cwave) {
|
|
for (const auto & nn: p->neighbours) {
|
|
PeerInfo * np = peers_map.value(nn);
|
|
if (!np) continue;
|
|
if (np->trace != w) continue;
|
|
cpath << np;
|
|
nwave << np;
|
|
}
|
|
}
|
|
cwave = nwave;
|
|
}
|
|
addresses_map[c.name] = cpath;
|
|
// piCout << "map" << c.name << "=" << cpath;
|
|
}
|
|
}
|
|
|
|
|
|
void PIPeer::tcpClientReconnect() {
|
|
eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
|
|
}
|
|
|
|
|
|
bool PIPeer::hasPeer(const PIString & name) {
|
|
for (const auto & i: peers)
|
|
if (i.name == name) return true;
|
|
return false;
|
|
}
|