PIPeer data transfers fixnsfer
git-svn-id: svn://db.shs.com.ru/pip@47 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
@@ -31,26 +31,40 @@
|
||||
#define _PIPEER_TRAFFIC_PORT_E 14000
|
||||
|
||||
|
||||
PIPeer::PeerData::~PeerData() {
|
||||
//if (t) piCout << "delete" << t;
|
||||
if (t) delete t;
|
||||
if (dt_in) delete dt_in;
|
||||
if (dt_out) delete dt_out;
|
||||
t = 0;
|
||||
dt_in = dt_out = 0;
|
||||
PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
|
||||
dt_in.setPacketSize(_PIPEER_MSG_SIZE);
|
||||
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
|
||||
CONNECTU(&dt_in, sendRequest, this, dtSendRequestIn)
|
||||
CONNECTU(&dt_out, sendRequest, this, dtSendRequestOut)
|
||||
CONNECTU(&dt_in, receiveFinished, this, dtReceiveFinishedIn)
|
||||
CONNECTU(&dt_out, receiveFinished, this, dtReceiveFinishedOut)
|
||||
CONNECTU(&t, started, this, dtThread)
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::PeerData::initDT(const PIString & name) {
|
||||
if (t == 0) t = new PIThread();
|
||||
if (dt_in == 0) dt_in = new PIDataTransfer();
|
||||
if (dt_out == 0) dt_out = new PIDataTransfer();
|
||||
//piCout << "new" << t;
|
||||
dt_in->setPacketSize(_PIPEER_MSG_SIZE);
|
||||
dt_out->setPacketSize(_PIPEER_MSG_SIZE);
|
||||
t->setName(name);
|
||||
dt_in->setName(name);
|
||||
dt_out->setName(name);
|
||||
void PIPeer::PeerData::dtThread() {
|
||||
piCoutObj << "send DT ...";
|
||||
dt_out.send(data);
|
||||
piCoutObj << "send DT done";
|
||||
}
|
||||
|
||||
|
||||
bool PIPeer::PeerData::send(const PIByteArray & d) {
|
||||
if (t.isRunning()) return false;
|
||||
data = d;
|
||||
t.startOnce();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) {
|
||||
PIDataTransfer * dt = 0;
|
||||
if (type == 3)
|
||||
dt = &dt_in;
|
||||
if (type == 2)
|
||||
dt = &dt_out;
|
||||
piCoutObj << "DT received" << int(type) << d.size_s();
|
||||
if (dt) dt->received(d);
|
||||
}
|
||||
|
||||
|
||||
@@ -74,6 +88,17 @@ int PIPeer::PeerInfo::ping() const {
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::PeerInfo::init() {
|
||||
if (_data == 0) _data = new PeerData(name);
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::PeerInfo::destroy() {
|
||||
if (_data) delete _data;
|
||||
_data = 0;
|
||||
}
|
||||
|
||||
|
||||
PIString PIPeer::PeerInfo::fastestAddress() const {
|
||||
double mp = -1.;
|
||||
PIString ret;
|
||||
@@ -136,43 +161,6 @@ void PIPeer::timerEvent(void * data, int delim) {
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::dtSendRequest(uchar type, PIByteArray & data) {
|
||||
PIObject * e = emitter();
|
||||
//piCoutObj << "send request" << type << data.size_s();
|
||||
if (!e) return;
|
||||
data.push_front(uchar(2));
|
||||
sendInternal(e->name(), data);
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::dtReceiveFinished(bool ok) {
|
||||
if (!ok) return;
|
||||
PIDataTransfer * e = (PIDataTransfer*)emitter();
|
||||
if (!e) return;
|
||||
dataReceived(e->name(), e->data());
|
||||
dataReceivedEvent(e->name(), e->data());
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::dtThread() {
|
||||
PIObject * e = emitter();
|
||||
//piCoutObj << "send thread";
|
||||
if (!e) return;
|
||||
peers_mutex.lock();
|
||||
PeerInfo * dp = quickestPeer(e->name());
|
||||
if (dp == 0) {
|
||||
peers_mutex.unlock();
|
||||
return;
|
||||
}
|
||||
PIDataTransfer * dt = dp->_data.dt_out;
|
||||
PIByteArray sd(dp->_data.data);
|
||||
peers_mutex.unlock();
|
||||
//piCoutObj << "send DT ...";
|
||||
dt->send(sd);
|
||||
//piCoutObj << "send DT done";
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::initEths(PIStringList al) {
|
||||
PIEthernet * ce;
|
||||
PIEthernet::InterfaceList il = PIEthernet::interfaces();
|
||||
@@ -306,10 +294,7 @@ bool PIPeer::send(const PIString & to, const void * data, int size) {
|
||||
PIMutexLocker mlocker(peers_mutex);
|
||||
PeerInfo * dp = quickestPeer(to);
|
||||
if (!dp) return false;
|
||||
if (dp->_data.t->isRunning()) return false;
|
||||
dp->_data.data = ba;
|
||||
//piCoutObj << "start thread ...";
|
||||
dp->_data.t->startOnce();
|
||||
return dp->_data->send(ba);
|
||||
}
|
||||
//piCout << "[PIPeer] send" << size << "bytes ok";
|
||||
return true;
|
||||
@@ -331,6 +316,12 @@ bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
|
||||
}
|
||||
|
||||
|
||||
void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
|
||||
dataReceived(from, data);
|
||||
dataReceivedEvent(from, data);
|
||||
}
|
||||
|
||||
|
||||
bool PIPeer::dataRead(uchar * readed, int size) {
|
||||
if (size < 16) return true;
|
||||
PIByteArray ba(readed, size), sba, pba;
|
||||
@@ -406,14 +397,9 @@ bool PIPeer::dataRead(uchar * readed, int size) {
|
||||
return true;
|
||||
}
|
||||
if (pt == 2 || pt == 3) {
|
||||
PIDataTransfer * dt = 0;
|
||||
if (pt == 3)
|
||||
dt = fp->_data.dt_in;
|
||||
if (pt == 2)
|
||||
dt = fp->_data.dt_out;
|
||||
peers_mutex.unlock();
|
||||
//piCoutObj << "DT received" << pba.size_s();
|
||||
dt->received(pba);
|
||||
if (fp->_data)
|
||||
fp->_data->receivedPacket(pt, pba);
|
||||
return true;
|
||||
}
|
||||
peers_mutex.unlock();
|
||||
@@ -624,19 +610,17 @@ void PIPeer::removeNeighbour(const PIString & name) {
|
||||
|
||||
void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
|
||||
peers << pd;
|
||||
PeerData & p(peers.back()._data);
|
||||
p.initDT(pd.name);
|
||||
CONNECTU(p.dt_in, sendRequest, this, dtSendRequestIn)
|
||||
CONNECTU(p.dt_out, sendRequest, this, dtSendRequestOut)
|
||||
CONNECTU(p.dt_in, receiveFinished, this, dtReceiveFinished)
|
||||
CONNECTU(p.dt_out, receiveFinished, this, dtReceiveFinished)
|
||||
CONNECTU(p.t, started, this, dtThread)
|
||||
PeerInfo & p(peers.back());
|
||||
p.init();
|
||||
CONNECTU(p._data, sendRequest, this, sendInternal)
|
||||
CONNECTU(p._data, received, this, dtReceived)
|
||||
}
|
||||
|
||||
|
||||
bool PIPeer::removePeer(const PIString & name) {
|
||||
for (int i = 0; i < peers.size_s(); ++i)
|
||||
if (peers[i].name == name) {
|
||||
peers[i].destroy();
|
||||
peers.remove(i);
|
||||
return true;
|
||||
}
|
||||
@@ -694,6 +678,7 @@ void PIPeer::syncPeers() {
|
||||
if (cp.sync > 3) {
|
||||
pn = cp.name;
|
||||
//piCoutObj << "sync: remove " << pn;
|
||||
cp.destroy();
|
||||
addToRemoved(cp);
|
||||
peers.remove(i);
|
||||
sendPeerRemove(pn);
|
||||
|
||||
Reference in New Issue
Block a user