PIDeque bugs fixed

git-svn-id: svn://db.shs.com.ru/pip@27 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-03-16 12:17:14 +00:00
parent 7657996e98
commit e50bedc5ef
12 changed files with 198 additions and 217 deletions

View File

@@ -19,7 +19,7 @@
#include "pipeer.h"
#define _PIPEER_MSG_SIZE 8192
#define _PIPEER_MSG_SIZE 8000
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
@@ -211,7 +211,7 @@ void PIPeer::destroyMBcasts() {
PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
if (!addresses_map.contains(to)) return 0;
if (!peers_map.contains(to)) return 0;
//piCout << "*** search quickest peer" << to;
PIVector<PeerInfo * > tp = addresses_map[to];
PeerInfo * dp = 0;
@@ -229,6 +229,7 @@ PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
bool PIPeer::send(const PIString & to, const void * data, int size) {
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
@@ -258,13 +259,13 @@ bool PIPeer::dataRead(uchar * readed, int size) {
PIString from, to;
ba >> type;
PIMutexLocker locker(eth_mutex);
PIMutexLocker plocker(peers_mutex);
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type;
if (type == 5) { // ping request
PIString addr;
PISystemTime time;
ba >> to >> from >> addr >> time;
piCout << "ping request" << to << from << addr;
//piCout << "ping request" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (from == self_info.name) { // send ping back
const PeerInfo * pi = getPeerByName(to);
if (pi) {
@@ -284,7 +285,8 @@ bool PIPeer::dataRead(uchar * readed, int size) {
PIString addr;
PISystemTime time, ptime, ctime = PISystemTime::current(true);
ba >> to >> from >> addr >> time;
piCout << "ping reply" << to << from << addr;
//piCout << "ping reply" << to << from << addr;
PIMutexLocker plocker(peers_mutex);
if (to == self_info.name) { // ping echo
piForeach (PeerInfo & p, peers) {
if (!p.isNeighbour()) continue;
@@ -297,7 +299,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
a.wait_ping = false;
if (a.ping < 0) a.ping = ptime.toMilliseconds();
else a.ping = 0.6 * a.ping + 0.4 * ptime.toMilliseconds();
piCout << "*** ping echo" << p.name << a.address << a.ping;
//piCout << "*** ping echo" << p.name << a.address << a.ping;
return true;
}
}
@@ -318,8 +320,12 @@ bool PIPeer::dataRead(uchar * readed, int size) {
dataReceivedEvent(from, ba);
return true;
}
peers_mutex.lock();
PeerInfo * fp = const_cast<PeerInfo * >(getPeerByName(from));
if (fp == 0) return true;
if (fp == 0) {
peers_mutex.unlock();
return true;
}
PeerData & pd(fp->_data);
if (cmsg == 0) {
//piCout << "[PIPeer \"" + name_ + "\"] Packet clear" << rec_size;
@@ -328,13 +334,17 @@ bool PIPeer::dataRead(uchar * readed, int size) {
}
//piCout << "[PIPeer \"" + name_ + "\"] Packet add" << cmsg << ba.size_s();
pd.addData(ba);
if (pd.isFullReceived()) {
dataReceived(from, pd.data);
dataReceivedEvent(from, pd.data);
bool frec = pd.isFullReceived();
PIByteArray rba(pd.data);
peers_mutex.unlock();
if (frec) {
dataReceived(from, rba);
dataReceivedEvent(from, rba);
//piCout << "[PIPeer \"" + name_ + "\"] Packet received" << pd.data.size_s();
}
return true;
}
PIMutexLocker plocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
//piCoutObj << "Can`t find peer \"" << to << "\"!";
@@ -379,8 +389,8 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
self_info.addNeighbour(pi.name);
}
peers << pi;
findNearestAddresses();
piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
buildMap();
//piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
@@ -398,11 +408,11 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
piCoutObj << "remove peer \"" << pi.name << "\"";
//piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0)
self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
findNearestAddresses();
buildMap();
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
//piCout << "remove peer packet ok";
@@ -464,7 +474,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
//piCout << i.name << i.neighbours;
}
if (ch)
findNearestAddresses();
buildMap();
peers_mutex.unlock();
//piCoutObj << "after sync " << peers.size_s() << " peers";
break;
@@ -542,7 +552,7 @@ void PIPeer::pingNeighbours() {
a.wait_ping = true;
sba = ba;
sba << p.name << a.address << PISystemTime::current(true);
piCout << "ping" << p.name << a.address << a.last_ping;
//piCout << "ping" << p.name << a.address << a.last_ping;
if (eth_send.send(a.address, sba))
diag_s.sended(sba.size_s());
}
@@ -556,11 +566,11 @@ void PIPeer::syncPeers() {
PIString pn;
bool change = false;
peers_mutex.lock();
for (uint i = 0; i < peers.size(); ++i) {
for (int i = 0; i < peers.size_s(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3) {
pn = cp.name;
piCoutObj << "sync: remove " << pn;
//piCoutObj << "sync: remove " << pn;
addToRemoved(cp);
peers.remove(i);
sendPeerRemove(pn);
@@ -580,7 +590,7 @@ void PIPeer::syncPeers() {
cp.was_update = false;
}
pingNeighbours();
if (change) findNearestAddresses();
if (change) buildMap();
self_info.cnt++;
self_info.time = PISystemTime::current();
PIByteArray ba;
@@ -590,9 +600,13 @@ void PIPeer::syncPeers() {
}
void PIPeer::findNearestAddresses() {
//piCout << "[PIPeer \"" + name_ + "\"] findNearestAddresses";
void PIPeer::buildMap() {
//piCout << "[PIPeer \"" + name_ + "\"] buildMap";
peers_map.clear();
addresses_map.clear();
piForeach (PeerInfo & i, peers)
peers_map[i.name] = &i;
return;
int max_dist = -1;
PIMap<PIString, PeerInfo * > peers_;
self_info._nuses.resize(self_info.neighbours.size());