git-svn-id: svn://db.shs.com.ru/pip@232 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

This commit is contained in:
2016-08-19 19:58:54 +00:00
parent 4ceabcd22d
commit 0133533ac2
7 changed files with 120 additions and 108 deletions

View File

@@ -127,14 +127,16 @@ PIString PIPeer::PeerInfo::fastestAddress() const {
}
REGISTER_DEVICE(PIPeer)
PIPeer::PIPeer(const PIString & name_): PIObject() {
PIPeer::PIPeer(const PIString & n): PIIODevice() {
destroyed = false;
setName(name_);
self_info.name = name_;
diag_d.setName(name_+"_data");
diag_s.setName(name_+"_service");
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
PIMutexLocker pl(peers_mutex);
PIMutexLocker sl(send_mutex);
changeName(n);
read_buffer_size = 128;
self_info.dist = 0;
self_info.time = PISystemTime::current();
//joinMulticastGroup("239.240.241.242");
@@ -385,6 +387,11 @@ bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
dataReceived(from, data);
dataReceivedEvent(from, data);
if (trust_peer.isEmpty() || trust_peer == from) {
read_buffer_mutex.lock();
if (read_buffer.size() < read_buffer_size) read_buffer.enqueue(data);
read_buffer_mutex.unlock();
}
}
@@ -464,8 +471,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
}
if (pt == 1) {
peers_mutex.unlock();
dataReceived(from, pba);
dataReceivedEvent(from, pba);
dtReceived(from, pba);
return true;
}
if (pt == 2 || pt == 3) {
@@ -752,6 +758,14 @@ void PIPeer::pingNeighbours() {
}
bool PIPeer::openDevice() {
PIMutexLocker ml(peers_mutex);
if (trust_peer.isEmpty())
return !peers.isEmpty();
return hasPeer(trust_peer);
}
void PIPeer::syncPeers() {
//piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex);
@@ -826,6 +840,60 @@ void PIPeer::reinit() {
}
void PIPeer::changeName(const PIString &new_name) {
PIString name_ = new_name;
if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(random() % 1000);
setName(name_);
self_info.name = name_;
diag_d.setName(name_+"_data");
diag_s.setName(name_+"_service");
}
PIString PIPeer::constructFullPath() const {
PIString ret(fullPathPrefix() + "://");
ret << name() << ":" << trustPeerName();
return ret;
}
int PIPeer::read(void *read_to, int max_size) {
bool empty = read_buffer.isEmpty();
while (empty) piMSleep(10);
PIMutexLocker ml(read_buffer_mutex);
if (!read_buffer.isEmpty()) {
PIByteArray ba = read_buffer.dequeue();
int sz = piMini(ba.size_s(), max_size);
memcpy(read_to, ba.data(), sz);
return sz;
}
return 0;
}
int PIPeer::write(const void *data, int size) {
if (trust_peer.isEmpty()) {
sendToAll(data, size);
return size;
}
if (send(trust_peer, data, size))
return size;
else return -1;
}
void PIPeer::configureFromFullPath(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
PIString p(pl[i]);
switch (i) {
case 0: changeName(p); break;
case 1: setTrustPeerName(p); break;
}
}
}
void PIPeer::initNetwork() {
// piCoutObj << "initNetwork ...";
eth_send.init();
@@ -890,85 +958,4 @@ void PIPeer::buildMap() {
addresses_map[c.name] = cpath;
//piCout << "map" << c.name << "=" << cpath;
}
/*int max_dist = -1;
self_info._nuses.resize(self_info.neighbours.size());
self_info._nuses.fill(0);
self_info._first = &self_info;
peers_map[self_info.name] = &self_info;
piForeach (PeerInfo & i, peers) {
i._nuses.resize(i.neighbours.size());
i._nuses.fill(0);
i._first = 0;
peers_map[i.name] = &i;
if (max_dist < i.dist)
max_dist = i.dist;
if (i.dist > 0) continue;
i._naddress.clear();
i._neth = 0;
PIString mma, ma;
bool af = false;
for (int mi = 0; mi < self_info.addresses.size_s(); ++mi) {
PeerInfo::Address & a(self_info.addresses[mi]);
if (af) break;
ma = a.address;
//mma = m.left(m.findLast("."));
mma = PIEthernet::applyMask(a.address, a.netmask);
for (int ii = 0; ii < i.addresses.size_s(); ++ii) {
PeerInfo::Address & r(i.addresses[ii]);
if (!r.isAvailable()) continue;
if (mma == PIEthernet::applyMask(r.address, r.netmask)) {
i._naddress = r.address;
//piCout << "_naddress" << i.name << "=" << r;
af = true;
break;
}
}
}
if (!af) continue;
//piCout << " peer" << i.name << ma;
piForeach (PIEthernet * e, eths_traffic)
if (e->readAddress() == ma) {
i._neth = e;
piBreak;
}
//piCout << i.name << i._naddress;
}
PIVector<PeerInfo * > cwave, nwave;
PeerInfo * npeer;
cwave << &self_info;
for (int d = 0; d <= max_dist; ++d) {
if (cwave.isEmpty()) break;
nwave.clear();
piForeach (PeerInfo * p, cwave) {
int ns = p->neighbours.size_s();
for (int n = 0; n < ns; ++n) {
if (p->_nuses[n] >= ns) continue;
p->_nuses[n]++;
npeer = peers_map[p->neighbours[n]];
if (npeer == 0) continue;
if (d == 0) npeer->_first = npeer;
else {
if (d == 1) npeer->_first = p;
else npeer->_first = p->_first;
}
nwave << npeer;
}
}
cwave = nwave;
//piCout << "wave" << d;
for (int i = 0; i < cwave.size_s(); ++i) {
//piCout << " peer" << cwave[i]->name << Hex << (uint)(cwave[i]->_first);
if (cwave[i]->_first == 0) {cwave.remove(i); --i; continue;}
if (addresses_map.contains(cwave[i]->name)) {cwave.remove(i); --i; continue;}
}
for (int i = 0; i < cwave.size_s(); ++i) {
PIVector<PeerInfo * > & pl(addresses_map[cwave[i]->name]);
if (!pl.contains(cwave[i]->_first))
pl << cwave[i]->_first;
}
}*/
/*piCout << " ** addresses map **";
piForeachC (napair & i, addresses_map)
piCout << i.first << i.second;
piCout << " ** addresses map end **";*/
}

View File

@@ -27,9 +27,9 @@
#include "pidiagnostics.h"
#include "pidatatransfer.h"
class PIP_EXPORT PIPeer: public PIObject
class PIP_EXPORT PIPeer: public PIIODevice
{
PIOBJECT_SUBCLASS(PIPeer, PIObject)
PIIODEVICE(PIPeer)
private:
class PeerData: public PIObject {
@@ -53,7 +53,7 @@ private:
};
public:
explicit PIPeer(const PIString & name);
explicit PIPeer(const PIString & name = PIString());
virtual ~PIPeer();
class PeerInfo {
@@ -132,6 +132,13 @@ public:
void reinit();
void lock() {peers_mutex.lock();}
void unlock() {peers_mutex.unlock();}
void changeName(const PIString & new_name);
const PIString & trustPeerName() const {return trust_peer;}
void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;}
PIString constructFullPath() const;
int read(void *read_to, int max_size);
int write(const void * data, int size);
EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data)
EVENT1(peerConnectedEvent, const PIString &, name)
@@ -171,7 +178,13 @@ private:
void pingNeighbours();
void addToRemoved(const PeerInfo & pi) {removed[pi.name] = PIPair<int, PISystemTime>(pi.cnt, pi.time);}
bool isRemoved(const PeerInfo & pi) const {return (removed.value(pi.name) == PIPair<int, PISystemTime>(pi.cnt, pi.time));}
bool openDevice();
bool closeDevice() {return false;}
PIString fullPathPrefix() const {return "peer";}
void configureFromFullPath(const PIString &full_path);
PeerInfo * quickestPeer(const PIString & to);
bool sendToNeighbour(PeerInfo * peer, const PIByteArray & ba);
inline static bool isPeerRecent(const PeerInfo & my, const PeerInfo & income) {return (my.cnt < income.cnt) || (my.time < income.time);}
@@ -196,8 +209,11 @@ private:
PIDiagnostics diag_s, diag_d;
bool destroyed, no_timer;
PIString id_;
};
PIString trust_peer;
PIMutex read_buffer_mutex;
PIQueue<PIByteArray> read_buffer;
int read_buffer_size;
};
inline PICout operator <<(PICout c, const PIPeer::PeerInfo::Address & v) {c.space(); c << "PeerAddress(" << v.address << ", " << v.netmask << ", " << v.ping << ")"; return c;}
inline PICout operator <<(PICout c, const PIPeer::PeerInfo & v) {c.space(); c << "PeerInfo(" << v.name << ", " << v.dist << ", " << v.addresses << ")"; return c;}