diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index 7b983b0b..34850df9 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -38,7 +38,7 @@ void PIBaseTransfer::stopReceive() { } -void PIBaseTransfer::received(PIByteArray& data) { +void PIBaseTransfer::received(PIByteArray data) { packet_header_size = sizeof(PacketHeader) + customHeader().size(); // piCoutObj << "receive" << data.size(); if (data.size() < sizeof(PacketHeader)) return; diff --git a/src/io/pibasetransfer.h b/src/io/pibasetransfer.h index da99c416..15c961c8 100644 --- a/src/io/pibasetransfer.h +++ b/src/io/pibasetransfer.h @@ -53,7 +53,7 @@ public: EVENT(sendStarted) EVENT1(sendFinished, bool, ok) EVENT1(sendRequest, PIByteArray &, data) - EVENT_HANDLER1(void, received, PIByteArray &, data); + EVENT_HANDLER1(void, received, PIByteArray, data); protected: uint packet_header_size, part_header_size; diff --git a/src/io/pipeer.cpp b/src/io/pipeer.cpp index 0038eff0..5e660d1a 100755 --- a/src/io/pipeer.cpp +++ b/src/io/pipeer.cpp @@ -31,26 +31,40 @@ #define _PIPEER_TRAFFIC_PORT_E 14000 -PIPeer::PeerData::~PeerData() { - //if (t) piCout << "delete" << t; - if (t) delete t; - if (dt_in) delete dt_in; - if (dt_out) delete dt_out; - t = 0; - dt_in = dt_out = 0; +PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) { + dt_in.setPacketSize(_PIPEER_MSG_SIZE); + dt_out.setPacketSize(_PIPEER_MSG_SIZE); + CONNECTU(&dt_in, sendRequest, this, dtSendRequestIn) + CONNECTU(&dt_out, sendRequest, this, dtSendRequestOut) + CONNECTU(&dt_in, receiveFinished, this, dtReceiveFinishedIn) + CONNECTU(&dt_out, receiveFinished, this, dtReceiveFinishedOut) + CONNECTU(&t, started, this, dtThread) } -void PIPeer::PeerData::initDT(const PIString & name) { - if (t == 0) t = new PIThread(); - if (dt_in == 0) dt_in = new PIDataTransfer(); - if (dt_out == 0) dt_out = new PIDataTransfer(); - //piCout << "new" << t; - dt_in->setPacketSize(_PIPEER_MSG_SIZE); - dt_out->setPacketSize(_PIPEER_MSG_SIZE); - t->setName(name); - dt_in->setName(name); - dt_out->setName(name); +void PIPeer::PeerData::dtThread() { + piCoutObj << "send DT ..."; + dt_out.send(data); + piCoutObj << "send DT done"; +} + + +bool PIPeer::PeerData::send(const PIByteArray & d) { + if (t.isRunning()) return false; + data = d; + t.startOnce(); + return true; +} + + +void PIPeer::PeerData::receivedPacket(uchar type, const PIByteArray & d) { + PIDataTransfer * dt = 0; + if (type == 3) + dt = &dt_in; + if (type == 2) + dt = &dt_out; + piCoutObj << "DT received" << int(type) << d.size_s(); + if (dt) dt->received(d); } @@ -74,6 +88,17 @@ int PIPeer::PeerInfo::ping() const { } +void PIPeer::PeerInfo::init() { + if (_data == 0) _data = new PeerData(name); +} + + +void PIPeer::PeerInfo::destroy() { + if (_data) delete _data; + _data = 0; +} + + PIString PIPeer::PeerInfo::fastestAddress() const { double mp = -1.; PIString ret; @@ -136,43 +161,6 @@ void PIPeer::timerEvent(void * data, int delim) { } -void PIPeer::dtSendRequest(uchar type, PIByteArray & data) { - PIObject * e = emitter(); - //piCoutObj << "send request" << type << data.size_s(); - if (!e) return; - data.push_front(uchar(2)); - sendInternal(e->name(), data); -} - - -void PIPeer::dtReceiveFinished(bool ok) { - if (!ok) return; - PIDataTransfer * e = (PIDataTransfer*)emitter(); - if (!e) return; - dataReceived(e->name(), e->data()); - dataReceivedEvent(e->name(), e->data()); -} - - -void PIPeer::dtThread() { - PIObject * e = emitter(); - //piCoutObj << "send thread"; - if (!e) return; - peers_mutex.lock(); - PeerInfo * dp = quickestPeer(e->name()); - if (dp == 0) { - peers_mutex.unlock(); - return; - } - PIDataTransfer * dt = dp->_data.dt_out; - PIByteArray sd(dp->_data.data); - peers_mutex.unlock(); - //piCoutObj << "send DT ..."; - dt->send(sd); - //piCoutObj << "send DT done"; -} - - void PIPeer::initEths(PIStringList al) { PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); @@ -306,10 +294,7 @@ bool PIPeer::send(const PIString & to, const void * data, int size) { PIMutexLocker mlocker(peers_mutex); PeerInfo * dp = quickestPeer(to); if (!dp) return false; - if (dp->_data.t->isRunning()) return false; - dp->_data.data = ba; - //piCoutObj << "start thread ..."; - dp->_data.t->startOnce(); + return dp->_data->send(ba); } //piCout << "[PIPeer] send" << size << "bytes ok"; return true; @@ -331,6 +316,12 @@ bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) { } +void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) { + dataReceived(from, data); + dataReceivedEvent(from, data); +} + + bool PIPeer::dataRead(uchar * readed, int size) { if (size < 16) return true; PIByteArray ba(readed, size), sba, pba; @@ -406,14 +397,9 @@ bool PIPeer::dataRead(uchar * readed, int size) { return true; } if (pt == 2 || pt == 3) { - PIDataTransfer * dt = 0; - if (pt == 3) - dt = fp->_data.dt_in; - if (pt == 2) - dt = fp->_data.dt_out; peers_mutex.unlock(); - //piCoutObj << "DT received" << pba.size_s(); - dt->received(pba); + if (fp->_data) + fp->_data->receivedPacket(pt, pba); return true; } peers_mutex.unlock(); @@ -624,19 +610,17 @@ void PIPeer::removeNeighbour(const PIString & name) { void PIPeer::addPeer(const PIPeer::PeerInfo & pd) { peers << pd; - PeerData & p(peers.back()._data); - p.initDT(pd.name); - CONNECTU(p.dt_in, sendRequest, this, dtSendRequestIn) - CONNECTU(p.dt_out, sendRequest, this, dtSendRequestOut) - CONNECTU(p.dt_in, receiveFinished, this, dtReceiveFinished) - CONNECTU(p.dt_out, receiveFinished, this, dtReceiveFinished) - CONNECTU(p.t, started, this, dtThread) + PeerInfo & p(peers.back()); + p.init(); + CONNECTU(p._data, sendRequest, this, sendInternal) + CONNECTU(p._data, received, this, dtReceived) } bool PIPeer::removePeer(const PIString & name) { for (int i = 0; i < peers.size_s(); ++i) if (peers[i].name == name) { + peers[i].destroy(); peers.remove(i); return true; } @@ -694,6 +678,7 @@ void PIPeer::syncPeers() { if (cp.sync > 3) { pn = cp.name; //piCoutObj << "sync: remove " << pn; + cp.destroy(); addToRemoved(cp); peers.remove(i); sendPeerRemove(pn); diff --git a/src/io/pipeer.h b/src/io/pipeer.h index 7be79cfc..ff52b275 100755 --- a/src/io/pipeer.h +++ b/src/io/pipeer.h @@ -31,15 +31,23 @@ class PIP_EXPORT PIPeer: public PIObject { PIOBJECT_SUBCLASS(PIPeer, PIObject) private: - struct PeerData { - explicit PeerData() {dt_in = dt_out = 0; t = 0;} - PeerData(const PeerData & ) {dt_in = dt_out = 0; t = 0;} - PeerData & operator =(const PeerData & ) {dt_in = dt_out = 0; t = 0; return *this;} - ~PeerData(); - void initDT(const PIString & name); + + class PeerData: public PIObject { + PIOBJECT_SUBCLASS(PeerData, PIObject) + public: + PeerData(const PIString & n); + EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {data.push_front(uchar(2)); sendRequest(name(), data);} + EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {data.push_front(uchar(3)); sendRequest(name(), data);} + EVENT_HANDLER1(void, dtReceiveFinishedIn, bool, ok) {if (ok) received(name(), dt_in.data());} + EVENT_HANDLER1(void, dtReceiveFinishedOut, bool, ok) {if (ok) received(name(), dt_out.data());} + EVENT_HANDLER(void, dtThread); + EVENT2(received, const PIString &, from, const PIByteArray &, data); + EVENT2(sendRequest, const PIString &, to, const PIByteArray &, data); + bool send(const PIByteArray & d); + void receivedPacket(uchar type, const PIByteArray & d); PIByteArray data; - PIThread * t; - PIDataTransfer * dt_in, * dt_out; + PIThread t; + PIDataTransfer dt_in, dt_out; }; public: @@ -51,7 +59,8 @@ public: friend PIByteArray & operator <<(PIByteArray & s, const PIPeer::PeerInfo & v); friend PIByteArray & operator >>(PIByteArray & s, PIPeer::PeerInfo & v); public: - PeerInfo() {dist = sync = cnt = 0; trace = -1; was_update = false;} + PeerInfo() {dist = sync = cnt = 0; trace = -1; was_update = false; _data = 0;} + ~PeerInfo() {} struct Address { Address(const PIString & a = PIString(), const PIString & m = "255.255.255.0"); @@ -78,13 +87,14 @@ public: void addNeighbours(const PIStringList & l) {piForeachC (PIString & n, l) if (!neighbours.contains(n)) neighbours << n;} void removeNeighbour(const PIString & n) {neighbours.removeAll(n);} void resetPing() {for (int i = 0; i < addresses.size_s(); ++i) addresses[i].ping = -1;} - + void init(); + void destroy(); PIString nearest_address; int sync, cnt, trace; bool was_update; PISystemTime time; - PeerData _data; + PeerData * _data; }; @@ -134,18 +144,14 @@ protected: private: EVENT_HANDLER2(void, timerEvent, void * , data, int, delim); - EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {dtSendRequest(2, data);} - EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {dtSendRequest(3, data);} - EVENT_HANDLER1(void, dtReceiveFinished, bool, ok); - EVENT_HANDLER(void, dtThread); + EVENT_HANDLER2(bool, sendInternal, const PIString &, to, const PIByteArray &, data); + EVENT_HANDLER2(void, dtReceived, const PIString &, from, const PIByteArray &, data); bool hasPeer(const PIString & name) {piForeachC (PeerInfo & i, peers) if (i.name == name) return true; return false;} bool removePeer(const PIString & name); void removeNeighbour(const PIString & name); void addPeer(const PeerInfo & pd); - bool sendInternal(const PIString & to, const PIByteArray & data); - void dtSendRequest(uchar type, PIByteArray & data); void sendPeerInfo(const PeerInfo & info); void sendPeerRemove(const PIString & peer); void sendSelfInfo() {sendPeerInfo(self_info);} diff --git a/utils/system_daemon/daemon.cpp b/utils/system_daemon/daemon.cpp index 21006ad5..4c1dc9b2 100644 --- a/utils/system_daemon/daemon.cpp +++ b/utils/system_daemon/daemon.cpp @@ -277,12 +277,12 @@ void Daemon::dataReceived(const PIString & from, const PIByteArray & data) { r = remotes.value(from); if (!r) break; ba >> dir; - r->dir.cd(dir); + r->dir_my.cd(dir); { - PIVector fil = r->dir.entries(); + PIVector fil = r->dir_my.entries(); piForeach (PIFile::FileInfo & f, fil) f.path = f.name(); - rba << int(ReplyChangeDir) << r->dir.absolutePath() << fil; + rba << int(ReplyChangeDir) << r->dir_my.absolutePath() << fil; } break; case ReplyHostInfo: @@ -296,7 +296,7 @@ void Daemon::dataReceived(const PIString & from, const PIByteArray & data) { { PIVector fil; ba >> dir >> fil; - r->dir.setDir(dir); + r->dir_remote.setDir(dir); fm.setRemoteDir(dir); fm.setRemoteContent(fil); fm.remoteRestoreDir(); diff --git a/utils/system_daemon/daemon.h b/utils/system_daemon/daemon.h index 511c2a41..fc9c5725 100644 --- a/utils/system_daemon/daemon.h +++ b/utils/system_daemon/daemon.h @@ -66,9 +66,9 @@ private: class Remote: public PIThread { public: - Remote(const PIString & n = PIString()) {dt.setName(n); ft.setName(n); dir = PIDir::current();} + Remote(const PIString & n = PIString()) {dt.setName(n); ft.setName(n); dir_my = PIDir::current();} void sendData(const PIByteArray & d) {_d = d; startOnce();} - PIDir dir; + PIDir dir_my, dir_remote; PIDataTransfer dt; PIFileTransfer ft; PIByteArray _d;