version 0.5.0_alpha

git-svn-id: svn://db.shs.com.ru/pip@8 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-03-10 10:13:18 +00:00
parent b1f651ab62
commit c11bc3b3b8
697 changed files with 18150 additions and 18839 deletions

View File

@@ -1,7 +1,7 @@
/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Copyright (C) 2014 Ivan Pelipenko peri4ko@gmail.com
Copyright (C) 2015 Ivan Pelipenko peri4ko@gmail.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -133,7 +133,7 @@ void PIPeer::initMBcasts(PIStringList al) {
PIString nm;
al << _PIPEER_MULTICAST_IP;
piForeachC (PIString & a, al) {
piCout << "mcast try" << a;
//piCout << "mcast try" << a;
ce = new PIEthernet();
ce->setDebug(false);
ce->setName("__S__PIPeer_mcast_eth_" + a);
@@ -156,7 +156,7 @@ void PIPeer::initMBcasts(PIStringList al) {
ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
//piCout << "mc BC try" << a << nm << ce->sendIP();
piCout << "bcast try" << a << nm;
//piCout << "bcast try" << a << nm;
eths_bcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
@@ -171,7 +171,7 @@ void PIPeer::initMBcasts(PIStringList al) {
eth_lo.setSendIP("127.0.0.1");
CONNECTU(&eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
piCout << "lo binded to" << eth_lo.readAddress();
//piCout << "lo binded to" << eth_lo.readAddress();
//piCout << "add eth" << ta;
break;
}
@@ -335,55 +335,63 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
ba >> type;
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
const PeerInfo * rpi = 0;
PIVector<PeerInfo> rpeers;
ba >> pi.name;
//piCout << "read type" << type << "from" << pi.name;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
const PeerInfo * rpi = 0;
bool ch = false;
PIVector<PeerInfo> rpeers;
//piCout << "analyz ...";
switch (type) {
case 1: // new peer
//piCout << "new peer packet ...";
if (hasPeer(pi.name)) break;
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
peers_mutex.lock();
if (hasPeer(pi.name)) {
ba >> pi;
pi.sync = 0;
if (pi.dist == 0) {
pi.addNeighbour(self_info.name);
self_info.addNeighbour(pi.name);
}
peers << pi;
findNearestAddresses();
piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
peerConnected(pi.name);
peerConnectedEvent(pi.name);
//piCout << "new peer packet ok";
}
peers << pi;
piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
findNearestAddresses();
peerConnected(pi.name);
peerConnectedEvent(pi.name);
//piCout << "new peer packet ok";
peers_mutex.unlock();
break;
case 2: // remove peer
//piCout << "remove peer packet ..." << pi.name;
peers_mutex.lock();
rpi = getPeerByName(pi.name);
if (!rpi) break;
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0)
self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
findNearestAddresses();
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
//piCout << "remove peer packet ok";
if (rpi) {
dist = rpi->dist;
addToRemoved(*rpi);
removePeer(pi.name);
piCoutObj << "remove peer \"" << pi.name << "\"";
if (dist == 0)
self_info.removeNeighbour(pi.name);
sendPeerRemove(pi.name);
findNearestAddresses();
peerDisconnected(pi.name);
peerDisconnectedEvent(pi.name);
//piCout << "remove peer packet ok";
}
peers_mutex.unlock();
break;
case 3: // sync peers
//piCout << "sync packet ...";
ba >> pi >> rpeers;
rpeers << pi;
//piCoutObj << "rec sync " << rpeers.size_s() << " peers";
peers_mutex.lock();
piForeach (PeerInfo & rpeer, rpeers) {
//piCout << " to sync " << rpeer.name;
if (rpeer.name == self_info.name) continue;
@@ -416,12 +424,14 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
peers << rpeer;
findNearestAddresses();
ch = true;
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
}
//piCout << "***";
//piCout << self_info.name << self_info.neighbours;
if (ch)
findNearestAddresses();
piForeach (PeerInfo & i, peers) {
if (i.dist == 0) {
self_info.addNeighbour(i.name);
@@ -429,6 +439,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
}
//piCout << i.name << i.neighbours;
}
peers_mutex.unlock();
//piCoutObj << "after sync " << peers.size_s() << " peers";
break;
}
@@ -514,6 +525,7 @@ void PIPeer::syncPeers() {
PIMutexLocker locker(eth_mutex);
PIString pn;
bool change = false;
peers_mutex.lock();
for (uint i = 0; i < peers.size(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 3 && cp.dist == 0) {
@@ -539,6 +551,7 @@ void PIPeer::syncPeers() {
self_info.time = PISystemTime::current();
PIByteArray ba;
ba << int(3) << self_info.name << self_info << peers;
peers_mutex.unlock();
sendMBcast(ba);
}