PIPeer ported to data transfer

git-svn-id: svn://db.shs.com.ru/pip@46 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-03-30 13:44:55 +00:00
parent 0cb2b20d2e
commit c87f73207d
5 changed files with 143 additions and 55 deletions

View File

@@ -20,6 +20,7 @@
#include "pipeer.h"
#define _PIPEER_MSG_SIZE 8000
#define _PIPEER_MSG_TTL 100
#define _PIPEER_MULTICAST_TTL 4
#define _PIPEER_MULTICAST_IP "232.13.3.12"
#define _PIPEER_LOOPBACK_PORT_S 13313
@@ -30,6 +31,31 @@
#define _PIPEER_TRAFFIC_PORT_E 14000
PIPeer::PeerData::~PeerData() {
//if (t) piCout << "delete" << t;
if (t) delete t;
if (dt_in) delete dt_in;
if (dt_out) delete dt_out;
t = 0;
dt_in = dt_out = 0;
}
void PIPeer::PeerData::initDT(const PIString & name) {
if (t == 0) t = new PIThread();
if (dt_in == 0) dt_in = new PIDataTransfer();
if (dt_out == 0) dt_out = new PIDataTransfer();
//piCout << "new" << t;
dt_in->setPacketSize(_PIPEER_MSG_SIZE);
dt_out->setPacketSize(_PIPEER_MSG_SIZE);
t->setName(name);
dt_in->setName(name);
dt_out->setName(name);
}
PIPeer::PeerInfo::Address::Address(const PIString & a, const PIString & m): address(a), netmask(m) {
ping = -1.;
wait_ping = false;
@@ -110,6 +136,43 @@ void PIPeer::timerEvent(void * data, int delim) {
}
void PIPeer::dtSendRequest(uchar type, PIByteArray & data) {
PIObject * e = emitter();
//piCoutObj << "send request" << type << data.size_s();
if (!e) return;
data.push_front(uchar(2));
sendInternal(e->name(), data);
}
void PIPeer::dtReceiveFinished(bool ok) {
if (!ok) return;
PIDataTransfer * e = (PIDataTransfer*)emitter();
if (!e) return;
dataReceived(e->name(), e->data());
dataReceivedEvent(e->name(), e->data());
}
void PIPeer::dtThread() {
PIObject * e = emitter();
//piCoutObj << "send thread";
if (!e) return;
peers_mutex.lock();
PeerInfo * dp = quickestPeer(e->name());
if (dp == 0) {
peers_mutex.unlock();
return;
}
PIDataTransfer * dt = dp->_data.dt_out;
PIByteArray sd(dp->_data.data);
peers_mutex.unlock();
//piCoutObj << "send DT ...";
dt->send(sd);
//piCoutObj << "send DT done";
}
void PIPeer::initEths(PIStringList al) {
PIEthernet * ce;
PIEthernet::InterfaceList il = PIEthernet::interfaces();
@@ -234,6 +297,26 @@ PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) {
bool PIPeer::send(const PIString & to, const void * data, int size) {
PIByteArray ba(data, size);
if (ba.size_s() <= _PIPEER_MSG_SIZE) {
ba.insert(0, uchar(1));
return sendInternal(to, ba);
} else {
//ba.insert(0, uchar(2));
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (!dp) return false;
if (dp->_data.t->isRunning()) return false;
dp->_data.data = ba;
//piCoutObj << "start thread ...";
dp->_data.t->startOnce();
}
//piCout << "[PIPeer] send" << size << "bytes ok";
return true;
}
bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
PIMutexLocker mlocker(peers_mutex);
PeerInfo * dp = quickestPeer(to);
if (dp == 0) {
@@ -241,25 +324,16 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
return false;
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << size;
PIByteArray fmsg(data, size), cmsg;
int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1;
//piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ...";
for (int i = 0; i < msg_count; ++i) {
int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE;
cmsg = ba;
cmsg << msg_count << i;
cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize);
if (!sendToNeighbour(dp, cmsg)) return false;
}
//piCout << "[PIPeer] send" << size << "bytes ok";
ba << int(4) << self_info.name << to << int(0) << data;
//piCoutObj << "sendInternal" << to << data.size_s() << int(data.front());
if (!sendToNeighbour(dp, ba)) return false;
return true;
}
bool PIPeer::dataRead(uchar * readed, int size) {
if (size < 16) return true;
PIByteArray ba(readed, size), sba;
PIByteArray ba(readed, size), sba, pba;
int type, cnt, rec_size;
PIString from, to;
ba >> type;
@@ -313,40 +387,36 @@ bool PIPeer::dataRead(uchar * readed, int size) {
}
if (type != 4) return true;
diag_d.received(size);
ba >> from >> to >> cnt >> rec_size;
piCoutObj << "Received packet" << type << from << to << cnt << rec_size;
ba >> from >> to >> cnt >> pba;
//piCoutObj << "Received packet" << type << from << to << pba.size_s();
if (type == 4) { // data packet
if (to == self_info.name) { // my packet
int msg_count, cmsg;
ba >> msg_count >> cmsg;
piCoutObj << "Received packet" << type << from << to << cnt << rec_size << msg_count << cmsg;
if (cmsg == 0 && msg_count == 1) {
dataReceived(from, ba);
dataReceivedEvent(from, ba);
return true;
}
uchar pt = pba.take_front();
//piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s();
peers_mutex.lock();
PeerInfo * fp = const_cast<PeerInfo * >(getPeerByName(from));
if (fp == 0) {
peers_mutex.unlock();
return true;
}
PeerData & pd(fp->_data);
if (cmsg == 0) {
piCoutObj << "Packet clear" << rec_size;
pd.clear();
pd.msg_count = msg_count;
if (pt == 1) {
peers_mutex.unlock();
dataReceived(from, pba);
dataReceivedEvent(from, pba);
return true;
}
if (pt == 2 || pt == 3) {
PIDataTransfer * dt = 0;
if (pt == 3)
dt = fp->_data.dt_in;
if (pt == 2)
dt = fp->_data.dt_out;
peers_mutex.unlock();
//piCoutObj << "DT received" << pba.size_s();
dt->received(pba);
return true;
}
piCoutObj << "Packet add" << cmsg << ba.size_s();
pd.addData(ba);
bool frec = pd.isFullReceived();
PIByteArray rba(pd.data);
peers_mutex.unlock();
if (frec) {
dataReceived(from, rba);
dataReceivedEvent(from, rba);
piCoutObj << "Packet received" << pd.data.size_s();
}
return true;
}
PIMutexLocker plocker(peers_mutex);
@@ -356,9 +426,8 @@ bool PIPeer::dataRead(uchar * readed, int size) {
return true;
}
cnt++;
if (cnt > 100 || from == dp->name) return true;
sba << type << from << to << cnt << rec_size;
sba.append(ba);
if (cnt > _PIPEER_MSG_TTL || from == dp->name) return true;
sba << type << from << to << cnt << pba;
//piCoutObj << "Translate data packet" << type << from << to << cnt << rec_size;
sendToNeighbour(dp, sba);
}
@@ -394,7 +463,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
self_info.addNeighbour(pi.name);
}
pi.resetPing();
peers << pi;
addPeer(pi);
buildMap();
//piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist;
pi.dist++;
@@ -483,7 +552,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
if (exist || isRemoved(rpeer)) continue;
rpeer.dist++;
rpeer.resetPing();
peers << rpeer;
addPeer(rpeer);
ch = true;
peerConnected(rpeer.name);
peerConnectedEvent(rpeer.name);
@@ -553,6 +622,18 @@ void PIPeer::removeNeighbour(const PIString & name) {
}
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
peers << pd;
PeerData & p(peers.back()._data);
p.initDT(pd.name);
CONNECTU(p.dt_in, sendRequest, this, dtSendRequestIn)
CONNECTU(p.dt_out, sendRequest, this, dtSendRequestOut)
CONNECTU(p.dt_in, receiveFinished, this, dtReceiveFinished)
CONNECTU(p.dt_out, receiveFinished, this, dtReceiveFinished)
CONNECTU(p.t, started, this, dtThread)
}
bool PIPeer::removePeer(const PIString & name) {
for (int i = 0; i < peers.size_s(); ++i)
if (peers[i].name == name) {