18.03.2013 - Bug fixes, add in/out speed diagnostic to PIProtocol, fixed PIConsole tab switch segfault, PIObject EVENT / EVENT_HANDLER mechanism update - new EVENT macros that use EVENT_HANDLER with raiseEvent implementation.

This allow compile check event for CONNECT and use EVENT as CONNECT target, also raise event now is simple execute EVENT function.
This commit is contained in:
peri4
2013-03-18 12:07:44 +04:00
parent cfc5eed75e
commit 66c53a27fc
72 changed files with 4407 additions and 960 deletions

186
pipeer.cpp Normal file
View File

@@ -0,0 +1,186 @@
/*
PIP - Platform Independent Primitives
Peer - named I/O ethernet node
Copyright (C) 2013 Ivan Pelipenko peri4ko@gmail.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pipeer.h"
PIPeer::PIPeer(const PIString & name): PIEthernet() {
setName(name);
setParameter(PIEthernet::Broadcast);
setReadPort(13312);
setSendPort(13312);
srand(uint(PITimer::elapsed_system_m()));
//id_ = name() + "_" + PIString::fromNumber(rand());
CONNECT2(void, void * , int, &timer, timeout, this, timerEvent);
PIEthernet * ce;
PIStringList sl = PIEthernet::allAddresses();
PIString ta;
self_info.name = name_;
self_info.dist = 0;
piForeachC (PIString & i, sl) {
ce = new PIEthernet(this, func_readed);
ce->setReadAddress(i, 13313);
eths << ce;
ce->startThreadedRead();
self_info.addresses << i;
//cout << i << ": " << ta << endl;
}
eth_send = new PIEthernet();
eth_send->initialize();
startThreadedRead();
//joinMulticastGroup("239.13.3.12");
//timer.addDelimiter(5);
timer.start(1000);
sendSelfInfo();
}
PIPeer::~PIPeer() {
terminate();
sendSelfRemove();
//leaveMulticastGroup("239.13.3.12");
delete eth_send;
piForeach (PIEthernet * i, eths)
delete i;
eths.clear();
}
void PIPeer::timerEvent(void * data, int delim) {
switch (delim) {
case 1: // 5 s
syncPeers();
break;
}
//send("broadcast", 9);
}
bool PIPeer::threadedRead(uchar * data, int size) {
int header;
PeerInfo pi;
PIByteArray ba(data, size);
PIVector<PeerInfo> rpeers;
ba >> header >> pi.name;
if (pi.name == name_) return true;
switch (header) {
case 1: // new peer accepted
if (hasPeer(pi.name)) break;
ba >> pi.dist >> pi.addresses;
pi.sync = 0;
peers << pi;
cout << "[PIPeer \"" + name_ + "\"] new peer \"" << pi.name << "\"" << " dist " << pi.dist << endl;
pi.dist++;
sendSelfInfo();
sendPeerInfo(pi);
findNearestAddresses();
break;
case 2: // remove peer accepted
if (removePeer(pi.name)) {
cout << "[PIPeer \"" + name_ + "\"] remove peer \"" << pi.name << "\"" << endl;
sendPeerRemove(pi.name);
findNearestAddresses();
}
break;
case 3: // sync peers
ba >> pi.addresses >> rpeers;
rpeers << pi;
//cout << "[PIPeer \"" + name_ + "\"] rec sync " << rpeers.size_s() << " peers" << endl;
for (uint i = 0; i < rpeers.size(); ++i) {
PeerInfo & rpeer(rpeers[i]);
//cout << " to sync " << rpeer.name << endl;
if (rpeer.name == name_) continue;
bool exist = false;
for (uint j = 0; j < peers.size(); ++j) {
PeerInfo & peer(peers[j]);
if (peer.name == rpeer.name) {
//cout << "synced " << peer.name << endl;
peer.addresses == rpeer.addresses;
if (peer.name == pi.name) peer.sync = 0;
exist = true;
break;
}
}
if (exist) continue;
peers << rpeer;
peers.back().dist++;
findNearestAddresses();
}
break;
}
return true;
}
bool PIPeer::func_readed(void * peer, uchar * data, int size) {
PIPeer * p = (PIPeer * )peer;
cout << "[PIPeer \"" + p->name_ + "\"] received " << data << endl;
return true;
}
void PIPeer::sendPeerInfo(const PeerInfo & info) {
PIByteArray ba;
ba << int(1) << info.name << info.dist << info.addresses;
write(ba);
}
void PIPeer::sendPeerRemove(const PIString & peer) {
PIByteArray ba;
ba << int(2) << peer;
write(ba);
}
void PIPeer::syncPeers() {
//cout << "[PIPeer \"" + name_ + "\"] sync " << peers.size_s() << " peers" << endl;
PIString pn;
for (uint i = 0; i < peers.size(); ++i) {
PeerInfo & cp(peers[i]);
if (cp.sync > 1 && cp.dist == 0) {
pn = cp.name;
cout << "[PIPeer \"" + name_ + "\"] sync: remove " << pn << endl;
peers.remove(i);
sendPeerRemove(pn);
--i;
findNearestAddresses();
continue;
}
cp.sync++;
}
PIByteArray ba;
ba << int(3) << self_info.name << self_info.addresses << peers;
write(ba);
}
void PIPeer::findNearestAddresses() {
cout << "[PIPeer \"" + name_ + "\"] findNearestAddresses" << endl;
int max_dist = -1;
piForeach (PeerInfo & i, peers)
if (max_dist < i.dist)
max_dist = i.dist;
PIVector<PeerInfo * > cwave;
for (int d = 0; d <= max_dist; ++d) {
//if ()
}
}