PIFile: applyFileInfo

PIPeer: fixed
pisd work progress ...

git-svn-id: svn://db.shs.com.ru/pip@20 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-03-12 13:28:27 +00:00
parent 3ed292a602
commit fdb48aba64
14 changed files with 346 additions and 148 deletions

View File

@@ -46,6 +46,18 @@ int PIPeer::PeerInfo::ping() const {
}
PIString PIPeer::PeerInfo::fastestAddress() const {
double mp = -1.;
PIString ret;
piForeachC (Address & a, addresses)
if (a.ping > 0.) {
mp = piMaxd(mp, a.ping);
ret = a.address;
}
return ret;
}
PIPeer::PIPeer(const PIString & name_): PIObject() {
setName(name_);
self_info.name = name_;
@@ -202,12 +214,13 @@ PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
PeerInfo * dp = 0;
int mping = 0x7FFFFFFF;
for (int i = 0; i < tp.size_s(); ++i) {
if (mping > tp[i]->ping()) {
mping = tp[i]->ping();
int p = tp[i]->ping();
if (mping > p && p > 0) {
mping = p;
dp = tp[i];
}
}
//piCout << "*** search quickest peer: found" << dp->name;
//piCout << "*** search quickest peer: found" << (dp ? dp->name : "0");
return dp;
}
@@ -222,7 +235,7 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
ba << int(4) << self_info.name << to << int(0) << size;
PIByteArray fmsg(data, size), cmsg;
int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1;
//piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ...";
piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ...";
for (int i = 0; i < msg_count; ++i) {
int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE;
cmsg = ba;
@@ -230,7 +243,7 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize);
if (!sendToNeighbour(dp, cmsg)) return false;
}
//piCout << "[PIPeer] send" << size << "bytes ok";
piCout << "[PIPeer] send" << size << "bytes ok";
return true;
}
@@ -242,6 +255,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
PIString from, to;
ba >> type;
PIMutexLocker locker(eth_mutex);
PIMutexLocker plocker(peers_mutex);
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type;
if (type == 5) { // ping
PIString addr;
@@ -259,7 +273,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
a.wait_ping = false;
if (a.ping < 0) a.ping = ptime.toMilliseconds();
else a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
piCout << "*** ping echo" << p.name << a.address << a.ping;
//piCout << "*** ping echo" << p.name << a.address << a.ping;
return true;
}
}
@@ -348,7 +362,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
case 1: // new peer
//piCout << "new peer packet ...";
peers_mutex.lock();
if (hasPeer(pi.name)) {
if (!hasPeer(pi.name)) {
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
@@ -397,27 +411,30 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
if (rpeer.name == self_info.name) continue;
bool exist = false;
piForeach (PeerInfo & peer, peers) {
if (peer.name == rpeer.name) exist = true;
if (exist && isPeerRecent(peer, rpeer)) {
//piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::Address & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::Address & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
piBreak;
if (peer.name == rpeer.name) {
exist = true;
if (isPeerRecent(peer, rpeer)) {
//piCout << "synced " << peer.name;
for (int z = 0; z < rpeer.addresses.size_s(); ++z) {
PeerInfo::Address & ra(rpeer.addresses[z]);
for (int k = 0; k < peer.addresses.size_s(); ++k) {
PeerInfo::Address & a(peer.addresses[k]);
if (ra.address == a.address) {
ra.ping = a.ping;
ra.wait_ping = a.wait_ping;
ra.last_ping = a.last_ping;
piBreak;
}
}
}
peer.was_update = true;
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
}
peer.addresses = rpeer.addresses;
peer.cnt = rpeer.cnt;
peer.time = rpeer.time;
peer.addNeighbours(rpeer.neighbours);
rpeer.neighbours = peer.neighbours;
if (peer.name == pi.name) peer.sync = 0;
piBreak;
}
}
@@ -430,8 +447,6 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
}
//piCout << "***";
//piCout << self_info.name << self_info.neighbours;
if (ch)
findNearestAddresses();
piForeach (PeerInfo & i, peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
@@ -439,6 +454,8 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
}
//piCout << i.name << i.neighbours;
}
if (ch)
findNearestAddresses();
peers_mutex.unlock();
//piCoutObj << "after sync " << peers.size_s() << " peers";
break;
@@ -449,9 +466,10 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
//if (peer->_neth == 0) return false;
piCout << "[PIPeer] sendToNeighbour" << peer->name << peer->_naddress << ba.size_s() << "bytes ...";
PIString addr = peer->fastestAddress();
piCout << "[PIPeer] sendToNeighbour" << peer->name << addr << ba.size_s() << "bytes ...";
//bool ok = peer->_neth->send(peer->_naddress, ba.data(), ba.size_s());
bool ok = eth_send.send(peer->_naddress, ba);
bool ok = eth_send.send(addr, ba);
//piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
if (ok) diag_d.sended(ba.size_s());
return ok;
@@ -504,7 +522,7 @@ void PIPeer::pingNeighbours() {
ba << int(5) << self_info.name;
//piCout << "pingNeighbours" << peers.size();
piForeach (PeerInfo & p, peers) {
//piCout << " ping neighbour" << p.name;
piCout << " ping neighbour" << p.name << p.ping();
if (!p.isNeighbour()) continue;
piForeach (PeerInfo::Address & a, p.addresses) {
//piCout << " address" << a.address << a.wait_ping;
@@ -528,7 +546,7 @@ void PIPeer::syncPeers() {
peers_mutex.lock();
for (uint i = 0; i < peers.size(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3 && cp.dist == 0) {
if (cp.sync > 3) {
pn = cp.name;
//piCoutObj << "sync: remove " << pn;
addToRemoved(cp);
@@ -543,7 +561,11 @@ void PIPeer::syncPeers() {
change = true;
continue;
}
cp.sync++;
if (cp.was_update)
cp.sync = 0;
else
cp.sync++;
cp.was_update = false;
}
pingNeighbours();
if (change) findNearestAddresses();
@@ -600,7 +622,7 @@ void PIPeer::findNearestAddresses() {
piForeach (PIEthernet * e, eths_traffic)
if (e->readAddress() == ma) {
i._neth = e;
break;
piBreak;
}
//piCout << i.name << i._naddress;
}