/* PIP - Platform Independent Primitives Peer - named I/O ethernet node Copyright (C) 2015 Ivan Pelipenko peri4ko@gmail.com This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include "pipeer.h" #define _PIPEER_MSG_SIZE 8000 #define _PIPEER_MULTICAST_TTL 4 #define _PIPEER_MULTICAST_IP "232.13.3.12" #define _PIPEER_LOOPBACK_PORT_S 13313 #define _PIPEER_LOOPBACK_PORT_E (13313+32) #define _PIPEER_MULTICAST_PORT 13360 #define _PIPEER_BROADCAST_PORT 13361 #define _PIPEER_TRAFFIC_PORT_S 13400 #define _PIPEER_TRAFFIC_PORT_E 14000 PIPeer::PeerInfo::Address::Address(const PIString & a, const PIString & m): address(a), netmask(m) { ping = -1.; wait_ping = false; last_ping = PISystemTime::current(true); } int PIPeer::PeerInfo::ping() const { int ret = -1; piForeachC (Address & a, addresses) if (a.ping > 0.) ret = piMaxi(ret, piRoundd(a.ping)); return ret; } PIString PIPeer::PeerInfo::fastestAddress() const { double mp = -1.; PIString ret; piForeachC (Address & a, addresses) if (a.ping > 0.) { mp = piMaxd(mp, a.ping); ret = a.address; } return ret; } PIPeer::PIPeer(const PIString & name_): PIObject() { setName(name_); self_info.name = name_; self_info.dist = 0; self_info.time = PISystemTime::current(); //joinMulticastGroup("239.240.241.242"); srand(uint(PISystemTime::current(true).toMicroseconds())); //id_ = self_info.name + "_" + PIString::fromNumber(rand()); CONNECTU(&timer, tickEvent, this, timerEvent); PIStringList sl = PIEthernet::allAddresses(); initEths(sl); sl.removeAll("127.0.0.1"); initMBcasts(sl); sendSelfInfo(); timer.addDelimiter(5); timer.start(200); } PIPeer::~PIPeer() { timer.stop(); diag_s.stop(); diag_d.stop(); piForeach (PIEthernet * i, eths_traffic) { i->stopThreadedRead(); delete i; } eths_traffic.clear(); piForeach (PIEthernet * i, eths_mcast) i->stopThreadedRead(); piForeach (PIEthernet * i, eths_bcast) i->stopThreadedRead(); eth_send.stopThreadedRead(); eth_lo.stopThreadedRead(); sendSelfRemove(); destroyMBcasts(); } void PIPeer::timerEvent(void * data, int delim) { switch (delim) { case 5: // 1 s syncPeers(); break; } //send("broadcast", 9); } void PIPeer::initEths(PIStringList al) { PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); const PIEthernet::Interface * cint = 0; piForeachC (PIString & a, al) { ce = new PIEthernet(); ce->setDebug(false); ce->setName("__S__PIPeer_traffic_eth_rec_" + a); ce->setParameters(0); ce->setThreadSafe(true); bool ok = false; for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) { ce->setReadAddress(a, p); if (ce->open()) { eths_traffic << ce; cint = il.getByAddress(a); self_info.addresses << PeerInfo::Address(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask); CONNECTU(ce, threadedReadEvent, this, dataRead); ce->startThreadedRead(); //piCout << "dc binded to" << ce->path(); //piCout << "add eth" << ta; ok = true; break; } } if (!ok) delete ce; } eth_send.setDebug(false); eth_send.setName("__S__PIPeer_traffic_eth_send"); eth_send.setParameters(0); } void PIPeer::initMBcasts(PIStringList al) { destroyMBcasts(); PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); const PIEthernet::Interface * cint; PIString nm; al << _PIPEER_MULTICAST_IP; piForeachC (PIString & a, al) { //piCout << "mcast try" << a; ce = new PIEthernet(); ce->setDebug(false); ce->setName("__S__PIPeer_mcast_eth_" + a); ce->setParameters(0); ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT); ce->setReadAddress(a, _PIPEER_MULTICAST_PORT); ce->setMulticastTTL(_PIPEER_MULTICAST_TTL); ce->joinMulticastGroup(_PIPEER_MULTICAST_IP); eths_mcast << ce; CONNECTU(ce, threadedReadEvent, this, mbcastRead); ce->startThreadedRead(); } piForeachC (PIString & a, al) { ce = new PIEthernet(); ce->setDebug(false); ce->setName("__S__PIPeer_bcast_eth_" + a); ce->setParameters(PIEthernet::Broadcast); cint = il.getByAddress(a); nm = (cint == 0) ? "255.255.255.0" : cint->netmask; ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT); ce->setReadAddress(a, _PIPEER_BROADCAST_PORT); //piCout << "mc BC try" << a << nm << ce->sendIP(); //piCout << "bcast try" << a << nm; eths_bcast << ce; CONNECTU(ce, threadedReadEvent, this, mbcastRead); ce->startThreadedRead(); } eth_lo.setDebug(false); eth_lo.setName("__S__PIPeer_eth_loopback"); eth_lo.setParameters(0); cint = il.getByAddress("127.0.0.1"); for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) { eth_lo.setReadAddress("127.0.0.1", p); if (eth_lo.open()) { eth_lo.setSendIP("127.0.0.1"); CONNECTU(ð_lo, threadedReadEvent, this, mbcastRead); eth_lo.startThreadedRead(); //piCout << "lo binded to" << eth_lo.readAddress(); //piCout << "add eth" << ta; break; } } if (eths_mcast.isEmpty()) piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!"; if (eths_bcast.isEmpty()) piCoutObj << "Warning! Can`t find suitable network interface for broadcast receive, check for exists at least one interface with broadcasting enabled!"; } void PIPeer::destroyMBcasts() { piForeach (PIEthernet * i, eths_mcast) { i->leaveMulticastGroup(_PIPEER_MULTICAST_IP); delete i; } piForeach (PIEthernet * i, eths_bcast) delete i; eth_lo.stop(); eth_lo.close(); eths_mcast.clear(); eths_bcast.clear(); } PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) { if (!peers_map.contains(to)) return 0; //piCout << "*** search quickest peer" << to; PIVector tp = addresses_map.value(to); /*PeerInfo * dp = 0; int mping = 0x7FFFFFFF; for (int i = 0; i < tp.size_s(); ++i) { int p = tp[i]->ping(); if (mping > p && p > 0) { mping = p; dp = tp[i]; } } //piCout << "*** search quickest peer: found" << (dp ? dp->name : "0"); return dp;*/ if (tp.isEmpty()) return 0; return tp.back(); } bool PIPeer::send(const PIString & to, const void * data, int size) { PIMutexLocker mlocker(peers_mutex); PeerInfo * dp = quickestPeer(to); if (dp == 0) { //piCoutObj << "Can`t find peer \"" << to << "\"!"; return false; } PIByteArray ba; ba << int(4) << self_info.name << to << int(0) << size; PIByteArray fmsg(data, size), cmsg; int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1; //piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ..."; for (int i = 0; i < msg_count; ++i) { int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE; cmsg = ba; cmsg << msg_count << i; cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize); if (!sendToNeighbour(dp, cmsg)) return false; } //piCout << "[PIPeer] send" << size << "bytes ok"; return true; } bool PIPeer::dataRead(uchar * readed, int size) { if (size < 16) return true; PIByteArray ba(readed, size), sba; int type, cnt, rec_size; PIString from, to; ba >> type; PIMutexLocker locker(eth_mutex); //piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type; if (type == 5) { // ping request PIString addr; PISystemTime time; ba >> to >> from >> addr >> time; //piCout << "ping request" << to << from << addr; PIMutexLocker plocker(peers_mutex); if (from == self_info.name) { // send ping back const PeerInfo * pi = getPeerByName(to); if (pi) { if (pi->isNeighbour()) { sba << int(6) << to << from << addr << time; //piCout << "ping from" << from << addr << ", send back to" << pi->name; piForeachC (PeerInfo::Address & a, pi->addresses) { if (eth_send.send(a.address, sba)) diag_s.received(sba.size_s()); } } } } return true; } if (type == 6) { // ping request PIString addr; PISystemTime time, ptime, ctime = PISystemTime::current(true); ba >> to >> from >> addr >> time; //piCout << "ping reply" << to << from << addr; PIMutexLocker plocker(peers_mutex); if (to == self_info.name) { // ping echo piForeach (PeerInfo & p, peers) { if (!p.isNeighbour()) continue; if (p.name != from) continue; piForeach (PeerInfo::Address & a, p.addresses) { if (a.address != addr) continue; if (a.last_ping >= time) piBreak; ptime = ctime - time; a.last_ping = time; a.wait_ping = false; if (a.ping < 0) a.ping = ptime.toMilliseconds(); else a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds(); //piCout << "*** ping echo" << p.name << a.address << a.ping; return true; } } } return true; } if (type != 4) return true; diag_d.received(size); ba >> from >> to >> cnt >> rec_size; //piCout << "[PIPeer \"" + name_ + "\"] Received packet" << /*type << from << to << cnt <<*/ rec_size; if (type == 4) { // data packet if (to == self_info.name) { // my packet int msg_count, cmsg; ba >> msg_count >> cmsg; //piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type << from << to << cnt << rec_size << msg_count << cmsg; if (cmsg == 0 && msg_count == 1) { dataReceived(from, ba); dataReceivedEvent(from, ba); return true; } peers_mutex.lock(); PeerInfo * fp = const_cast(getPeerByName(from)); if (fp == 0) { peers_mutex.unlock(); return true; } PeerData & pd(fp->_data); if (cmsg == 0) { //piCout << "[PIPeer \"" + name_ + "\"] Packet clear" << rec_size; pd.clear(); pd.msg_count = msg_count; } //piCout << "[PIPeer \"" + name_ + "\"] Packet add" << cmsg << ba.size_s(); pd.addData(ba); bool frec = pd.isFullReceived(); PIByteArray rba(pd.data); peers_mutex.unlock(); if (frec) { dataReceived(from, rba); dataReceivedEvent(from, rba); //piCout << "[PIPeer \"" + name_ + "\"] Packet received" << pd.data.size_s(); } return true; } PIMutexLocker plocker(peers_mutex); PeerInfo * dp = quickestPeer(to); if (dp == 0) { //piCoutObj << "Can`t find peer \"" << to << "\"!"; return true; } cnt++; if (cnt > 100 || from == dp->name) return true; sba << type << from << to << cnt << rec_size; sba.append(ba); //piCoutObj << "Translate data packet" << type << from << to << cnt << rec_size; sendToNeighbour(dp, sba); } return true; } bool PIPeer::mbcastRead(uchar * data, int size) { if (size < 8) return true; int type, dist; PIByteArray ba(data, size); ba >> type; if (type <= 0 || type >= 4) return true; PeerInfo pi; ba >> pi.name; //piCout << "read type" << type << "from" << pi.name; if (pi.name == self_info.name) return true; PIMutexLocker locker(mc_mutex); diag_s.received(size); const PeerInfo * rpi = 0; bool ch = false; PIVector rpeers; //piCout << "analyz ..."; switch (type) { case 1: // new peer //piCout << "new peer packet ..."; peers_mutex.lock(); if (!hasPeer(pi.name)) { ba >> pi; pi.sync = 0; if (pi.dist == 0) { pi.addNeighbour(self_info.name); self_info.addNeighbour(pi.name); } pi.resetPing(); peers << pi; buildMap(); //piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist; pi.dist++; sendSelfInfo(); sendPeerInfo(pi); peerConnected(pi.name); peerConnectedEvent(pi.name); //piCout << "new peer packet ok"; } peers_mutex.unlock(); break; case 2: // remove peer //piCout << "remove peer packet ..." << pi.name; peers_mutex.lock(); rpi = getPeerByName(pi.name); if (rpi) { dist = rpi->dist; addToRemoved(*rpi); removePeer(pi.name); //piCoutObj << "remove peer \"" << pi.name << "\""; if (dist == 0) self_info.removeNeighbour(pi.name); sendPeerRemove(pi.name); buildMap(); peerDisconnected(pi.name); peerDisconnectedEvent(pi.name); //piCout << "remove peer packet ok"; } peers_mutex.unlock(); break; case 3: // sync peers //piCout << "sync packet ..."; ba >> pi >> rpeers; rpeers << pi; //piCoutObj << "rec sync " << rpeers.size_s() << " peers"; peers_mutex.lock(); if (!self_info.neighbours.contains(pi.name)) { self_info.addNeighbour(pi.name); PeerInfo * np = peers_map.value(pi.name); if (np) { np->addNeighbour(self_info.name); np->dist = 0; } ch = true; } piForeach (PeerInfo & rpeer, rpeers) { //piCout << " to sync " << rpeer.name; if (rpeer.name == self_info.name) continue; bool exist = false; piForeach (PeerInfo & peer, peers) { if (peer.name == rpeer.name) { exist = true; if (isPeerRecent(peer, rpeer)) { //piCout << "synced " << peer.name; for (int z = 0; z < rpeer.addresses.size_s(); ++z) { PeerInfo::Address & ra(rpeer.addresses[z]); for (int k = 0; k < peer.addresses.size_s(); ++k) { PeerInfo::Address & a(peer.addresses[k]); if (ra.address == a.address) { ra.ping = a.ping; ra.wait_ping = a.wait_ping; ra.last_ping = a.last_ping; break; } } } peer.was_update = true; peer.addresses = rpeer.addresses; peer.cnt = rpeer.cnt; peer.time = rpeer.time; peer.addNeighbours(rpeer.neighbours); rpeer.neighbours = peer.neighbours; if (peer.name == pi.name) peer.sync = 0; } piBreak; } } if (exist || isRemoved(rpeer)) continue; rpeer.dist++; rpeer.resetPing(); peers << rpeer; ch = true; peerConnected(rpeer.name); peerConnectedEvent(rpeer.name); } //piCout << "***"; //piCout << self_info.name << self_info.neighbours; piForeach (PeerInfo & i, peers) { if (i.dist == 0) { self_info.addNeighbour(i.name); i.addNeighbour(self_info.name); } //piCout << i.name << i.neighbours; } if (ch) buildMap(); peers_mutex.unlock(); //piCoutObj << "after sync " << peers.size_s() << " peers"; break; } return true; } bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) { //if (peer->_neth == 0) return false; PIString addr = peer->fastestAddress(); //piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ..."; //bool ok = peer->_neth->send(peer->_naddress, ba.data(), ba.size_s()); bool ok = eth_send.send(addr, ba); //piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail"); if (ok) diag_d.sended(ba.size_s()); return ok; } void PIPeer::sendMBcast(const PIByteArray & ba) { //piCout << "sendMBcast" << ba.size() << "bytes ..."; piForeach (PIEthernet * e, eths_mcast) { //errorClear(); //piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba); //piCout << PIEthernet::ethErrorString(); if (e->isOpened()) if (e->send(ba)) diag_s.sended(ba.size_s()); } piForeach (PIEthernet * e, eths_bcast) { //errorClear(); //piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba); //piCout << PIEthernet::ethErrorString(); if (e->isOpened()) if (e->send(ba)) diag_s.sended(ba.size_s()); } for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) { eth_lo.setSendPort(p); if (eth_lo.send(ba)) diag_s.sended(ba.size_s()); } //piCout << "send muticast ok"; } void PIPeer::removeNeighbour(const PIString & name) { piForeach (PeerInfo & p, peers) p.neighbours.removeOne(name); self_info.removeNeighbour(name); } bool PIPeer::removePeer(const PIString & name) { removeNeighbour(name); for (int i = 0; i < peers.size_s(); ++i) if (peers[i].name == name) { peers.remove(i); return true; } return false; } void PIPeer::sendPeerInfo(const PeerInfo & info) { PIByteArray ba; ba << int(1) << info.name << info; sendMBcast(ba); } void PIPeer::sendPeerRemove(const PIString & peer) { PIByteArray ba; ba << int(2) << peer; sendMBcast(ba); } void PIPeer::pingNeighbours() { PIByteArray ba, sba; ba << int(5) << self_info.name; //piCout << "pingNeighbours" << peers.size(); piForeach (PeerInfo & p, peers) { //piCout << " ping neighbour" << p.name << p.ping(); if (!p.isNeighbour()) continue; piForeach (PeerInfo::Address & a, p.addresses) { //piCout << " address" << a.address << a.wait_ping; if (a.wait_ping) { if ((PISystemTime::current(true) - a.last_ping).abs().toSeconds() <= 5.) continue; a.ping = -1.; } a.wait_ping = true; sba = ba; sba << p.name << a.address << PISystemTime::current(true); //piCout << "ping" << p.name << a.address << a.last_ping; if (eth_send.send(a.address, sba)) diag_s.sended(sba.size_s()); } } } void PIPeer::syncPeers() { //piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers"; PIMutexLocker locker(eth_mutex); PIString pn; bool change = false; peers_mutex.lock(); for (int i = 0; i < peers.size_s(); ++i) { PeerInfo & cp(peers[i]); if (cp.sync > 3) { pn = cp.name; //piCoutObj << "sync: remove " << pn; addToRemoved(cp); peers.remove(i); sendPeerRemove(pn); --i; removeNeighbour(pn); peerDisconnected(pn); peerDisconnectedEvent(pn); change = true; continue; } if (cp.was_update) cp.sync = 0; else cp.sync++; cp.was_update = false; } pingNeighbours(); if (change) buildMap(); self_info.cnt++; self_info.time = PISystemTime::current(); PIByteArray ba; ba << int(3) << self_info.name << self_info << peers; peers_mutex.unlock(); sendMBcast(ba); } void PIPeer::buildMap() { //piCout << "[PIPeer \"" + name_ + "\"] buildMap"; peers_map.clear(); addresses_map.clear(); piForeach (PeerInfo & i, peers) { i.trace = -1; peers_map[i.name] = &i; } PIVector cwave, nwave; int cwi = 0; self_info.trace = 0; cwave << &self_info; while (!cwave.isEmpty()) { nwave.clear(); ++cwi; piForeachC (PeerInfo * p, cwave) { piForeachC (PIString & nn, p->neighbours) { PeerInfo * np = peers_map.value(nn); if (!np) continue; if (np->trace >= 0) continue; np->trace = cwi; nwave << np; } } cwave = nwave; } PIVector cpath; piForeach (PeerInfo & c, peers) { cpath.clear(); cpath << &c; cwave << &c; for (int w = c.trace - 1; w >= 0; --w) { nwave.clear(); piForeachC (PeerInfo * p, cwave) { piForeachC (PIString & nn, p->neighbours) { PeerInfo * np = peers_map.value(nn); if (!np) continue; if (np->trace != w) continue; cpath << np; nwave << np; } } cwave = nwave; } addresses_map[c.name] = cpath; //piCout << "map" << c.name << "=" << cpath; } /*int max_dist = -1; self_info._nuses.resize(self_info.neighbours.size()); self_info._nuses.fill(0); self_info._first = &self_info; peers_map[self_info.name] = &self_info; piForeach (PeerInfo & i, peers) { i._nuses.resize(i.neighbours.size()); i._nuses.fill(0); i._first = 0; peers_map[i.name] = &i; if (max_dist < i.dist) max_dist = i.dist; if (i.dist > 0) continue; i._naddress.clear(); i._neth = 0; PIString mma, ma; bool af = false; for (int mi = 0; mi < self_info.addresses.size_s(); ++mi) { PeerInfo::Address & a(self_info.addresses[mi]); if (af) break; ma = a.address; //mma = m.left(m.findLast(".")); mma = PIEthernet::applyMask(a.address, a.netmask); for (int ii = 0; ii < i.addresses.size_s(); ++ii) { PeerInfo::Address & r(i.addresses[ii]); if (!r.isAvailable()) continue; if (mma == PIEthernet::applyMask(r.address, r.netmask)) { i._naddress = r.address; //piCout << "_naddress" << i.name << "=" << r; af = true; break; } } } if (!af) continue; //piCout << " peer" << i.name << ma; piForeach (PIEthernet * e, eths_traffic) if (e->readAddress() == ma) { i._neth = e; piBreak; } //piCout << i.name << i._naddress; } PIVector cwave, nwave; PeerInfo * npeer; cwave << &self_info; for (int d = 0; d <= max_dist; ++d) { if (cwave.isEmpty()) break; nwave.clear(); piForeach (PeerInfo * p, cwave) { int ns = p->neighbours.size_s(); for (int n = 0; n < ns; ++n) { if (p->_nuses[n] >= ns) continue; p->_nuses[n]++; npeer = peers_map[p->neighbours[n]]; if (npeer == 0) continue; if (d == 0) npeer->_first = npeer; else { if (d == 1) npeer->_first = p; else npeer->_first = p->_first; } nwave << npeer; } } cwave = nwave; //piCout << "wave" << d; for (int i = 0; i < cwave.size_s(); ++i) { //piCout << " peer" << cwave[i]->name << Hex << (uint)(cwave[i]->_first); if (cwave[i]->_first == 0) {cwave.remove(i); --i; continue;} if (addresses_map.contains(cwave[i]->name)) {cwave.remove(i); --i; continue;} } for (int i = 0; i < cwave.size_s(); ++i) { PIVector & pl(addresses_map[cwave[i]->name]); if (!pl.contains(cwave[i]->_first)) pl << cwave[i]->_first; } }*/ /*piCout << " ** addresses map **"; piForeachC (napair & i, addresses_map) piCout << i.first << i.second; piCout << " ** addresses map end **";*/ }