/*
	PIP - Platform Independent Primitives
	Peer - named I/O ethernet node
	Ivan Pelipenko peri4ko@yandex.ru

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Lesser General Public License as published by
	the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
	GNU Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public License
	along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "pipeer.h"

#include "piconfig.h"
#include "pidatatransfer.h"
#include "pipropertystorage.h"

// Largest payload sent as a single data packet; bigger payloads go through PIDataTransfer.
#define _PIPEER_MSG_SIZE 4000
// Maximum hop counter value of a relayed data packet.
#define _PIPEER_MSG_TTL 100
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
// Inclusive range of loopback ports probed for binding and used for local announcements.
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313 + 32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
#define _PIPEER_BROADCAST_PORT 13361
// Half-open range [S, E) of ports probed for the per-address traffic sockets.
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
// Seconds to wait for a ping reply before the address is pinged again.
#define _PIPEER_PING_TIMEOUT 5.0


// Per-peer helper object: owns two PIDataTransfer instances used for payloads
// larger than _PIPEER_MSG_SIZE and a thread that runs the outgoing transfer.
// Packet sub-type 2 is prepended to requests of dt_in, sub-type 3 to requests of dt_out.
class PIPeer::PeerData: public PIObject {
	PIOBJECT_SUBCLASS(PeerData, PIObject);

public:
	PeerData(const PIString & n);
	~PeerData();

	// Tag a packet produced by dt_in with sub-type 2 and hand it to sendRequest.
	EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {
		data.push_front(uchar(2));
		sendRequest(name(), data);
	}
	// Tag a packet produced by dt_out with sub-type 3 and hand it to sendRequest.
	EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {
		data.push_front(uchar(3));
		sendRequest(name(), data);
	}
	// Raise `received` with the assembled data once a transfer finished successfully.
	EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {
		if (ok) received(name(), dt_in.data());
	}
	EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {
		if (ok) received(name(), dt_out.data());
	}
	EVENT_HANDLER(void, dtThread);
	EVENT2(received, const PIString &, from, const PIByteArray &, data);
	EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data);

	bool send(const PIByteArray & d);
	void receivedPacket(uchar type, const PIByteArray & d);
	void setDist(int dist);

	PIByteArray data; // payload currently being sent by thread `t`
	PIThread t;
	PIDataTransfer dt_in, dt_out;
};


// Configure both transfers (packet size _PIPEER_MSG_SIZE, CRC disabled) and
// wire their events, plus the thread start event, to the handlers above.
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
	dt_in.setPacketSize(_PIPEER_MSG_SIZE);
	dt_out.setPacketSize(_PIPEER_MSG_SIZE);
	dt_in.setCRCEnabled(false);
	dt_out.setCRCEnabled(false);
	CONNECT1(void, PIByteArray &, &dt_in, sendRequest, this, dtSendRequestIn);
	CONNECT1(void, PIByteArray &, &dt_out, sendRequest, this, dtSendRequestOut);
	CONNECT1(void, bool, &dt_in, receiveFinished, this, dtReceiveFinishedIn);
	CONNECT1(void, bool, &dt_out, receiveFinished, this, dtReceiveFinishedOut);
	CONNECT0(void, &t, started, this, dtThread);
}


// Stop both transfers and the sender thread; terminate the thread if it does
// not finish within 1000 (units as defined by PIThread::waitForFinish).
PIPeer::PeerData::~PeerData() {
	dt_in.stop();
	dt_out.stop();
	t.stop();
	if (!t.waitForFinish(1000)) t.terminate();
}


// Thread body: push the stored payload through the outgoing transfer.
void PIPeer::PeerData::dtThread() {
	dt_out.send(data);
}


// Start an asynchronous transfer of `d`.
// Returns false (and sends nothing) while a previous transfer thread is still running.
bool PIPeer::PeerData::send(const PIByteArray & d) {
	if (t.isRunning()) return false;
	data = d;
	t.startOnce();
	return true;
}


// Route a received transfer packet: sub-type 3 feeds dt_in, sub-type 2 feeds dt_out;
// any other sub-type is ignored.
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
	PIDataTransfer * dt = 0;
	if (type == 3) dt = &dt_in;
	if (type == 2) dt = &dt_out;
	if (dt) dt->received(d);
}


// Scale the dt_in timeout with the peer distance.
// NOTE(review): only dt_in is adjusted here; confirm dt_out is meant to keep its default timeout.
void PIPeer::PeerData::setDist(int dist) {
	dt_in.setTimeout(10 * dist);
}


// A freshly created address has no measured ping (-1) and no ping in flight.
PIPeer::PeerInfo::PeerAddress::PeerAddress(const PIEthernet::Address & a, const PIEthernet::Address & m): address(a), netmask(m) {
	ping = -1.;
	wait_ping = false;
	last_ping = PISystemTime::current(true);
}


// Smallest positive ping over all addresses, rounded; -1 when no address has a positive ping.
int PIPeer::PeerInfo::ping() const {
	int ret = -1;
	piForeachC(PeerAddress & a, addresses) {
		if (a.ping > 0.) {
			if (ret < 0)
				ret = piRoundd(a.ping);
			else
				ret = piMini(ret, piRoundd(a.ping));
		}
	}
	return ret;
}


// Lazily create the PeerData helper.
void PIPeer::PeerInfo::init() {
	if (_data == 0) _data = new PeerData(name);
}


// Delete the PeerData helper (if any) and reset the pointer.
void PIPeer::PeerInfo::destroy() {
	if (_data) delete _data;
	_data = 0;
}


// Address with the smallest positive ping; a default-constructed address when
// no address has a positive ping.
PIEthernet::Address PIPeer::PeerInfo::fastestAddress() const {
	double mp = -1.;
	PIEthernet::Address ret;
	piForeachC(PeerAddress & a, addresses) {
		if (a.ping <= 0.) continue;
		if ((mp < 0) || (mp > a.ping)) {
			mp = a.ping;
			ret = a.address;
		}
	}
	return ret;
}


REGISTER_DEVICE(PIPeer)


// Construct a peer named `n` (a random "rnd_N" name is chosen by changeName() when `n` is empty).
// All four mutexes are held for the duration of the constructor body.
PIPeer::PIPeer(const PIString & n)
	: PIIODevice()
	, inited__(false)
	, eth_tcp_srv(PIEthernet::TCP_Server)
	, eth_tcp_cli(PIEthernet::TCP_Client)
	, diag_s(false)
	, diag_d(false) {
	sync_timer.setName("__S__.PIPeer.sync_timer");
	destroyed = false;
	setDebug(false);
	PIMutexLocker mbl(mc_mutex);
	PIMutexLocker ethl(eth_mutex);
	PIMutexLocker pl(peers_mutex);
	PIMutexLocker sl(send_mutex);
	// NOTE(review): changeName() may call randomi() before randomize() below has run;
	// if randomize() seeds the generator, random names are not seeded - confirm intent.
	changeName(n);
	setReopenTimeout(100);
	read_buffer_size = 128;
	self_info.dist = 0;
	self_info.time = PISystemTime::current();
	randomize();
	CONNECT2(void, void *, int, &sync_timer, tickEvent, this, timerEvent);
	prev_ifaces = PIEthernet::interfaces();
	no_timer = false;
	sync_timer.addDelimiter(5);
}


// Stop timers, diagnostics, per-peer transfers and all sockets, announce our
// removal and finally destroy the per-peer helper objects.
PIPeer::~PIPeer() {
	stop();
	if (destroyed) return;
	destroyed = true;
	sync_timer.stop();
	diag_s.stop();
	diag_d.stop();
	PIMutexLocker ml(peers_mutex);
	piForeach(PeerInfo & p, peers) {
		if (p._data) {
			p._data->dt_in.stop();
			p._data->dt_out.stop();
			p._data->t.stopAndWait();
		}
	}
	destroyEths();
	piForeach(PIEthernet * i, eths_mcast) {
		if (!i) continue;
		i->stopThreadedRead();
	}
	piForeach(PIEthernet * i, eths_bcast) {
		if (!i) continue;
		i->stopThreadedRead();
	}
	eth_lo.stopThreadedRead();
	eth_tcp_srv.stopThreadedRead();
	eth_tcp_cli.stopThreadedRead();
	sendSelfRemove();
	destroyMBcasts();
	eth_send.close();
	piForeach(PeerInfo & p, peers)
		p.destroy();
}


// Timer slot. Delimiter 1: synchronise the peer list, then ping neighbours.
// Delimiter 5: check whether the set of network interfaces changed.
// Does nothing while `no_timer` is set (reinit() sets it while it runs).
void PIPeer::timerEvent(void * data, int delim) {
	if (no_timer) return;
	switch (delim) {
		case 1: // every 1 s
			syncPeers();
			piMSleep(100);
			pingNeighbours();
			break;
		case 5: // every 5 s
			checkNetwork();
			break;
	}
}


// Create one receiving "traffic" socket per address in `al`, bound to the first
// port of [_PIPEER_TRAFFIC_PORT_S, _PIPEER_TRAFFIC_PORT_E) that opens, and
// register the bound address in self_info. Also configures the sending socket.
void PIPeer::initEths(PIStringList al) {
	PIEthernet * ce;
	const PIEthernet::Interface * cint = 0;
	piForeachC(PIString & a, al) {
		ce = new PIEthernet();
		ce->setDebug(false);
		ce->setName("__S__PIPeer_traffic_eth_rec_" + a);
		ce->setParameters(0);
		bool ok = false;
		for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
			ce->setReadAddress(a, p);
			if (ce->open()) {
				eths_traffic << ce;
				cint = prev_ifaces.getByAddress(a);
				// Fall back to a /24 netmask when the interface is unknown.
				self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
				CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, dataRead);
				ce->startThreadedRead();
				ok = true;
				break;
			}
		}
		// No port in the range could be opened for this address.
		if (!ok) delete ce;
	}
	eth_send.setDebug(false);
	eth_send.setName("__S__PIPeer_traffic_eth_send");
	eth_send.setParameters(0);
}


// Create the service (announcement) channels: multicast sockets, broadcast
// sockets, the loopback socket, the TCP server and the TCP client.
// All of them deliver received data to mbcastRead().
void PIPeer::initMBcasts(PIStringList al) {
	PIEthernet * ce;
	const PIEthernet::Interface * cint;
	PIString nm;

	// Multicast: one socket per address plus one bound to the group address itself.
	al << _PIPEER_MULTICAST_IP;
	piForeachC(PIString & a, al) {
		ce = new PIEthernet();
		ce->setDebug(false);
		ce->setName("__S__PIPeer_mcast_eth_" + a);
		ce->setParameters(0);
		ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
		ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
		ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
		ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
		if (ce->open()) {
			eths_mcast << ce;
			CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
			ce->startThreadedRead();
		} else {
			delete ce;
		}
	}
	al.removeAll(_PIPEER_MULTICAST_IP);

	// Broadcast: one socket per address, sending to that subnet's broadcast address.
	piForeachC(PIString & a, al) {
		ce = new PIEthernet();
		ce->setDebug(false);
		ce->setName("__S__PIPeer_bcast_eth_" + a);
		ce->setParameters(PIEthernet::Broadcast);
		cint = prev_ifaces.getByAddress(a);
		nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
		ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
		ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
		if (ce->open()) {
			eths_bcast << ce;
			CONNECT2(void, const uchar *, ssize_t, ce, threadedReadEvent, this, mbcastRead);
			ce->startThreadedRead();
		} else {
			delete ce;
		}
	}

	// Loopback: bind to the first free port of the inclusive loopback range.
	eth_lo.setName("__S__PIPeer_eth_loopback");
	eth_lo.setParameters(PIEthernet::SeparateSockets);
	eth_lo.init();
	// NOTE(review): the result of this lookup is not used afterwards.
	cint = prev_ifaces.getByAddress("127.0.0.1");
	for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
		eth_lo.setReadAddress("127.0.0.1", p);
		if (eth_lo.open()) {
			eth_lo.setSendIP("127.0.0.1");
			CONNECT2(void, const uchar *, ssize_t, &eth_lo, threadedReadEvent, this, mbcastRead);
			eth_lo.startThreadedRead();
			break;
		}
	}

	// TCP server: accepts clients on all interfaces.
	eth_tcp_srv.setName("__S__PIPeer_eth_TCP_Server");
	eth_tcp_srv.init();
	eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
	eth_tcp_srv.setDebug(false);
	CONNECT1(void, PIEthernet *, &eth_tcp_srv, newConnection, this, newTcpClient);
	eth_tcp_srv.startThreadedRead();

	// TCP client: connects to `server_ip` and reconnects on disconnect.
	eth_tcp_cli.setName("__S__PIPeer_eth_TCP_Client");
	eth_tcp_cli.init();
	eth_tcp_cli.setDebug(false);
	tcpClientReconnect();
	CONNECT2(void, const uchar *, ssize_t, &eth_tcp_cli, threadedReadEvent, this, mbcastRead);
	CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
	eth_tcp_cli.startThreadedRead();

	if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened())
		piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with "
		             "multicasting enabled!";
}


// Stop, close and delete every traffic socket.
void PIPeer::destroyEths() {
	piForeach(PIEthernet * i, eths_traffic) {
		if (!i) continue;
		((PIThread *)i)->stop();
		((PIThread *)i)->waitForFinish(100);
		i->stopThreadedRead();
		i->close();
		delete i;
		i = 0;
	}
	eths_traffic.clear();
}


// Stop, close and delete every multicast/broadcast socket, close the loopback
// socket and stop the TCP server.
void PIPeer::destroyMBcasts() {
	piForeach(PIEthernet * i, eths_mcast) {
		if (!i) continue;
		((PIThread *)i)->stop();
		((PIThread *)i)->waitForFinish(100);
		i->stopThreadedRead();
		i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
		i->close();
		delete i;
		i = 0;
	}
	eths_mcast.clear();
	piForeach(PIEthernet * i, eths_bcast) {
		if (!i) continue;
		((PIThread *)i)->stop();
		((PIThread *)i)->waitForFinish(100);
		i->stopThreadedRead();
		i->close();
		delete i;
		i = 0;
	}
	eths_bcast.clear();
	((PIThread *)&eth_lo)->stop();
	((PIThread *)&eth_lo)->waitForFinish(100);
	eth_lo.stopThreadedRead();
	eth_lo.close();
	eth_tcp_srv.stop();
}


// Next hop towards peer `to`: the last element of the path stored by buildMap(),
// or 0 when the peer is unknown or has no stored path.
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
	if (!peers_map.contains(to)) return 0;
	PIVector<PeerInfo *> tp = addresses_map.value(to);
	if (tp.isEmpty()) return 0;
	return tp.back();
}


// Send `size` bytes to peer `to`.
// Payloads up to _PIPEER_MSG_SIZE are sent as one data packet (sub-type 1);
// larger ones are handed to the peer's PeerData for a chunked transfer.
// Returns false when the peer is unknown, has no PeerData, a transfer is
// already running, or the packet could not be sent.
bool PIPeer::send(const PIString & to, const void * data, int size) {
	PIByteArray ba(data, size);
	if (ba.size_s() <= _PIPEER_MSG_SIZE) {
		ba.insert(0, uchar(1));
		return sendInternal(to, ba);
	}
	PIMutexLocker mlocker(peers_mutex);
	PeerInfo * dp = const_cast<PeerInfo *>(getPeerByName(to));
	// _data is checked as well, matching the checks in dataRead() and ~PIPeer().
	if (!dp || !dp->_data) return false;
	return dp->_data->send(ba);
}


// Wrap `data` into a type-4 packet (from, to, hop counter 0, payload) and send
// it to the next hop towards `to`. Returns false when no route exists or the send fails.
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
	PIMutexLocker mlocker(peers_mutex);
	PeerInfo * dp = quickestPeer(to);
	if (dp == 0) return false;
	PIByteArray ba;
	ba << int(4) << self_info.name << to << int(0) << data;
	return sendToNeighbour(dp, ba);
}


// Deliver a complete payload: call dataReceived(), raise dataReceivedEvent and,
// when no trust peer is set or `from` is the trust peer, queue the payload for
// readDevice() unless the queue already holds read_buffer_size entries.
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
	PIByteArray ba = data;
	dataReceived(from, data);
	dataReceivedEvent(from, data);
	if (trust_peer.isEmpty() || trust_peer == from) {
		read_buffer_mutex.lock();
		if (read_buffer.size_s() < read_buffer_size) read_buffer.enqueue(ba);
		read_buffer_mutex.unlock();
	}
}


// Handler for datagrams arriving on the traffic sockets.
// Packet types: 5 = ping request, 6 = ping reply, 4 = data; others are ignored.
// eth_mutex is locked manually and released before user callbacks are invoked,
// so every exit path below must unlock it. Always returns true.
bool PIPeer::dataRead(const uchar * readed, ssize_t size) {
	if (destroyed) return true;
	if (size < 16) return true;
	PIByteArray ba(readed, size), sba, pba;
	int type, cnt;
	PIString from, to;
	ba >> type;
	eth_mutex.lock();

	if (type == 5) { // ping request
		PIEthernet::Address addr;
		PISystemTime time;
		// pingNeighbours() writes the sender name first, then the target name,
		// so here `to` is the sender and `from` is the pinged peer.
		ba >> to >> from >> addr >> time;
		PIMutexLocker plocker(peers_mutex);
		if (from == self_info.name) { // send ping back
			const PeerInfo * pi = getPeerByName(to);
			if (pi) {
				if (pi->isNeighbour()) {
					sba << int(6) << to << from << addr << time;
					send_mutex.lock();
					piForeachC(PeerInfo::PeerAddress & a, pi->addresses) {
						// NOTE(review): a successful send is counted with received(),
						// whereas pingNeighbours() uses sended() - confirm intent.
						if (eth_send.send(a.address, sba)) diag_s.received(sba.size_s());
					}
					send_mutex.unlock();
				}
			}
		}
		eth_mutex.unlock();
		return true;
	}

	if (type == 6) { // ping reply
		PIEthernet::Address addr;
		PISystemTime time, ptime, ctime = PISystemTime::current(true);
		ba >> to >> from >> addr >> time;
		PIMutexLocker plocker(peers_mutex);
		if (to == self_info.name) { // ping echo
			piForeach(PeerInfo & p, peers) {
				if (!p.isNeighbour()) continue;
				if (p.name != from) continue;
				piForeach(PeerInfo::PeerAddress & a, p.addresses) {
					if (a.address != addr) continue;
					// Ignore replies that are not newer than the last accepted one.
					if (a.last_ping >= time) break;
					ptime = ctime - time;
					a.last_ping = time;
					a.wait_ping = false;
					// First sample is taken as is, later samples are smoothed.
					if (a.ping < 0)
						a.ping = ptime.toMilliseconds();
					else
						a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
					eth_mutex.unlock();
					return true;
				}
			}
		}
		eth_mutex.unlock();
		return true;
	}

	if (type != 4) {
		eth_mutex.unlock();
		return true;
	}

	// Data packet.
	diag_d.received(size);
	ba >> from >> to >> cnt >> pba;

	if (to == self_info.name) { // my packet
		// A malformed datagram may carry no payload; there is no sub-type byte to take then.
		if (pba.size_s() < 1) {
			eth_mutex.unlock();
			return true;
		}
		uchar pt = pba.take_front();
		peers_mutex.lock();
		PeerInfo * fp = const_cast<PeerInfo *>(getPeerByName(from));
		if (fp == 0) {
			peers_mutex.unlock();
			eth_mutex.unlock();
			return true;
		}
		if (pt == 1) { // whole payload in one packet
			peers_mutex.unlock();
			eth_mutex.unlock();
			dtReceived(from, pba);
			return true;
		}
		if (pt == 2 || pt == 3) { // chunked transfer packet
			peers_mutex.unlock();
			eth_mutex.unlock();
			if (fp->_data) fp->_data->receivedPacket(pt, pba);
			return true;
		}
		peers_mutex.unlock();
		eth_mutex.unlock();
		return true;
	}

	// Packet for somebody else: relay it to the next hop.
	{
		PIMutexLocker plocker(peers_mutex);
		PeerInfo * dp = quickestPeer(to);
		if (dp == 0) {
			eth_mutex.unlock();
			return true;
		}
		cnt++;
		// Drop the packet when the hop limit is exceeded or the next hop is the
		// original sender. eth_mutex must be released on this path as well.
		if (cnt > _PIPEER_MSG_TTL || from == dp->name) {
			eth_mutex.unlock();
			return true;
		}
		sba << type << from << to << cnt << pba;
		sendToNeighbour(dp, sba);
	}
	eth_mutex.unlock();
	return true;
}


// Handler for service packets arriving on the multicast, broadcast, loopback
// and TCP channels. Packet types: 1 = new peer, 2 = remove peer, 3 = sync peers.
// Packets of other types and packets sent by this peer are ignored. Always returns true.
bool PIPeer::mbcastRead(const uchar * data, ssize_t size) {
	if (destroyed) return true;
	if (size < 8) return true;
	int type, dist;
	PIByteArray ba(data, size);
	ba >> type;
	if (type <= 0 || type >= 4) return true;
	PeerInfo pi;
	ba >> pi.name;
	if (pi.name == self_info.name) return true;
	PIMutexLocker locker(mc_mutex);
	diag_s.received(size);
	const PeerInfo * rpi = 0;
	bool ch = false;
	PIVector<PeerInfo> rpeers;
	switch (type) {
		case 1: // new peer
			peers_mutex.lock();
			if (!hasPeer(pi.name)) {
				ba >> pi;
				pi.sync = 0;
				if (pi.dist == 0) {
					pi.addNeighbour(self_info.name);
					self_info.addNeighbour(pi.name);
				}
				pi.resetPing();
				addPeer(pi);
				buildMap();
				// Re-announce the new peer one hop further away.
				pi.dist++;
				sendSelfInfo();
				sendPeerInfo(pi);
				ch = true;
			}
			peers_mutex.unlock();
			if (ch) {
				peerConnected(pi.name);
				peerConnectedEvent(pi.name);
			}
			break;

		case 2: // remove peer
			peers_mutex.lock();
			removeNeighbour(pi.name);
			rpi = getPeerByName(pi.name);
			if (rpi) {
				dist = rpi->dist;
				addToRemoved(*rpi);
				removePeer(pi.name);
				if (dist == 0) self_info.removeNeighbour(pi.name);
				sendPeerRemove(pi.name);
				buildMap();
				ch = true;
			}
			peers_mutex.unlock();
			if (ch) {
				peerDisconnected(pi.name);
				peerDisconnectedEvent(pi.name);
			}
			break;

		case 3: // sync peers
			ba >> pi >> rpeers;
			rpeers << pi;
			peers_mutex.lock();
			// The sender of a sync packet is a direct neighbour.
			if (!self_info.neighbours.contains(pi.name)) {
				self_info.addNeighbour(pi.name);
				PeerInfo * np = peers_map.value(pi.name);
				if (np) {
					np->addNeighbour(self_info.name);
					np->dist = 0;
				}
				ch = true;
			}
			piForeach(PeerInfo & rpeer, rpeers) {
				if (rpeer.name == self_info.name) continue;
				bool exist = false;
				piForeach(PeerInfo & peer, peers) {
					if (peer.name == rpeer.name) {
						exist = true;
						if (isPeerRecent(peer, rpeer)) {
							// Keep our own ping measurements for addresses we already know.
							for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
								PeerInfo::PeerAddress & ra(rpeer.addresses[z]);
								for (int k = 0; k < peer.addresses.size_s(); ++k) {
									PeerInfo::PeerAddress & a(peer.addresses[k]);
									if (ra.address == a.address) {
										ra.ping = a.ping;
										ra.wait_ping = a.wait_ping;
										ra.last_ping = a.last_ping;
										break;
									}
								}
							}
							peer.was_update = true;
							peer.addresses = rpeer.addresses;
							peer.cnt = rpeer.cnt;
							peer.time = rpeer.time;
							peer.addNeighbours(rpeer.neighbours);
							rpeer.neighbours = peer.neighbours;
							if (peer.name == pi.name) peer.sync = 0;
							ch = true;
						}
						break;
					}
				}
				if (exist || isRemoved(rpeer)) continue;
				// Unknown peer: it is one hop further than the sender reported,
				// unless it is the sender itself.
				rpeer.dist++;
				if (rpeer.name == pi.name) rpeer.dist = 0;
				rpeer.resetPing();
				addPeer(rpeer);
				ch = true;
				peerConnected(rpeer.name);
				peerConnectedEvent(rpeer.name);
			}
			piForeach(PeerInfo & i, peers) {
				if (i.dist == 0) {
					self_info.addNeighbour(i.name);
					i.addNeighbour(self_info.name);
				}
			}
			if (ch) buildMap();
			peers_mutex.unlock();
			break;
	}
	return true;
}


// Send `ba` to the fastest address of `peer` through the sending socket and
// account the bytes in the data diagnostics. Returns the result of the send.
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
	PIEthernet::Address addr = peer->fastestAddress();
	send_mutex.lock();
	bool ok = eth_send.send(addr, ba);
	if (ok) diag_d.sended(ba.size_s());
	send_mutex.unlock();
	return ok;
}


// Send a service packet over every available service channel: multicast and
// broadcast sockets, every loopback port of the range, all TCP server clients
// and the TCP client connection.
void PIPeer::sendMBcast(const PIByteArray & ba) {
	send_mc_mutex.lock();
	piForeach(PIEthernet * e, eths_mcast) {
		if (e->isOpened())
			if (e->send(ba)) diag_s.sended(ba.size_s());
	}
	piForeach(PIEthernet * e, eths_bcast) {
		if (e->isOpened())
			if (e->send(ba)) diag_s.sended(ba.size_s());
	}
	for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
		eth_lo.setSendPort(p);
		if (eth_lo.send(ba)) diag_s.sended(ba.size_s());
	}
	PIVector<PIEthernet *> cl = eth_tcp_srv.clients();
	piForeach(PIEthernet * e, cl) {
		if (e->isOpened() && e->isConnected())
			if (e->send(ba)) diag_s.sended(ba.size_s());
	}
	if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
		if (eth_tcp_cli.send(ba)) diag_s.sended(ba.size_s());
	}
	send_mc_mutex.unlock();
}


// Remove `name` from the neighbour list of every known peer and of this peer.
void PIPeer::removeNeighbour(const PIString & name) {
	piForeach(PeerInfo & p, peers)
		p.neighbours.removeOne(name);
	self_info.removeNeighbour(name);
}


// Append a copy of `pd` to the peer list, create its PeerData and connect the
// PeerData events to sendInternal() and dtReceived().
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
	peers << pd;
	PeerInfo & p(peers.back());
	p.init();
	CONNECT2(void, const PIString &, const PIByteArray &, p._data, sendRequest, this, sendInternal);
	CONNECT2(void, const PIString &, const PIByteArray &, p._data, received, this, dtReceived);
}


// Destroy and remove the first peer called `name`. Returns false when there is no such peer.
bool PIPeer::removePeer(const PIString & name) {
	for (int i = 0; i < peers.size_s(); ++i) {
		if (peers[i].name == name) {
			peers[i].destroy();
			peers.remove(i);
			return true;
		}
	}
	return false;
}


// Announce `info` with a type-1 (new peer) service packet.
void PIPeer::sendPeerInfo(const PeerInfo & info) {
	PIByteArray ba;
	ba << int(1) << info.name << info;
	sendMBcast(ba);
}


// Announce the removal of `peer` with a type-2 service packet.
void PIPeer::sendPeerRemove(const PIString & peer) {
	PIByteArray ba;
	ba << int(2) << peer;
	sendMBcast(ba);
}


// Send a type-5 ping request to every address of every neighbour. An address
// with a ping still in flight is skipped until _PIPEER_PING_TIMEOUT seconds
// passed since its last ping; after that its ping value is reset to -1.
void PIPeer::pingNeighbours() {
	PIMutexLocker ml(peers_mutex);
	PIByteArray ba, sba;
	ba << int(5) << self_info.name;
	piForeach(PeerInfo & p, peers) {
		if (!p.isNeighbour()) continue;
		send_mutex.lock();
		piForeach(PeerInfo::PeerAddress & a, p.addresses) {
			if (a.wait_ping) {
				if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= _PIPEER_PING_TIMEOUT) continue;
				a.ping = -1.;
			}
			a.wait_ping = true;
			sba = ba;
			sba << p.name << a.address << PISystemTime::current(true);
			if (eth_send.send(a.address, sba)) diag_s.sended(sba.size_s());
		}
		send_mutex.unlock();
	}
}


// Read "peer_server_ip" from the PIP configuration file, (re)initialise the
// network and reset both diagnostics. Always returns true.
bool PIPeer::openDevice() {
	PIConfig conf(
#ifndef WINDOWS
		"/etc/pip.conf"
#else
		"pip.conf"
#endif
		,
		PIIODevice::ReadOnly);
	server_ip = conf.getValue("peer_server_ip", "").toString();
	reinit();
	diag_d.reset();
	diag_s.reset();
	return true;
}


// Always returns false.
bool PIPeer::closeDevice() {
	return false;
}


// Periodic synchronisation: drop peers whose `sync` counter exceeded 3, advance
// or reset the counters of the others, then broadcast a type-3 packet with our
// own info and the peer list. Disconnect notifications are raised after the
// peers mutex has been released.
void PIPeer::syncPeers() {
	PIMutexLocker locker(eth_mutex);
	PIString pn;
	bool change = false;
	PIStringList dpeers;
	peers_mutex.lock();
	for (int i = 0; i < peers.size_s(); ++i) {
		PeerInfo & cp(peers[i]);
		if (cp.sync > 3) {
			pn = cp.name;
			cp.destroy();
			addToRemoved(cp);
			peers.remove(i);
			sendPeerRemove(pn);
			--i; // stay on the same index after the removal
			removeNeighbour(pn);
			dpeers << pn;
			change = true;
			continue;
		}
		if (cp.was_update)
			cp.sync = 0;
		else
			cp.sync++;
		if (cp._data) cp._data->setDist(cp.dist + 1);
		cp.was_update = false;
	}
	if (change) buildMap();
	self_info.cnt++;
	self_info.time = PISystemTime::current();
	PIByteArray ba;
	ba << int(3) << self_info.name << self_info << peers;
	peers_mutex.unlock();
	sendMBcast(ba);
	piForeachC(PIString & p, dpeers) {
		peerDisconnected(p);
		peerDisconnectedEvent(p);
	}
}


// Re-initialise the network when the list of interfaces differs from the last seen one.
void PIPeer::checkNetwork() {
	PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
	if (prev_ifaces == ifaces) return;
	prev_ifaces = ifaces;
	reinit();
}


// Rebuild all sockets under all four mutexes with timer processing suspended,
// announce ourselves and make sure the sync timer is running.
void PIPeer::reinit() {
	no_timer = true;
	PIMutexLocker mbl(mc_mutex);
	PIMutexLocker ethl(eth_mutex);
	PIMutexLocker pl(peers_mutex);
	PIMutexLocker sl(send_mutex);
	initNetwork();
	sendSelfInfo();
	no_timer = false;
	if (!sync_timer.isRunning()) sync_timer.start(1000);
}


// Set the peer name (object name, self_info and diagnostics names).
// An empty name is replaced by "rnd_" followed by a random number below 1000.
void PIPeer::changeName(const PIString & new_name) {
	PIString name_ = new_name;
	if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(randomi() % 1000);
	setName(name_);
	self_info.name = name_;
	diag_d.setName(name_ + "_data");
	diag_s.setName(name_ + "_service");
}


// Size of the element at the back of the read queue, or 0 when the queue is empty.
ssize_t PIPeer::bytesAvailable() const {
	ssize_t ret = 0;
	read_buffer_mutex.lock();
	if (!read_buffer.isEmpty()) ret = read_buffer.back().size();
	read_buffer_mutex.unlock();
	return ret;
}


// Wait (polling every 10 ms) until a payload is queued, then copy at most
// `max_size` bytes of it into `read_to`. Returns the number of bytes copied,
// or 0 if the queue turned out to be empty after the wait.
ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) {
	for (;;) {
		read_buffer_mutex.lock();
		bool empty = read_buffer.isEmpty();
		read_buffer_mutex.unlock();
		if (!empty) break;
		// Sleep only while there is nothing to read.
		piMSleep(10);
	}
	read_buffer_mutex.lock();
	if (!read_buffer.isEmpty()) {
		PIByteArray ba = read_buffer.dequeue();
		read_buffer_mutex.unlock();
		ssize_t sz = piMini(ba.size_s(), max_size);
		memcpy(read_to, ba.data(), sz);
		return sz;
	}
	read_buffer_mutex.unlock();
	return 0;
}


// Without a trust peer the data goes to all peers and `size` is returned;
// otherwise it goes to the trust peer only and -1 is returned on failure.
ssize_t PIPeer::writeDevice(const void * data, ssize_t size) {
	if (trust_peer.isEmpty()) {
		sendToAll(data, size);
		return size;
	}
	if (send(trust_peer, data, size))
		return size;
	else
		return -1;
}


// Slot for a client accepted by the TCP server: name it, route its data to
// mbcastRead() and start reading.
void PIPeer::newTcpClient(PIEthernet * client) {
	client->setName("__S__PIPeer_eth_TCP_ServerClient" + client->path());
	piCoutObj << "client" << client->path();
	CONNECT2(void, const uchar *, ssize_t, client, threadedReadEvent, this, mbcastRead);
	client->startThreadedRead();
}


// Full path form: "<name>:<trust peer name>".
PIString PIPeer::constructFullPathDevice() const {
	PIString ret;
	ret += self_info.name + ":" + trustPeerName();
	return ret;
}


// Parse "<name>:<trust peer name>"; further ':'-separated parts are ignored.
void PIPeer::configureFromFullPathDevice(const PIString & full_path) {
	PIStringList pl = full_path.split(":");
	for (int i = 0; i < pl.size_s(); ++i) {
		PIString p(pl[i]);
		switch (i) {
			case 0: changeName(p); break;
			case 1: setTrustPeerName(p); break;
		}
	}
}


// Export the configuration as properties "name" and "trust peer".
PIPropertyStorage PIPeer::constructVariantDevice() const {
	PIPropertyStorage ret;
	ret.addProperty("name", self_info.name);
	ret.addProperty("trust peer", trustPeerName());
	return ret;
}


// Apply the properties "name" and "trust peer".
void PIPeer::configureFromVariantDevice(const PIPropertyStorage & d) {
	changeName(d.propertyValueByName("name").toString());
	setTrustPeerName(d.propertyValueByName("trust peer").toString());
}


// Tear down every socket and create them again for the current set of
// addresses: traffic sockets for all addresses, service channels for all
// addresses except 127.0.0.1. Starts both diagnostics.
void PIPeer::initNetwork() {
	eth_send.init();
	destroyEths();
	destroyMBcasts();
	piMSleep(100);
	self_info.addresses.clear();
	PIVector<PIEthernet::Address> al = PIEthernet::allAddresses();
	PIStringList sl;
	for (const PIEthernet::Address & a: al) {
		sl << a.ipString();
	}
	initEths(sl);
	sl.removeAll("127.0.0.1");
	initMBcasts(sl);
	diag_d.start();
	diag_s.start();
}


// Rebuild the lookup tables.
// peers_map: name -> PeerInfo. `trace` of every peer is its wave number in a
// breadth-first walk over neighbour lists that starts at self_info (trace 0);
// peers that were not reached keep trace -1.
// addresses_map: name -> list of peers collected while walking back from the
// peer through neighbours of decreasing trace; quickestPeer() uses its last element.
void PIPeer::buildMap() {
	peers_map.clear();
	addresses_map.clear();
	piForeach(PeerInfo & i, peers) {
		i.trace = -1;
		peers_map[i.name] = &i;
	}

	// Forward pass: assign wave numbers.
	PIVector<PeerInfo *> cwave, nwave;
	int cwi = 0;
	self_info.trace = 0;
	cwave << &self_info;
	while (!cwave.isEmpty()) {
		nwave.clear();
		++cwi;
		piForeachC(PeerInfo * p, cwave) {
			piForeachC(PIString & nn, p->neighbours) {
				PeerInfo * np = peers_map.value(nn);
				if (!np) continue;
				if (np->trace >= 0) continue;
				np->trace = cwi;
				nwave << np;
			}
		}
		cwave = nwave;
	}

	// Backward pass: collect the path for every peer.
	PIVector<PeerInfo *> cpath;
	piForeach(PeerInfo & c, peers) {
		cpath.clear();
		cpath << &c;
		// Start from this peer only: when the loop below does not run
		// (trace <= 0) the previous peer would otherwise stay in the wave.
		cwave.clear();
		cwave << &c;
		for (int w = c.trace - 1; w >= 0; --w) {
			nwave.clear();
			piForeachC(PeerInfo * p, cwave) {
				piForeachC(PIString & nn, p->neighbours) {
					PeerInfo * np = peers_map.value(nn);
					if (!np) continue;
					if (np->trace != w) continue;
					cpath << np;
					nwave << np;
				}
			}
			cwave = nwave;
		}
		addresses_map[c.name] = cpath;
	}
}


// (Re)connect the TCP client to the configured server.
void PIPeer::tcpClientReconnect() {
	eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
}