git-svn-id: svn://db.shs.com.ru/pip@4 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

2015-02-28 18:35:47 +00:00
parent 8e451c891d
commit 13336674eb
154 changed files with 44021 additions and 0 deletions

src/io/pipeer.cpp Executable file

@@ -0,0 +1,632 @@
/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Copyright (C) 2014 Ivan Pelipenko peri4ko@gmail.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pipeer.h"
#define _PIPEER_MSG_SIZE 8192
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313+32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_BROADCAST_PORT 13361
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
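// Message payloads are split into chunks of at most _PIPEER_MSG_SIZE bytes.
// Each traffic socket binds to the first free port in
// [_PIPEER_TRAFFIC_PORT_S, _PIPEER_TRAFFIC_PORT_E); peer discovery uses the
// fixed multicast and broadcast ports plus the small loopback port range for
// peers running on the same host.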
PIPeer::PeerInfo::Address::Address(const PIString & a, const PIString & m): address(a), netmask(m) {
ping = -1.;
wait_ping = false;
last_ping = PISystemTime::current(true);
}
int PIPeer::PeerInfo::ping() const {
int ret = -1;
piForeachC (Address & a, addresses)
if (a.ping > 0.)
ret = piMaxi(ret, piRoundd(a.ping));
return ret;
}
PIPeer::PIPeer(const PIString & name_): PIObject() {
setName(name_);
self_info.name = name_;
self_info.dist = 0;
self_info.time = PISystemTime::current();
//joinMulticastGroup("239.240.241.242");
srand(uint(PISystemTime::current(true).toMicroseconds()));
//id_ = self_info.name + "_" + PIString::fromNumber(rand());
CONNECTU(&timer, tickEvent, this, timerEvent);
PIStringList sl = PIEthernet::allAddresses();
initEths(sl);
sl.removeAll("127.0.0.1");
initMBcasts(sl);
sendSelfInfo();
timer.addDelimiter(5);
timer.start(200);
}
PIPeer::~PIPeer() {
piForeach (PIEthernet * i, eths_traffic) {
i->stopThreadedRead();
delete i;
}
eths_traffic.clear();
piForeach (PIEthernet * i, eths_mcast)
i->stopThreadedRead();
piForeach (PIEthernet * i, eths_bcast)
i->stopThreadedRead();
eth_send.stopThreadedRead();
eth_lo.stopThreadedRead();
sendSelfRemove();
destroyMBcasts();
}
void PIPeer::timerEvent(void * data, int delim) {
switch (delim) {
case 5: // 1 s
syncPeers();
break;
}
//send("broadcast", 9);
}
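// Creates one unicast "traffic" socket per local address, bound to the first
// free port of the traffic range; these sockets carry data and ping packets
// and are handled by dataRead().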
void PIPeer::initEths(PIStringList al) {
PIEthernet * ce;
PIEthernet::InterfaceList il = PIEthernet::interfaces();
const PIEthernet::Interface * cint = 0;
piForeachC (PIString & a, al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_traffic_eth_rec_" + a);
ce->setParameters(0);
ce->setThreadSafe(true);
bool ok = false;
for (int p = _PIPEER_TRAFFIC_PORT_S; p < _PIPEER_TRAFFIC_PORT_E; ++p) {
ce->setReadAddress(a, p);
if (ce->open()) {
eths_traffic << ce;
cint = il.getByAddress(a);
self_info.addresses << PeerInfo::Address(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
CONNECTU(ce, threadedReadEvent, this, dataRead);
ce->startThreadedRead();
//piCout << "dc binded to" << ce->path();
//piCout << "add eth" << ta;
ok = true;
break;
}
}
if (!ok) delete ce;
}
eth_send.setDebug(false);
eth_send.setName("__S__PIPeer_traffic_eth_send");
eth_send.setParameters(0);
}
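// (Re)creates the discovery sockets: one multicast and one broadcast receiver
// per local address, plus a loopback socket bound to the first free port of
// the loopback range so that peers on the same host can find each other.
// All of them are handled by mbcastRead().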
void PIPeer::initMBcasts(PIStringList al) {
destroyMBcasts();
PIEthernet * ce;
PIEthernet::InterfaceList il = PIEthernet::interfaces();
const PIEthernet::Interface * cint;
PIString nm;
al << _PIPEER_MULTICAST_IP;
piForeachC (PIString & a, al) {
piCout << "mcast try" << a;
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_mcast_eth_" + a);
ce->setParameters(0);
ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
eths_mcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
}
piForeachC (PIString & a, al) {
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_bcast_eth_" + a);
ce->setParameters(PIEthernet::Broadcast);
cint = il.getByAddress(a);
nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
//piCout << "mc BC try" << a << nm << ce->sendIP();
piCout << "bcast try" << a << nm;
eths_bcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
}
eth_lo.setDebug(false);
eth_lo.setName("__S__PIPeer_eth_loopback");
eth_lo.setParameters(0);
cint = il.getByAddress("127.0.0.1");
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setReadAddress("127.0.0.1", p);
if (eth_lo.open()) {
eth_lo.setSendIP("127.0.0.1");
CONNECTU(&eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
piCout << "lo binded to" << eth_lo.readAddress();
//piCout << "add eth" << ta;
break;
}
}
if (eths_mcast.isEmpty()) piCoutObj << "Warning! Can't find a suitable network interface for multicast receive; check that at least one interface with multicast enabled exists!";
if (eths_bcast.isEmpty()) piCoutObj << "Warning! Can't find a suitable network interface for broadcast receive; check that at least one interface with broadcast enabled exists!";
}
void PIPeer::destroyMBcasts() {
piForeach (PIEthernet * i, eths_mcast) {
i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
delete i;
}
piForeach (PIEthernet * i, eths_bcast)
delete i;
eth_lo.stop();
eth_lo.close();
eths_mcast.clear();
eths_bcast.clear();
}
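// Returns the first-hop neighbour with the lowest reported ping among those
// that can reach "to" (according to addresses_map built by
// findNearestAddresses()), or 0 if "to" is unknown.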
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
if (!addresses_map.contains(to)) return 0;
//piCout << "*** search quickest peer" << to;
PIVector<PeerInfo * > tp = addresses_map[to];
PeerInfo * dp = 0;
int mping = 0x7FFFFFFF;
for (int i = 0; i < tp.size_s(); ++i) {
if (mping > tp[i]->ping()) {
mping = tp[i]->ping();
dp = tp[i];
}
}
//piCout << "*** search quickest peer: found" << dp->name;
return dp;
}
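// Splits "size" bytes into chunks of at most _PIPEER_MSG_SIZE bytes and sends
// them towards "to" through the quickest first-hop neighbour. Each chunk
// carries the header: int(4), sender name, destination name, hop counter,
// total size, chunk count, chunk index, followed by the chunk payload.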
bool PIPeer::send(const PIString & to, const void * data, int size) {
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
return false;
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << size;
PIByteArray fmsg(data, size), cmsg;
int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1;
//piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ...";
for (int i = 0; i < msg_count; ++i) {
int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE;
cmsg = ba;
cmsg << msg_count << i;
cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize);
if (!sendToNeighbour(dp, cmsg)) return false;
}
//piCout << "[PIPeer] send" << size << "bytes ok";
return true;
}
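// Handler for the traffic sockets. Type 5 packets implement the ping/echo
// exchange used to measure per-address round-trip times; type 4 packets carry
// user data, which is either reassembled and delivered locally via
// dataReceived()/dataReceivedEvent() or forwarded to the next hop towards its
// destination (forwarding stops after 100 hops).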
bool PIPeer::dataRead(uchar * readed, int size) {
if (size < 16) return true;
PIByteArray ba(readed, size), sba;
int type, cnt, rec_size;
PIString from, to;
ba >> type;
PIMutexLocker locker(eth_mutex);
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type;
if (type == 5) { // ping
PIString addr;
PISystemTime time, ptime, ctime = PISystemTime::current(true);
ba >> to >> from >> addr >> time;
if (to == self_info.name) { // ping echo
piForeach (PeerInfo & p, peers) {
if (!p.isNeighbour()) continue;
if (p.name != from) continue;
piForeach (PeerInfo::Address & a, p.addresses) {
if (a.address != addr) continue;
if (a.last_ping >= time) piBreak;
ptime = ctime - time;
a.last_ping = time;
a.wait_ping = false;
if (a.ping < 0) a.ping = ptime.toMilliseconds();
else a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
piCout << "*** ping echo" << p.name << a.address << a.ping;
return true;
}
}
return true;
}
// send ping back
piForeachC (PeerInfo & p, peers) {
if (!p.isNeighbour()) continue;
if (p.name != to) continue;
sba = PIByteArray(readed, size);
//piCout << "ping from" << to << addr << ", send back to" << p.name;
piForeachC (PeerInfo::Address & a, p.addresses) {
if (eth_send.send(a.address, sba))
diag_s.sended(sba.size_s()); // count the echoed ping as sent traffic
}
return true;
}
//PIEthernet * eth = (PIEthernet*)emitter();
//()->send();
return true;
}
if (type != 4) return true;
diag_d.received(size);
ba >> from >> to >> cnt >> rec_size;
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << /*type << from << to << cnt <<*/ rec_size;
if (type == 4) { // data packet
if (to == self_info.name) { // my packet
int msg_count, cmsg;
ba >> msg_count >> cmsg;
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type << from << to << cnt << rec_size << msg_count << cmsg;
if (cmsg == 0 && msg_count == 1) {
dataReceived(from, ba);
dataReceivedEvent(from, ba);
return true;
}
PeerInfo * fp = const_cast<PeerInfo * >(getPeerByName(from));
if (fp == 0) return true;
PeerData & pd(fp->_data);
if (cmsg == 0) {
//piCout << "[PIPeer \"" + name_ + "\"] Packet clear" << rec_size;
pd.clear();
pd.msg_count = msg_count;
}
//piCout << "[PIPeer \"" + name_ + "\"] Packet add" << cmsg << ba.size_s();
pd.addData(ba);
if (pd.isFullReceived()) {
dataReceived(from, pd.data);
dataReceivedEvent(from, pd.data);
//piCout << "[PIPeer \"" + name_ + "\"] Packet received" << pd.data.size_s();
}
return true;
}
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
return true;
}
cnt++;
if (cnt > 100 || from == dp->name) return true;
sba << type << from << to << cnt << rec_size;
sba.append(ba);
//piCoutObj << "Translate data packet" << type << from << to << cnt << rec_size;
sendToNeighbour(dp, sba);
}
return true;
}
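// Handler for the discovery sockets. Type 1 announces a new peer, type 2
// announces a removed peer, and type 3 is the periodic sync carrying the
// sender's own info together with its known peer list; the local peer table
// and the routing map are updated accordingly.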
bool PIPeer::mbcastRead(uchar * data, int size) {
if (size < 8) return true;
int type, dist;
PIByteArray ba(data, size);
ba >> type;
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
const PeerInfo * rpi = 0;
PIVector<PeerInfo> rpeers;
ba >> pi.name;
//piCout << "read type" << type << "from" << pi.name;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
//piCout << "analyz ...";
switch (type) {
case 1: // new peer
//piCout << "new peer packet ...";
if (hasPeer(pi.name)) break;
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
}
peers << pi;
piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
findNearestAddresses();
peerConnected(pi.name);
peerConnectedEvent(pi.name);
//piCout << "new peer packet ok";
break;
case 2: // remove peer
//piCout << "remove peer packet ..." << pi.name;
rpi = getPeerByName(pi.name);
if (!rpi) break;
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0)
self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
findNearestAddresses();
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
//piCout << "remove peer packet ok";
break;
case 3: // sync peers
//piCout << "sync packet ...";
ba >> pi >> rpeers;
rpeers << pi;
//piCoutObj << "rec sync " << rpeers.size_s() << " peers";
piForeach (PeerInfo & rpeer, rpeers) {
//piCout << " to sync " << rpeer.name;
if (rpeer.name == self_info.name) continue;
bool exist = false;
piForeach (PeerInfo & peer, peers) {
if (peer.name == rpeer.name) exist = true;
if (exist && isPeerRecent(peer, rpeer)) {
//piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::Address & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::Address & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
piBreak;
}
}
}
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
piBreak;
}
}
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
peers << rpeer;
findNearestAddresses();
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
}
//piCout << "***";
//piCout << self_info.name << self_info.neighbours;
piForeach (PeerInfo & i, peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
i.addNeighbour(self_info.name);
}
//piCout << i.name << i.neighbours;
}
//piCoutObj << "after sync " << peers.size_s() << " peers";
break;
}
return true;
}
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
//if (peer->_neth == 0) return false;
piCout << "[PIPeer] sendToNeighbour" << peer->name << peer->_naddress << ba.size_s() << "bytes ...";
//bool ok = peer->_neth->send(peer->_naddress, ba.data(), ba.size_s());
bool ok = eth_send.send(peer->_naddress, ba);
//piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
if (ok) diag_d.sended(ba.size_s());
return ok;
}
void PIPeer::sendMBcast(const PIByteArray & ba) {
//piCout << "sendMBcast" << ba.size() << "bytes ...";
piForeach (PIEthernet * e, eths_mcast) {
//errorClear();
//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
//piCout << PIEthernet::ethErrorString();
if (e->isOpened())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
piForeach (PIEthernet * e, eths_bcast) {
//errorClear();
//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
//piCout << PIEthernet::ethErrorString();
if (e->isOpened())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
eth_lo.setSendPort(p);
if (eth_lo.send(ba))
diag_s.sended(ba.size_s());
}
//piCout << "send muticast ok";
}
void PIPeer::sendPeerInfo(const PeerInfo & info) {
PIByteArray ba;
ba << int(1) << info.name << info;
sendMBcast(ba);
}
void PIPeer::sendPeerRemove(const PIString & peer) {
PIByteArray ba;
ba << int(2) << peer;
sendMBcast(ba);
}
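// Sends a type 5 ping (originator name, neighbour name, neighbour address,
// current time) to every address of every direct neighbour that is not
// already waiting for an echo; dataRead() updates the smoothed ping when the
// echo comes back.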
void PIPeer::pingNeighbours() {
PIByteArray ba, sba;
ba << int(5) << self_info.name;
//piCout << "pingNeighbours" << peers.size();
piForeach (PeerInfo & p, peers) {
//piCout << " ping neighbour" << p.name;
if (!p.isNeighbour()) continue;
piForeach (PeerInfo::Address & a, p.addresses) {
//piCout << " address" << a.address << a.wait_ping;
if (a.wait_ping) continue;
a.wait_ping = true;
sba = ba;
sba << p.name << a.address << PISystemTime::current(true);
//piCout << "ping" << p.name << a.address << a.last_ping;
if (eth_send.send(a.address, sba))
diag_s.sended(sba.size_s());
}
}
}
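// Runs once per second from timerEvent(). Drops direct neighbours that missed
// more than three sync periods, pings the remaining neighbours and broadcasts
// a type 3 sync packet with self_info and the full known peer list.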
void PIPeer::syncPeers() {
//piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex);
PIString pn;
bool change = false;
for (uint i = 0; i < peers.size(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3 && cp.dist == 0) {
pn = cp.name;
//piCoutObj << "sync: remove " << pn;
addToRemoved(cp);
peers.remove(i);
sendPeerRemove(pn);
--i;
piForeach (PeerInfo & p, peers)
p.removeNeighbour(pn);
self_info.removeNeighbour(pn);
peerDisconnected(pn);
peerDisconnectedEvent(pn);
change = true;
continue;
}
cp.sync++;
}
pingNeighbours();
if (change) findNearestAddresses();
self_info.cnt++;
self_info.time = PISystemTime::current();
PIByteArray ba;
ba << int(3) << self_info.name << self_info << peers;
sendMBcast(ba);
}
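// Rebuilds addresses_map with a breadth-first "wave" over the neighbour
// graph: starting from self_info, every reachable peer is mapped to the
// direct neighbour(s) (_first) through which it can be reached, and each
// direct neighbour's unicast address (_naddress) is resolved by matching
// masked subnets against the local interfaces.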
void PIPeer::findNearestAddresses() {
//piCout << "[PIPeer \"" + name_ + "\"] findNearestAddresses";
addresses_map.clear();
int max_dist = -1;
static PIMap<PIString, PeerInfo * > peers_;
peers_.clear();
self_info._nuses.resize(self_info.neighbours.size());
self_info._nuses.fill(0);
self_info._first = &self_info;
peers_[self_info.name] = &self_info;
piForeach (PeerInfo & i, peers) {
i._nuses.resize(i.neighbours.size());
i._nuses.fill(0);
i._first = 0;
peers_[i.name] = &i;
if (max_dist < i.dist)
max_dist = i.dist;
if (i.dist > 0) continue;
i._naddress.clear();
i._neth = 0;
PIString mma, ma;
bool af = false;
for (int mi = 0; mi < self_info.addresses.size_s(); ++mi) {
PeerInfo::Address & a(self_info.addresses[mi]);
if (af) break;
ma = a.address;
//mma = m.left(m.findLast("."));
mma = PIEthernet::applyMask(a.address, a.netmask);
for (int ii = 0; ii < i.addresses.size_s(); ++ii) {
PeerInfo::Address & r(i.addresses[ii]);
if (!r.isAvailable()) continue;
if (mma == PIEthernet::applyMask(r.address, r.netmask)) {
i._naddress = r.address;
//piCout << "_naddress" << i.name << "=" << r;
af = true;
break;
}
}
}
if (!af) continue;
//piCout << " peer" << i.name << ma;
piForeach (PIEthernet * e, eths_traffic)
if (e->readAddress() == ma) {
i._neth = e;
break;
}
//piCout << i.name << i._naddress;
}
PIVector<PeerInfo * > cwave, nwave;
PeerInfo * npeer;
cwave << &self_info;
for (int d = 0; d <= max_dist; ++d) {
if (cwave.isEmpty()) break;
nwave.clear();
piForeach (PeerInfo * p, cwave) {
int ns = p->neighbours.size_s();
for (int n = 0; n < ns; ++n) {
if (p->_nuses[n] >= ns) continue;
p->_nuses[n]++;
npeer = peers_[p->neighbours[n]];
if (npeer == 0) continue;
if (d == 0) npeer->_first = npeer;
else {
if (d == 1) npeer->_first = p;
else npeer->_first = p->_first;
}
nwave << npeer;
}
}
cwave = nwave;
//piCout << "wave" << d;
for (int i = 0; i < cwave.size_s(); ++i) {
//piCout << " peer" << cwave[i]->name << Hex << (uint)(cwave[i]->_first);
if (cwave[i]->_first == 0) {cwave.remove(i); --i; continue;}
if (addresses_map.contains(cwave[i]->name)) {cwave.remove(i); --i; continue;}
}
for (int i = 0; i < cwave.size_s(); ++i) {
PIVector<PeerInfo * > & pl(addresses_map[cwave[i]->name]);
if (!pl.contains(cwave[i]->_first))
pl << cwave[i]->_first;
}
}
/*piCout << " ** addresses map **";
piForeachC (napair & i, addresses_map)
piCout << i.first << i.second;
piCout << " ** addresses map end **";*/
}