17.10.2013 - Adjusted for QNX, PIPeer release for Windows, Remote console

peri4
2013-10-17 16:12:10 +04:00
parent 4b90f2818e
commit 0f1b528ac6
42 changed files with 585 additions and 171 deletions

@@ -21,7 +21,7 @@
 #define _PIPEER_PORT_SYNC_START 13313
 #define _PIPEER_PORT_SYNC_END 13353
-#define _PIPEER_IP_MULTICAST "239.13.3.12"
+#define _PIPEER_IP_MULTICAST "230.13.3.12"
 #define _PIPEER_MSG_SIZE 8192
 
 PIPeer::PIPeer(const PIString & name): PIObject() {
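Note: the old group 239.13.3.12 sits in the administratively scoped block 239.0.0.0/8 (RFC 2365), while 230.13.3.12 sits in the globally scoped 224.0.0.0-238.255.255.255 range; the commit message only says "Adjusted for QNX", so scope-related filtering on one of the target stacks is a plausible but unconfirmed motivation. For reference, a minimal sketch of what joinMulticastGroup() presumably wraps on a BSD-sockets platform (PIEthernet internals are not part of this diff; the helper below is hypothetical):

#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

// Hypothetical helper: join a multicast group on an already-created UDP socket.
bool joinGroup(int sock, const char * group, const char * iface) {
	ip_mreq mreq;
	std::memset(&mreq, 0, sizeof(mreq));
	mreq.imr_multiaddr.s_addr = inet_addr(group); // e.g. "230.13.3.12"
	mreq.imr_interface.s_addr = inet_addr(iface); // local interface address
	return setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
	                  (const char *)&mreq, sizeof(mreq)) == 0;
}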
@@ -29,6 +29,7 @@ PIPeer::PIPeer(const PIString & name): PIObject() {
 	setName(name);
 	self_info.name = name_;
 	self_info.dist = 0;
+	eth_send = 0;
 	//joinMulticastGroup("239.240.241.242");
 	srand(uint(PITimer::elapsed_system_m()));
 	//id_ = name() + "_" + PIString::fromNumber(rand());
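Zero-initializing eth_send in the constructor makes the new eth_send != 0 guards in the destructor safe even if initEths() never gets as far as allocating the socket.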
@@ -37,21 +38,24 @@ PIPeer::PIPeer(const PIString & name): PIObject() {
 	sl.removeAll("127.0.0.1"); sl << "127.0.0.1";
 	initMulticasts(sl);
 	initEths(sl);
 	//piCout << "Peer" << name_;
-	sendSelfInfo();
 	timer.addDelimiter(5);
 	timer.start(1000);
+	sendSelfInfo();
 }
 
 PIPeer::~PIPeer() {
 	piForeach (PIEthernet * i, mc_eths)
 		i->stopThreadedRead();
+	if (eth_send != 0)
+		eth_send->stopThreadedRead();
 	sendSelfRemove();
 	destroyMulticasts();
 	piForeach (PIEthernet * i, eths)
 		delete i;
 	eths.clear();
+	if (eth_send != 0)
+		delete eth_send;
 }
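The shutdown order added here is presumably deliberate: threaded reads are stopped on every receive socket (and on eth_send) before sendSelfRemove() announces the departure and the sockets are deleted, so no receive callback can fire into a partially destroyed object.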
@@ -88,6 +92,9 @@ void PIPeer::initEths(const PIStringList & al) {
 			}
 		}
 	}
+	eth_send = new PIEthernet();
+	eth_send->setParameters(0);
+	eth_send->setDebug(false);
 }
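eth_send appears to be a single shared socket for all outgoing unicast traffic: the reworked sendToNeighbour() in the last hunk transmits through it instead of through each peer's own receive socket.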
@@ -116,6 +123,7 @@ void PIPeer::initMulticasts(const PIStringList & al) {
 		if (!ce->open()) continue;
 		if (is_main) if (!ce->joinMulticastGroup(_PIPEER_IP_MULTICAST)) continue;
 		//piCout << "mc binded to" << ce->path();
+		ce->setName(is_main ? "no_send" : "");
 		mc_eths << ce;
 		if (is_main || is_bc) {
 			if (is_main) rec_mc = true;
@@ -175,14 +183,6 @@ PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
 }
 
-bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
-	if (peer->_neth == 0) return false;
-	//piCout << "send to" << peer->name << peer->_naddress << ba.size_s() << "bytes";
-	diag_d.sended(ba.size_s());
-	return peer->_neth->send(peer->_naddress, ba.data(), ba.size_s());
-}
-
 bool PIPeer::send(const PIString & to, const void * data, int size) {
 	PeerInfo * dp = quickestPeer(to);
 	if (dp == 0) {
@@ -193,6 +193,7 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
 	ba << int(4) << self_info.name << to << int(0) << size;
 	PIByteArray fmsg(data, size), cmsg;
 	int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1;
+	//piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ...";
 	for (int i = 0; i < msg_count; ++i) {
 		int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE;
 		cmsg.clear();
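The chunking arithmetic is ceiling division: msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1 rounds up, and the last chunk carries (size - 1) % _PIPEER_MSG_SIZE + 1 bytes, so exactly size bytes go out with no empty trailing packet. A standalone check of the formula (plain C++, independent of the PIP library):

#include <initializer_list>
#include <iostream>

int main() {
	const int MSG_SIZE = 8192; // _PIPEER_MSG_SIZE
	for (int size : {1, 8192, 8193, 20000}) {
		int msg_count = (size - 1) / MSG_SIZE + 1; // ceil(size / MSG_SIZE)
		int last = (size - 1) % MSG_SIZE + 1;      // bytes in the final chunk
		std::cout << size << " bytes -> " << msg_count
		          << " packet(s), last chunk " << last << " bytes\n";
	}
	return 0; // prints 1->1/1, 8192->1/8192, 8193->2/1, 20000->3/3616
}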
@@ -201,17 +202,23 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
 		cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize);
 		if (!sendToNeighbour(dp, cmsg)) return false;
 	}
+	//piCout << "[PIPeer] send" << size << "bytes ok";
 	return true;
 }
 
 bool PIPeer::dataRead(uchar * readed, int size) {
-	diag_d.received(size);
 	if (size < 16) return true;
 	PIByteArray ba(readed, size), sba;
 	int type, cnt, rec_size;
 	PIString from, to;
-	ba >> type >> from >> to >> cnt >> rec_size;
-	//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type << from << to << cnt << rec_size;
+	ba >> type;
+	//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type;
+	if (type != 4) return true;
+	PIMutexLocker locker(eth_mutex);
+	diag_d.received(size);
+	ba >> from >> to >> cnt >> rec_size;
+	//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << /*type << from << to << cnt <<*/ rec_size;
 	if (type == 4) { // data packet
 		if (to == self_info.name) { // my packet
 			int msg_count, cmsg;
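As reconstructed from the send() side above and this hunk, a unicast data packet carries, in stream order (names taken from the code; the on-wire encoding of PIString is internal to PIByteArray):

// Inferred PIPeer data-packet header (type 4):
//   int      type;      // 4 = data; anything else now returns early
//   PIString from;      // sender peer name
//   PIString to;        // destination peer name
//   int      cnt;       // chunk counter
//   int      rec_size;  // total size of the reassembled message
//   ...chunk payload...

The eth_mutex lock is new in this hunk; reading type before locking keeps non-data packets from taking the mutex at all, and diag_d.received() now counts only data packets instead of every datagram that arrives.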
@@ -256,17 +263,20 @@ bool PIPeer::dataRead(uchar * readed, int size) {
 bool PIPeer::multicastRead(uchar * data, int size) {
 	if (size < 8) return true;
-	int type;
-	PIByteArray ba(data, size);
-	ba >> type;
-	if (type <= 0 || type >= 4) return true;
-	PeerInfo pi;
-	PIVector<PeerInfo> rpeers;
-	ba >> pi.name;
-	//piCout << "read type" << type << "from" << pi.name;
-	if (pi.name == name_) return true;
-	switch (type) {
+	PIMutexLocker locker(mc_mutex);
+	diag_s.received(size);
+	int header;
+	PeerInfo pi;
+	PIByteArray ba(data, size);
+	PIVector<PeerInfo> rpeers;
+	ba >> header >> pi.name;
+	//piCout << "read type" << header << "from" << pi.name;
+	if (pi.name == name_) return true;
+	//piCout << "analyz ...";
+	switch (header) {
 	case 1: // new peer accepted
 		//piCout << "new peer packet ...";
 		if (hasPeer(pi.name)) break;
@@ -346,17 +356,31 @@ bool PIPeer::multicastRead(uchar * data, int size) {
 }
 
+bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
+	if (peer->_neth == 0) return false;
+	//piCout << "[PIPeer] sendToNeighbour" << (eth_send->readAddress()) << peer->_naddress << ba.size_s() << "bytes ...";
+	//bool ok = peer->_neth->send(peer->_naddress, ba.data(), ba.size_s());
+	bool ok = eth_send->send(peer->_naddress, ba.data(), ba.size_s());
+	//piCout << "[PIPeer] sendToNeighbour" << (ok ? "ok" : "fail");
+	if (ok) diag_d.sended(ba.size_s());
+	return ok;
+}
+
 void PIPeer::sendMulticast(const PIByteArray & ba) {
 	//piCout << "send muticast ...";
 	piForeach (PIEthernet * e, mc_eths) {
+		if (e->name() == "no_send") continue;
 		for (int p = _PIPEER_PORT_SYNC_START; p < _PIPEER_PORT_SYNC_END; ++p) {
 			e->setSendPort(p);
 			//errorClear();
 			//piCout << "send to" << e->sendAddress() << e->send(ba);
+			//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
+			//piCout << PIEthernet::ethErrorString();
 			e->send(ba);
+			diag_s.sended(ba.size_s());
 		}
 	}
+	//piCout << "send muticast ok";
 }
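Two things stand out in this final hunk. First, sendToNeighbour() was moved here from its old location and now transmits through the shared eth_send socket (the per-peer _neth->send() call is kept commented out), with diag_d advanced only on a successful send. Second, sendMulticast() fans one datagram out to every port in [_PIPEER_PORT_SYNC_START, _PIPEER_PORT_SYNC_END), i.e. 40 ports per interface, skipping interfaces named "no_send". A minimal sketch of that fan-out with plain BSD sockets (the constants come from this diff; everything else is hypothetical):

#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

int main() {
	const char msg[] = "hello";
	int sock = socket(AF_INET, SOCK_DGRAM, 0);
	sockaddr_in addr;
	std::memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr("230.13.3.12"); // _PIPEER_IP_MULTICAST
	// One datagram per sync port, mirroring the loop over
	// [_PIPEER_PORT_SYNC_START, _PIPEER_PORT_SYNC_END) = [13313, 13353):
	for (int p = 13313; p < 13353; ++p) {
		addr.sin_port = htons(p);
		sendto(sock, msg, sizeof(msg), 0, (sockaddr *)&addr, sizeof(addr));
	}
	close(sock);
	return 0;
}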