/* PIP - Platform Independent Primitives Peer - named I/O ethernet node Copyright (C) 2013 Ivan Pelipenko peri4ko@gmail.com This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include "pipeer.h" #define _PIPEER_PORT_SYNC_START 13313 #define _PIPEER_PORT_SYNC_END 13353 #define _PIPEER_IP_MULTICAST "239.13.3.12" #define _PIPEER_MSG_SIZE 8192 PIPeer::PIPeer(const PIString & name): PIObject() { rec_mc = rec_bc = false; setName(name); self_info.name = name_; self_info.dist = 0; //joinMulticastGroup("239.240.241.242"); srand(uint(PITimer::elapsed_system_m())); //id_ = name() + "_" + PIString::fromNumber(rand()); CONNECT2(void, void * , int, &timer, timeout, this, timerEvent); PIStringList sl = PIEthernet::allAddresses(); sl.removeAll("127.0.0.1"); sl << "127.0.0.1"; initMulticasts(sl); initEths(sl); //piCout << "Peer" << name_; timer.addDelimiter(5); timer.start(1000); sendSelfInfo(); } PIPeer::~PIPeer() { piForeach (PIEthernet * i, mc_eths) i->stopThreadedRead(); sendSelfRemove(); destroyMulticasts(); piForeach (PIEthernet * i, eths) delete i; eths.clear(); } void PIPeer::timerEvent(void * data, int delim) { switch (delim) { case 5: // 5 s syncPeers(); break; } //send("broadcast", 9); } void PIPeer::initEths(const PIStringList & al) { PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); const PIEthernet::Interface * cint = 0; piForeachC (PIString & a, al) { ce = new PIEthernet(); ce->setParameters(0); ce->setDebug(false); for (int p = _PIPEER_PORT_SYNC_START; p < 65536; ++p) { ce->setReadAddress(a, p); if (ce->open()) { eths << ce; cint = il.getByAddress(a); self_info.addresses << ce->path(); self_info.netmasks << (cint == 0 ? "255.255.255.0" : cint->netmask); CONNECT2(bool, void * , int, ce, threadedReadEvent, this, dataRead); ce->startThreadedRead(); //piCout << "dc binded to" << ce->path(); //piCout << "add eth" << ta; break; } } } } void PIPeer::initMulticasts(const PIStringList & al) { destroyMulticasts(); PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); const PIEthernet::Interface * cint; PIStringList nal = al; PIString nm; nal << "main" << "bc"; rec_mc = rec_bc = false; piForeachC (PIString & a, nal) { bool is_main = (a == "main"); bool is_bc = (a == "bc"); ce = new PIEthernet(); ce->setParameters((is_main || is_bc) ? PIEthernet::Broadcast : 0); ce->setDebug(false); //cint = il.getByAddress(a); //nm = (cint == 0) ? "255.255.255.0" : cint->netmask; ce->setSendIP(is_bc ? "255.255.255.255" : _PIPEER_IP_MULTICAST); //piCout << "mc try" << a << nm << ce->sendIP(); for (int p = _PIPEER_PORT_SYNC_START; p < _PIPEER_PORT_SYNC_END; ++p) { ce->setReadAddress(((is_main || is_bc) ? "255.255.255.255" : a) + ":" + PIString::fromNumber(p)); ce->close(); if (!ce->open()) continue; if (is_main) if (!ce->joinMulticastGroup(_PIPEER_IP_MULTICAST)) continue; //piCout << "mc binded to" << ce->path(); mc_eths << ce; if (is_main || is_bc) { if (is_main) rec_mc = true; if (is_bc) rec_bc = true; CONNECT2(bool, void * , int, ce, threadedReadEvent, this, multicastRead); ce->startThreadedRead(); if (is_bc) ce->setParameter(PIEthernet::Broadcast, false); } break; } } piForeachC (PIString & a, al) { ce = new PIEthernet(); ce->setParameters(PIEthernet::Broadcast); ce->setDebug(false); cint = il.getByAddress(a); nm = (cint == 0) ? "255.255.255.0" : cint->netmask; ce->setSendIP(PIEthernet::getBroadcast(a, nm)); //piCout << "mc BC try" << a << nm << ce->sendIP(); for (int p = _PIPEER_PORT_SYNC_START; p < _PIPEER_PORT_SYNC_END; ++p) { ce->setReadAddress(a + ":" + PIString::fromNumber(p)); ce->close(); if (!ce->open()) continue; //piCout << "BC binded to" << ce->path(); mc_eths << ce; break; } } if (!rec_mc) piCoutObj << "[PIPeer \"" + name_ + "\"] Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!"; if (!rec_bc) piCoutObj << "[PIPeer \"" + name_ + "\"] Can`t find suitable network interface for broadcast receive, check for exists at least one interface with broadcasting enabled!"; } void PIPeer::destroyMulticasts() { piForeach (PIEthernet * i, mc_eths) { i->leaveMulticastGroup(_PIPEER_IP_MULTICAST); delete i; } mc_eths.clear(); } PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) { if (!addresses_map.contains(to)) return 0; //piCout << "*** search quickest peer" << to; PIVector tp = addresses_map[to]; PeerInfo * dp = 0; int mping = 99999; for (int i = 0; i < tp.size_s(); ++i) { if (mping > tp[i]->ping) { mping = tp[i]->ping; dp = tp[i]; } } //piCout << "*** search quickest peer: found" << dp->name; return dp; } bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) { if (peer->_neth == 0) return false; //piCout << "send to" << peer->name << peer->_naddress << ba.size_s() << "bytes"; diag_d.sended(ba.size_s()); return peer->_neth->send(peer->_naddress, ba.data(), ba.size_s()); } bool PIPeer::send(const PIString & to, const void * data, int size) { PeerInfo * dp = quickestPeer(to); if (dp == 0) { //piCoutObj << "[PIPeer \"" + name_ + "\"] Can`t find peer \"" << to << "\"!"; return false; } PIByteArray ba; ba << int(4) << self_info.name << to << int(0) << size; PIByteArray fmsg(data, size), cmsg; int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1; for (int i = 0; i < msg_count; ++i) { int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE; cmsg.clear(); cmsg.append(ba); cmsg << msg_count << i; cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize); if (!sendToNeighbour(dp, cmsg)) return false; } return true; } bool PIPeer::dataRead(uchar * readed, int size) { diag_d.received(size); PIByteArray ba(readed, size), sba; int type, cnt, rec_size; PIString from, to; ba >> type >> from >> to >> cnt >> rec_size; //piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type << from << to << cnt << rec_size; if (type == 4) { // data packet if (to == self_info.name) { // my packet int msg_count, cmsg; ba >> msg_count >> cmsg; //piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type << from << to << cnt << rec_size << msg_count << cmsg; if (cmsg == 0 && msg_count == 1) { dataReceived(from, ba); dataReceivedEvent(from, ba); return true; } PeerInfo * fp = const_cast(getPeerByName(from)); if (fp == 0) return true; PeerData & pd(fp->_data); if (cmsg == 0) { //piCout << "[PIPeer \"" + name_ + "\"] Packet clear" << rec_size; pd.clear(); pd.msg_count = msg_count; } //piCout << "[PIPeer \"" + name_ + "\"] Packet add" << cmsg << ba.size_s(); pd.addData(ba); if (pd.isFullReceived()) { dataReceived(from, pd.data); dataReceivedEvent(from, pd.data); //piCout << "[PIPeer \"" + name_ + "\"] Packet received" << pd.data.size_s(); } return true; } PeerInfo * dp = quickestPeer(to); if (dp == 0) { //piCoutObj << "[PIPeer \"" + name_ + "\"] Can`t find peer \"" << to << "\"!"; return true; } cnt++; if (cnt > 100 || from == dp->name) return true; sba << type << from << to << cnt << rec_size; sba.append(ba); //piCoutObj << "[PIPeer \"" + name_ + "\"] Translate data packet" << type << from << to << cnt << rec_size; sendToNeighbour(dp, sba); } return true; } bool PIPeer::multicastRead(uchar * data, int size) { PIMutexLocker locker(mc_mutex); diag_s.received(size); int header; PeerInfo pi; PIByteArray ba(data, size); PIVector rpeers; ba >> header >> pi.name; //piCout << "read type" << header << "from" << pi.name; if (pi.name == name_) return true; //piCout << "analyz ..."; switch (header) { case 1: // new peer accepted //piCout << "new peer packet ..."; if (hasPeer(pi.name)) break; ba >> pi.dist >> pi.addresses >> pi.netmasks >> pi.neighbours; pi.sync = 0; if (pi.dist == 0) { pi.addNeighbour(self_info.name); self_info.addNeighbour(pi.name); } peers << pi; //piCoutObj << "[PIPeer \"" + name_ + "\"] new peer \"" << pi.name << "\"" << " dist " << pi.dist; pi.dist++; sendSelfInfo(); sendPeerInfo(pi); findNearestAddresses(); peerConnected(pi.name); peerConnectedEvent(pi.name); //piCout << "new peer packet ok"; break; case 2: // remove peer accepted //piCout << "remove peer packet ..."; if (removePeer(pi.name)) { //piCoutObj << "[PIPeer \"" + name_ + "\"] remove peer \"" << pi.name << "\""; if (pi.dist == 0) { pi.removeNeighbour(self_info.name); self_info.removeNeighbour(pi.name); } sendPeerRemove(pi.name); findNearestAddresses(); peerDisconnected(pi.name); peerDisconnectedEvent(pi.name); } //piCout << "remove peer packet ok"; break; case 3: // sync peers //piCout << "sync packet ..."; ba >> pi.addresses >> pi.netmasks >> pi.neighbours >> rpeers; rpeers << pi; //piCout << "[PIPeer \"" + name_ + "\"] rec sync " << rpeers.size_s() << " peers"; for (uint i = 0; i < rpeers.size(); ++i) { PeerInfo & rpeer(rpeers[i]); //piCout << " to sync " << rpeer.name; if (rpeer.name == name_) continue; bool exist = false; for (uint j = 0; j < peers.size(); ++j) { PeerInfo & peer(peers[j]); if (peer.name == rpeer.name) { //piCout << "synced " << peer.name; peer.addresses == rpeer.addresses; peer.netmasks == rpeer.netmasks; peer.addNeighbours(rpeer.neighbours); rpeer.neighbours = peer.neighbours; if (peer.name == pi.name) peer.sync = 0; exist = true; break; } } if (exist) continue; peers << rpeer; peers.back().dist++; findNearestAddresses(); peerConnected(rpeer.name); peerConnectedEvent(rpeer.name); } //piCout << "***";; //piCout << self_info.name << self_info.neighbours; piForeach (PeerInfo & i, peers) { if (i.dist == 0) { self_info.addNeighbour(i.name); i.addNeighbour(self_info.name); } //piCout << i.name << i.neighbours; } break; } return true; } void PIPeer::sendMulticast(const PIByteArray & ba) { piForeach (PIEthernet * e, mc_eths) { for (int p = _PIPEER_PORT_SYNC_START; p < _PIPEER_PORT_SYNC_END; ++p) { e->setSendPort(p); //errorClear(); //piCout << "send to" << e->sendAddress() << e->send(ba); //piCout << PIEthernet::ethErrorString(); e->send(ba); diag_s.sended(ba.size_s()); } } } void PIPeer::sendPeerInfo(const PeerInfo & info) { PIByteArray ba; ba << int(1) << info.name << info.dist << info.addresses << info.netmasks << info.neighbours; sendMulticast(ba); } void PIPeer::sendPeerRemove(const PIString & peer) { PIByteArray ba; ba << int(2) << peer; sendMulticast(ba); } void PIPeer::syncPeers() { //piCout << "[PIPeer \"" + name_ + "\"] sync " << peers.size_s() << " peers"; PIString pn; bool change = false; for (uint i = 0; i < peers.size(); ++i) { PeerInfo & cp(peers[i]); if (cp.sync > 3 && cp.dist == 0) { pn = cp.name; //piCoutObj << "[PIPeer \"" + name_ + "\"] sync: remove " << pn; peers.remove(i); sendPeerRemove(pn); --i; piForeach (PeerInfo & p, peers) p.removeNeighbour(pn); self_info.removeNeighbour(pn); peerDisconnected(pn); peerDisconnectedEvent(pn); change = true; continue; } cp.sync++; } if (change) findNearestAddresses(); PIByteArray ba; ba << int(3) << self_info.name << self_info.addresses << self_info.netmasks << self_info.neighbours << peers; sendMulticast(ba); } void PIPeer::findNearestAddresses() { //piCout << "[PIPeer \"" + name_ + "\"] findNearestAddresses"; addresses_map.clear(); int max_dist = -1; static PIMap peers_; peers_.clear(); self_info._nuses.resize(self_info.neighbours.size()); self_info._nuses.fill(0); self_info._first = &self_info; peers_[self_info.name] = &self_info; piForeach (PeerInfo & i, peers) { i._nuses.resize(i.neighbours.size()); i._nuses.fill(0); i._first = 0; peers_[i.name] = &i; if (max_dist < i.dist) max_dist = i.dist; if (i.dist > 0) continue; i._naddress.clear(); i._neth = 0; PIString mma, ma; bool af = false; for (int mi = 0; mi < self_info.addresses.size_s(); ++mi) { PIString & m(self_info.addresses[mi]), & mmask(self_info.netmasks[mi]); if (af) break; ma = m; //mma = m.left(m.findLast(".")); mma = PIEthernet::applyMask(m, mmask); for (int ii = 0; ii < i.addresses.size_s(); ++ii) { PIString & r(i.addresses[ii]), & rmask(i.netmasks[ii]); if (mma == PIEthernet::applyMask(r, rmask)) { i._naddress = r; //piCout << "_naddress" << i.name << "=" << r; af = true; break; } } } if (!af) continue; //piCout << " peer" << i.name << ma; piForeach (PIEthernet * e, eths) if (e->readAddress() == ma) { i._neth = e; break; } //piCout << i.name << i._naddress; } PIVector cwave, nwave; PeerInfo * npeer; cwave << &self_info; for (int d = 0; d <= max_dist; ++d) { if (cwave.isEmpty()) break; nwave.clear(); piForeach (PeerInfo * p, cwave) { int ns = p->neighbours.size_s(); for (int n = 0; n < ns; ++n) { if (p->_nuses[n] >= ns) continue; p->_nuses[n]++; npeer = peers_[p->neighbours[n]]; if (npeer == 0) continue; if (d == 0) npeer->_first = npeer; else { if (d == 1) npeer->_first = p; else npeer->_first = p->_first; } nwave << npeer; } } cwave = nwave; //piCout << "wave" << d; for (int i = 0; i < cwave.size_s(); ++i) { //piCout << " peer" << cwave[i]->name << Hex << (uint)(cwave[i]->_first); if (cwave[i]->_first == 0) {cwave.remove(i); --i; continue;} if (addresses_map.contains(cwave[i]->name)) {cwave.remove(i); --i; continue;} } for (int i = 0; i < cwave.size_s(); ++i) { PIVector & pl(addresses_map[cwave[i]->name]); if (!pl.contains(cwave[i]->_first)) pl << cwave[i]->_first; } } /*piCout << " ** addresses map **"; piForeachC (napair & i, addresses_map) piCout << i.first << i.second; piCout << " ** addresses map end **";*/ }