git-svn-id: svn://db.shs.com.ru/pip@226 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
@@ -133,6 +133,8 @@ PIPeer::PIPeer(const PIString & name_): PIObject() {
 	destroyed = false;
 	setName(name_);
 	self_info.name = name_;
+	diag_d.setName(name_+"_data");
+	diag_s.setName(name_+"_service");
 	self_info.dist = 0;
 	self_info.time = PISystemTime::current();
 	//joinMulticastGroup("239.240.241.242");
@@ -140,6 +142,7 @@ PIPeer::PIPeer(const PIString & name_): PIObject() {
 	//id_ = self_info.name + "_" + PIString::fromNumber(rand());
 	CONNECTU(&timer, tickEvent, this, timerEvent);
 	prev_ifaces = PIEthernet::interfaces();
+	no_timer = false;
 	initNetwork();
 	sendSelfInfo();
 	timer.addDelimiter(5);
@@ -174,6 +177,8 @@ PIPeer::~PIPeer() {
 
 
 void PIPeer::timerEvent(void * data, int delim) {
+//	piCoutObj << "timerEvent" << delim;
+	if (no_timer) return;
 	switch (delim) {
 	case 1: // every 1 s
 		syncPeers();
@@ -187,7 +192,7 @@ void PIPeer::timerEvent(void * data, int delim) {
 
 
 void PIPeer::initEths(PIStringList al) {
-	piCoutObj << "initEths start";
+//	piCoutObj << "initEths start";
 	PIEthernet * ce;
 	const PIEthernet::Interface * cint = 0;
 	piForeachC (PIString & a, al) {
@@ -204,8 +209,8 @@ void PIPeer::initEths(PIStringList al) {
 			self_info.addresses << PeerInfo::Address(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
 			CONNECTU(ce, threadedReadEvent, this, dataRead);
 			ce->startThreadedRead();
-			//piCout << "dc binded to" << ce->path();
-			//piCout << "add eth" << ta;
+//			piCoutObj << "dc binded to" << ce->path();
+//			piCoutObj << "add eth" << a;
 			ok = true;
 			break;
 		}
@@ -215,49 +220,61 @@ void PIPeer::initEths(PIStringList al) {
 	eth_send.setDebug(false);
 	eth_send.setName("__S__PIPeer_traffic_eth_send");
 	eth_send.setParameters(0);
-	piCoutObj << "initEths ok";
+//	piCoutObj << "initEths ok";
 }
 
 
 void PIPeer::initMBcasts(PIStringList al) {
-	piCoutObj << "initMBcasts start";
-	destroyMBcasts();
+//	destroyMBcasts();
 	PIEthernet * ce;
 	const PIEthernet::Interface * cint;
 	PIString nm;
 	al << _PIPEER_MULTICAST_IP;
+//	piCoutObj << "initMBcasts start" << al;
 	piForeachC (PIString & a, al) {
 		//piCout << "mcast try" << a;
 		ce = new PIEthernet();
-		ce->setDebug(false);
+//		ce->setDebug(false);
 		ce->setName("__S__PIPeer_mcast_eth_" + a);
 		ce->setParameters(0);
 		ce->setSendAddress(_PIPEER_MULTICAST_IP, _PIPEER_MULTICAST_PORT);
 		ce->setReadAddress(a, _PIPEER_MULTICAST_PORT);
+		ce->setMulticastTTL(_PIPEER_MULTICAST_TTL);
 		ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
-		eths_mcast << ce;
-		CONNECTU(ce, threadedReadEvent, this, mbcastRead);
-		ce->startThreadedRead();
+		if (ce->open()) {
+			eths_mcast << ce;
+			CONNECTU(ce, threadedReadEvent, this, mbcastRead);
+			ce->startThreadedRead();
+//			piCout << "mcast bind to" << a << ce->sendIP();
+		} else {
+			delete ce;
+			piCoutObj << "invalid address for mcast" << a;
+		}
 	}
 	al.removeAll(_PIPEER_MULTICAST_IP);
 	piForeachC (PIString & a, al) {
 		ce = new PIEthernet();
-		ce->setDebug(false);
+//		ce->setDebug(false);
 		ce->setName("__S__PIPeer_bcast_eth_" + a);
 		ce->setParameters(PIEthernet::Broadcast);
 		cint = prev_ifaces.getByAddress(a);
 		nm = (cint == 0) ? "255.255.255.0" : cint->netmask;
 		ce->setSendAddress(PIEthernet::getBroadcast(a, nm), _PIPEER_BROADCAST_PORT);
 		ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
-		//piCout << "mc BC try" << a << nm << ce->sendIP();
-		//piCout << "bcast try" << a << nm;
-		eths_bcast << ce;
-		CONNECTU(ce, threadedReadEvent, this, mbcastRead);
-		ce->startThreadedRead();
+		if (ce->open()) {
+			eths_bcast << ce;
+			CONNECTU(ce, threadedReadEvent, this, mbcastRead);
+			ce->startThreadedRead();
+//			piCout << "mc BC try" << a << nm << ce->sendIP();
+//			piCout << "bcast bind to" << a << nm;
+		} else {
+			delete ce;
+			piCoutObj << "invalid address for bcast" << a;
+		}
 	}
-	eth_lo.setDebug(false);
+//	eth_lo.setDebug(false);
 	eth_lo.setName("__S__PIPeer_eth_loopback");
-	eth_lo.setParameters(0);
+	eth_lo.setParameters(PIEthernet::SeparateSockets);
 	eth_lo.init();
 	cint = prev_ifaces.getByAddress("127.0.0.1");
 	for (int p = _PIPEER_LOOPBACK_PORT_S; p <= _PIPEER_LOOPBACK_PORT_E; ++p) {
@@ -266,19 +283,20 @@ void PIPeer::initMBcasts(PIStringList al) {
 			eth_lo.setSendIP("127.0.0.1");
 			CONNECTU(&eth_lo, threadedReadEvent, this, mbcastRead);
 			eth_lo.startThreadedRead();
-			//piCout << "lo binded to" << eth_lo.readAddress();
+//			piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
 			//piCout << "add eth" << ta;
 			break;
 		}
 	}
-	if (eths_mcast.isEmpty()) piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!";
-	if (eths_bcast.isEmpty()) piCoutObj << "Warning! Can`t find suitable network interface for broadcast receive, check for exists at least one interface with broadcasting enabled!";
-	piCoutObj << "initMBcasts ok";
+	if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened()) piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!";
+//	piCoutObj << "initMBcasts ok";
 }
 
 
 void PIPeer::destroyEths() {
 	piForeach (PIEthernet * i, eths_traffic) {
+		((PIThread*)i)->stop();
+		((PIThread*)i)->waitForFinish(100);
 		i->stopThreadedRead();
 		i->close();
 		delete i;
@@ -289,16 +307,22 @@ void PIPeer::destroyEths() {
 
 void PIPeer::destroyMBcasts() {
 	piForeach (PIEthernet * i, eths_mcast) {
+		((PIThread*)i)->stop();
+		((PIThread*)i)->waitForFinish(100);
 		i->stopThreadedRead();
 		i->leaveMulticastGroup(_PIPEER_MULTICAST_IP);
 		i->close();
 		delete i;
 	}
 	piForeach (PIEthernet * i, eths_bcast) {
+		((PIThread*)i)->stop();
+		((PIThread*)i)->waitForFinish(100);
 		i->stopThreadedRead();
 		i->close();
 		delete i;
 	}
+	((PIThread*)&eth_lo)->stop();
+	((PIThread*)&eth_lo)->waitForFinish(100);
 	eth_lo.stopThreadedRead();
 	eth_lo.close();
 	eths_mcast.clear();
@@ -365,7 +389,10 @@ void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
 
 
 bool PIPeer::dataRead(uchar * readed, int size) {
-	if (destroyed) return true;
+	if (destroyed) {
+		piCout << "[PIPeer] SegFault";
+		return true;
+	}
 	if (size < 16) return true;
 	PIByteArray ba(readed, size), sba, pba;
 	int type, cnt;
@@ -467,7 +494,10 @@ bool PIPeer::dataRead(uchar * readed, int size) {
 
 
 bool PIPeer::mbcastRead(uchar * data, int size) {
-	if (destroyed) return true;
+	if (destroyed) {
+		piCout << "[PIPeer] SegFault";
+		return true;
+	}
 	if (size < 8) return true;
 	int type, dist;
 	PIByteArray ba(data, size);
@@ -475,7 +505,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
 	if (type <= 0 || type >= 4) return true;
 	PeerInfo pi;
 	ba >> pi.name;
-	//piCoutObj << "received mb from" << pi.name << "packet" << type;
+//	piCoutObj << "received mb from" << pi.name << "packet" << type;
 	if (pi.name == self_info.name) return true;
 	PIMutexLocker locker(mc_mutex);
 	diag_s.received(size);
@@ -625,7 +655,8 @@ bool PIPeer::sendToNeighbour(PIPeer::PeerInfo * peer, const PIByteArray & ba) {
 
 
 void PIPeer::sendMBcast(const PIByteArray & ba) {
-	//piCout << "sendMBcast" << ba.size() << "bytes ...";
+	send_mc_mutex.lock();
+//	piCout << "sendMBcast" << ba.size() << "bytes ...";
 	piForeach (PIEthernet * e, eths_mcast) {
 		//errorClear();
 		//piCout << "send to" << e->path() << e->sendAddress();// << e->send(ba);
@@ -647,7 +678,8 @@ void PIPeer::sendMBcast(const PIByteArray & ba) {
 		if (eth_lo.send(ba))
 			diag_s.sended(ba.size_s());
 	}
-	//piCout << "send muticast ok";
+//	piCout << "send muticast ok";
+	send_mc_mutex.unlock();
 }
 
 
@@ -774,17 +806,23 @@ void PIPeer::checkNetwork() {
 
 
 void PIPeer::reinit() {
+	no_timer = true;
 //	timer.stop();
 //	timer.clearDelimiters();
+	PIMutexLocker mbl(mc_mutex);
+	PIMutexLocker ethl(eth_mutex);
+	PIMutexLocker pl(peers_mutex);
+	PIMutexLocker sl(send_mutex);
 	initNetwork();
-	eth_send.close();
-	eth_lo.stopThreadedRead();
-	eth_lo.close();
-	eth_send.init();
-	eth_send.open();
-	eth_lo.startThreadedRead();
+//	eth_send.close();
+//	eth_lo.stopThreadedRead();
+//	eth_lo.close();
+//	eth_send.init();
+//	eth_send.open();
+//	eth_lo.startThreadedRead();
 //	timer.addDelimiter(5);
 //	timer.start(1000);
+	no_timer = false;
 }
 
 
@@ -793,6 +831,7 @@ void PIPeer::initNetwork() {
 	eth_send.init();
 	destroyEths();
 	destroyMBcasts();
+	piMSleep(100);
//	piCoutObj << self_info.addresses.size();
 	self_info.addresses.clear();
 	PIStringList sl = PIEthernet::allAddresses();