This allows compile-time checking of events for CONNECT and using an EVENT as a CONNECT target; raising an event is now simply executing the EVENT function.
187 lines
4.6 KiB
C++
187 lines
4.6 KiB
C++
/*
|
|
PIP - Platform Independent Primitives
|
|
Peer - named I/O ethernet node
|
|
Copyright (C) 2013 Ivan Pelipenko peri4ko@gmail.com
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "pipeer.h"
|
|
|
|
|
|
// Constructs a named peer on top of a broadcast-configured PIEthernet.
//
// name - value passed to setName(); name_ is afterwards copied into
//        self_info.name.
//
// The constructor configures this object's own read/send ports, connects the
// timer's timeout event to timerEvent(), creates one extra PIEthernet reader
// per address returned by PIEthernet::allAddresses(), creates the eth_send
// device, starts the threaded read and the timer, and finally calls
// sendSelfInfo().
PIPeer::PIPeer(const PIString & name): PIEthernet() {
	// Port used for both reading and sending on this (broadcast) device.
	const int broadcast_port = 13312;
	// Port every per-address reader created below is bound to.
	const int listen_port = 13313;

	setName(name);
	setParameter(PIEthernet::Broadcast);
	setReadPort(broadcast_port);
	setSendPort(broadcast_port);
	// Seeds rand(); the only visible consumer (an id_ built from rand()) is
	// currently disabled.
	srand(uint(PITimer::elapsed_system_m()));
	CONNECT2(void, void * , int, &timer, timeout, this, timerEvent);

	PIStringList sl = PIEthernet::allAddresses();
	self_info.name = name_;
	self_info.dist = 0;
	piForeachC (PIString & addr, sl) {
		// Each reader reports incoming data through func_readed with this
		// object as its user pointer; ownership stays in eths until the
		// destructor deletes the entries.
		PIEthernet * listener = new PIEthernet(this, func_readed);
		listener->setReadAddress(addr, listen_port);
		eths << listener;
		listener->startThreadedRead();
		self_info.addresses << addr;
	}

	eth_send = new PIEthernet();
	eth_send->initialize();

	startThreadedRead();
	// Multicast group join ("239.13.3.12") and timer.addDelimiter(5) are
	// intentionally left disabled, as in the previous revision.
	timer.start(1000);
	sendSelfInfo();
}
|
|
|
|
|
|
// Tears the peer down: calls terminate() and sendSelfRemove() first, then
// releases the send device and every reader stored in eths.
// NOTE(review): terminate() is presumably the inherited call that stops the
// threaded read, and sendSelfRemove() presumably announces this peer's
// removal - neither is defined in this part of the file, confirm.
PIPeer::~PIPeer() {
	terminate();
	sendSelfRemove();

	delete eth_send;

	// eths holds the raw pointers allocated in the constructor.
	piForeach (PIEthernet * listener, eths) {
		delete listener;
	}
	eths.clear();
}
|
|
|
|
|
|
void PIPeer::timerEvent(void * data, int delim) {
|
|
switch (delim) {
|
|
case 1: // 5 s
|
|
syncPeers();
|
|
break;
|
|
}
|
|
//send("broadcast", 9);
|
|
}
|
|
|
|
|
|
// Handles one packet received on the peer's own device.
//
// data, size - raw packet bytes; they are wrapped in a PIByteArray and parsed
//              as an int header followed by the sender's name.
//
// Header values handled here (matching what the send* / syncPeers methods in
// this file write):
//   1 - peer announcement: name, dist, addresses
//   2 - peer removal: name
//   3 - sync: sender name, sender addresses, sender's peer list
// Packets whose name equals this peer's own name are ignored, as are unknown
// headers.
//
// Always returns true.
bool PIPeer::threadedRead(uchar * data, int size) {
	// Start from a value that matches no case, so a packet that fails to
	// provide a header is not dispatched on an indeterminate value.
	int header = 0;
	PeerInfo pi;
	PIByteArray ba(data, size);
	PIVector<PeerInfo> rpeers;
	ba >> header >> pi.name;
	if (pi.name == name_) return true;
	switch (header) {
	case 1: // new peer accepted
		if (hasPeer(pi.name)) break;
		ba >> pi.dist >> pi.addresses;
		pi.sync = 0;
		peers << pi;
		cout << "[PIPeer \"" + name_ + "\"] new peer \"" << pi.name << "\"" << " dist " << pi.dist << endl;
		// The peer is stored with the received distance and forwarded one
		// hop further away.
		pi.dist++;
		sendSelfInfo();
		sendPeerInfo(pi);
		findNearestAddresses();
		break;
	case 2: // remove peer accepted
		if (removePeer(pi.name)) {
			cout << "[PIPeer \"" + name_ + "\"] remove peer \"" << pi.name << "\"" << endl;
			sendPeerRemove(pi.name);
			findNearestAddresses();
		}
		break;
	case 3: // sync peers
		ba >> pi.addresses >> rpeers;
		// NOTE(review): pi.dist and pi.sync are not assigned in this branch;
		// whatever PeerInfo's default construction leaves there is what gets
		// stored if the sender is unknown - confirm PeerInfo initialises them.
		rpeers << pi;
		for (uint i = 0; i < rpeers.size(); ++i) {
			PeerInfo & rpeer(rpeers[i]);
			if (rpeer.name == name_) continue;
			bool exist = false;
			for (uint j = 0; j < peers.size(); ++j) {
				PeerInfo & peer(peers[j]);
				if (peer.name == rpeer.name) {
					// Take over the addresses reported by the sync packet.
					// (This used to be a no-op '==' comparison.)
					peer.addresses = rpeer.addresses;
					// Only the sender itself has proven to be alive.
					if (peer.name == pi.name) peer.sync = 0;
					exist = true;
					break;
				}
			}
			if (exist) continue;
			// Unknown peer learned through the sender: one hop further away.
			peers << rpeer;
			peers.back().dist++;
			findNearestAddresses();
		}
		break;
	}
	return true;
}
|
|
|
|
|
|
bool PIPeer::func_readed(void * peer, uchar * data, int size) {
|
|
PIPeer * p = (PIPeer * )peer;
|
|
cout << "[PIPeer \"" + p->name_ + "\"] received " << data << endl;
|
|
return true;
|
|
}
|
|
|
|
|
|
void PIPeer::sendPeerInfo(const PeerInfo & info) {
|
|
PIByteArray ba;
|
|
ba << int(1) << info.name << info.dist << info.addresses;
|
|
write(ba);
|
|
}
|
|
|
|
|
|
void PIPeer::sendPeerRemove(const PIString & peer) {
|
|
PIByteArray ba;
|
|
ba << int(2) << peer;
|
|
write(ba);
|
|
}
|
|
|
|
|
|
void PIPeer::syncPeers() {
|
|
//cout << "[PIPeer \"" + name_ + "\"] sync " << peers.size_s() << " peers" << endl;
|
|
PIString pn;
|
|
for (uint i = 0; i < peers.size(); ++i) {
|
|
PeerInfo & cp(peers[i]);
|
|
if (cp.sync > 1 && cp.dist == 0) {
|
|
pn = cp.name;
|
|
cout << "[PIPeer \"" + name_ + "\"] sync: remove " << pn << endl;
|
|
peers.remove(i);
|
|
sendPeerRemove(pn);
|
|
--i;
|
|
findNearestAddresses();
|
|
continue;
|
|
}
|
|
cp.sync++;
|
|
}
|
|
PIByteArray ba;
|
|
ba << int(3) << self_info.name << self_info.addresses << peers;
|
|
write(ba);
|
|
}
|
|
|
|
|
|
// Logs that it was called and determines the largest dist among the known
// peers (-1 when there are none).
//
// The per-distance pass below has no body yet, so apart from the log line
// this function currently has no effect on the peer list.
void PIPeer::findNearestAddresses() {
	cout << "[PIPeer \"" + name_ + "\"] findNearestAddresses" << endl;

	int max_dist = -1;
	for (uint k = 0; k < peers.size(); ++k) {
		if (max_dist < peers[k].dist) {
			max_dist = peers[k].dist;
		}
	}

	PIVector<PeerInfo * > cwave;
	// Placeholder: one iteration per distance 0..max_dist, nothing done yet.
	for (int d = 0; d <= max_dist; ++d) {
	}
}
|