From c87f73207d8dc14490a56898e74cc9ed62fb6c75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B5=D0=BB=D0=B8=D0=BF=D0=B5=D0=BD=D0=BA=D0=BE=20?= =?UTF-8?q?=D0=98=D0=B2=D0=B0=D0=BD?= Date: Mon, 30 Mar 2015 13:44:55 +0000 Subject: [PATCH] PIPeer ported to data transfer git-svn-id: svn://db.shs.com.ru/pip@46 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5 --- CMakeLists.txt | 2 +- main.cpp | 6 +- src/io/pipeer.cpp | 165 ++++++++++++++++++++++++++--------- src/io/pipeer.h | 23 +++-- utils/system_daemon/daemon.h | 2 +- 5 files changed, 143 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c4fe507..6c9e208e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ project(pip) cmake_minimum_required(VERSION 2.6) -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) include(CheckFunctionExists) diff --git a/main.cpp b/main.cpp index 87e1751c..22ae0260 100644 --- a/main.cpp +++ b/main.cpp @@ -133,7 +133,7 @@ public: void print() {piCout << "Child"; Parent::print();} }; -#include +//#include int main (int argc, char * argv[]) { PIString s, s2; s = "test"; @@ -157,12 +157,12 @@ int main (int argc, char * argv[]) { ba >> s2; PICout(0xFFFF) << s2 << s2.length(); return 0; - + /* hostent * he = 0; he = gethostbyname(argv[1]); piCout << he->h_name; piCout << he->h_aliases[0]; - return 0; + return 0;*/ diff --git a/src/io/pipeer.cpp b/src/io/pipeer.cpp index 081d1e41..0038eff0 100755 --- a/src/io/pipeer.cpp +++ b/src/io/pipeer.cpp @@ -20,6 +20,7 @@ #include "pipeer.h" #define _PIPEER_MSG_SIZE 8000 +#define _PIPEER_MSG_TTL 100 #define _PIPEER_MULTICAST_TTL 4 #define _PIPEER_MULTICAST_IP "232.13.3.12" #define _PIPEER_LOOPBACK_PORT_S 13313 @@ -30,6 +31,31 @@ #define _PIPEER_TRAFFIC_PORT_E 14000 +PIPeer::PeerData::~PeerData() { + //if (t) piCout << "delete" << t; + if (t) delete t; + if (dt_in) delete dt_in; + if (dt_out) delete dt_out; + t = 0; + dt_in = dt_out = 0; +} + + +void PIPeer::PeerData::initDT(const PIString & name) { + if (t == 0) t = new PIThread(); + if (dt_in == 0) dt_in = new PIDataTransfer(); + if (dt_out == 0) dt_out = new PIDataTransfer(); + //piCout << "new" << t; + dt_in->setPacketSize(_PIPEER_MSG_SIZE); + dt_out->setPacketSize(_PIPEER_MSG_SIZE); + t->setName(name); + dt_in->setName(name); + dt_out->setName(name); +} + + + + PIPeer::PeerInfo::Address::Address(const PIString & a, const PIString & m): address(a), netmask(m) { ping = -1.; wait_ping = false; @@ -110,6 +136,43 @@ void PIPeer::timerEvent(void * data, int delim) { } +void PIPeer::dtSendRequest(uchar type, PIByteArray & data) { + PIObject * e = emitter(); + //piCoutObj << "send request" << type << data.size_s(); + if (!e) return; + data.push_front(uchar(2)); + sendInternal(e->name(), data); +} + + +void PIPeer::dtReceiveFinished(bool ok) { + if (!ok) return; + PIDataTransfer * e = (PIDataTransfer*)emitter(); + if (!e) return; + dataReceived(e->name(), e->data()); + dataReceivedEvent(e->name(), e->data()); +} + + +void PIPeer::dtThread() { + PIObject * e = emitter(); + //piCoutObj << "send thread"; + if (!e) return; + peers_mutex.lock(); + PeerInfo * dp = quickestPeer(e->name()); + if (dp == 0) { + peers_mutex.unlock(); + return; + } + PIDataTransfer * dt = dp->_data.dt_out; + PIByteArray sd(dp->_data.data); + peers_mutex.unlock(); + //piCoutObj << "send DT ..."; + dt->send(sd); + //piCoutObj << "send DT done"; +} + + void PIPeer::initEths(PIStringList al) { PIEthernet * ce; PIEthernet::InterfaceList il = PIEthernet::interfaces(); @@ -234,6 +297,26 @@ PIPeer::PeerInfo * PIPeer::quickestPeer(const PIString & to) { bool PIPeer::send(const PIString & to, const void * data, int size) { + PIByteArray ba(data, size); + if (ba.size_s() <= _PIPEER_MSG_SIZE) { + ba.insert(0, uchar(1)); + return sendInternal(to, ba); + } else { + //ba.insert(0, uchar(2)); + PIMutexLocker mlocker(peers_mutex); + PeerInfo * dp = quickestPeer(to); + if (!dp) return false; + if (dp->_data.t->isRunning()) return false; + dp->_data.data = ba; + //piCoutObj << "start thread ..."; + dp->_data.t->startOnce(); + } + //piCout << "[PIPeer] send" << size << "bytes ok"; + return true; +} + + +bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) { PIMutexLocker mlocker(peers_mutex); PeerInfo * dp = quickestPeer(to); if (dp == 0) { @@ -241,25 +324,16 @@ bool PIPeer::send(const PIString & to, const void * data, int size) { return false; } PIByteArray ba; - ba << int(4) << self_info.name << to << int(0) << size; - PIByteArray fmsg(data, size), cmsg; - int msg_count = (size - 1) / _PIPEER_MSG_SIZE + 1; - //piCout << "[PIPeer] send" << size << "bytes in" << msg_count << "packets ..."; - for (int i = 0; i < msg_count; ++i) { - int csize = (i == msg_count - 1) ? ((size - 1) % _PIPEER_MSG_SIZE + 1) : _PIPEER_MSG_SIZE; - cmsg = ba; - cmsg << msg_count << i; - cmsg.append(fmsg.data(i * _PIPEER_MSG_SIZE), csize); - if (!sendToNeighbour(dp, cmsg)) return false; - } - //piCout << "[PIPeer] send" << size << "bytes ok"; + ba << int(4) << self_info.name << to << int(0) << data; + //piCoutObj << "sendInternal" << to << data.size_s() << int(data.front()); + if (!sendToNeighbour(dp, ba)) return false; return true; } bool PIPeer::dataRead(uchar * readed, int size) { if (size < 16) return true; - PIByteArray ba(readed, size), sba; + PIByteArray ba(readed, size), sba, pba; int type, cnt, rec_size; PIString from, to; ba >> type; @@ -313,40 +387,36 @@ bool PIPeer::dataRead(uchar * readed, int size) { } if (type != 4) return true; diag_d.received(size); - ba >> from >> to >> cnt >> rec_size; - piCoutObj << "Received packet" << type << from << to << cnt << rec_size; + ba >> from >> to >> cnt >> pba; + //piCoutObj << "Received packet" << type << from << to << pba.size_s(); if (type == 4) { // data packet if (to == self_info.name) { // my packet - int msg_count, cmsg; - ba >> msg_count >> cmsg; - piCoutObj << "Received packet" << type << from << to << cnt << rec_size << msg_count << cmsg; - if (cmsg == 0 && msg_count == 1) { - dataReceived(from, ba); - dataReceivedEvent(from, ba); - return true; - } + uchar pt = pba.take_front(); + //piCoutObj << "Received packet" << type << from << to << int(pt) << pba.size_s(); peers_mutex.lock(); PeerInfo * fp = const_cast(getPeerByName(from)); if (fp == 0) { peers_mutex.unlock(); return true; } - PeerData & pd(fp->_data); - if (cmsg == 0) { - piCoutObj << "Packet clear" << rec_size; - pd.clear(); - pd.msg_count = msg_count; + if (pt == 1) { + peers_mutex.unlock(); + dataReceived(from, pba); + dataReceivedEvent(from, pba); + return true; + } + if (pt == 2 || pt == 3) { + PIDataTransfer * dt = 0; + if (pt == 3) + dt = fp->_data.dt_in; + if (pt == 2) + dt = fp->_data.dt_out; + peers_mutex.unlock(); + //piCoutObj << "DT received" << pba.size_s(); + dt->received(pba); + return true; } - piCoutObj << "Packet add" << cmsg << ba.size_s(); - pd.addData(ba); - bool frec = pd.isFullReceived(); - PIByteArray rba(pd.data); peers_mutex.unlock(); - if (frec) { - dataReceived(from, rba); - dataReceivedEvent(from, rba); - piCoutObj << "Packet received" << pd.data.size_s(); - } return true; } PIMutexLocker plocker(peers_mutex); @@ -356,9 +426,8 @@ bool PIPeer::dataRead(uchar * readed, int size) { return true; } cnt++; - if (cnt > 100 || from == dp->name) return true; - sba << type << from << to << cnt << rec_size; - sba.append(ba); + if (cnt > _PIPEER_MSG_TTL || from == dp->name) return true; + sba << type << from << to << cnt << pba; //piCoutObj << "Translate data packet" << type << from << to << cnt << rec_size; sendToNeighbour(dp, sba); } @@ -394,7 +463,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) { self_info.addNeighbour(pi.name); } pi.resetPing(); - peers << pi; + addPeer(pi); buildMap(); //piCoutObj << "new peer \"" << pi.name << "\"" << " dist " << pi.dist; pi.dist++; @@ -483,7 +552,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) { if (exist || isRemoved(rpeer)) continue; rpeer.dist++; rpeer.resetPing(); - peers << rpeer; + addPeer(rpeer); ch = true; peerConnected(rpeer.name); peerConnectedEvent(rpeer.name); @@ -553,6 +622,18 @@ void PIPeer::removeNeighbour(const PIString & name) { } +void PIPeer::addPeer(const PIPeer::PeerInfo & pd) { + peers << pd; + PeerData & p(peers.back()._data); + p.initDT(pd.name); + CONNECTU(p.dt_in, sendRequest, this, dtSendRequestIn) + CONNECTU(p.dt_out, sendRequest, this, dtSendRequestOut) + CONNECTU(p.dt_in, receiveFinished, this, dtReceiveFinished) + CONNECTU(p.dt_out, receiveFinished, this, dtReceiveFinished) + CONNECTU(p.t, started, this, dtThread) +} + + bool PIPeer::removePeer(const PIString & name) { for (int i = 0; i < peers.size_s(); ++i) if (peers[i].name == name) { diff --git a/src/io/pipeer.h b/src/io/pipeer.h index 9c87d48a..7be79cfc 100755 --- a/src/io/pipeer.h +++ b/src/io/pipeer.h @@ -25,21 +25,21 @@ #include "piethernet.h" #include "pidiagnostics.h" +#include "pidatatransfer.h" class PIP_EXPORT PIPeer: public PIObject { PIOBJECT_SUBCLASS(PIPeer, PIObject) private: struct PeerData { - PeerData() {msg_count = msg_rec = 0;} - void clear() {msg_count = msg_rec = 0; data.clear();} - bool isEmpty() const {return msg_count == 0;} - bool isFullReceived() const {return msg_count == msg_rec;} - void addData(const PIByteArray & ba) {data.append(ba); msg_rec++;} - void setData(const PIByteArray & ba) {data = ba; msg_rec = 0; msg_count = (data.size_s() - 1) / 4096 + 1;} + explicit PeerData() {dt_in = dt_out = 0; t = 0;} + PeerData(const PeerData & ) {dt_in = dt_out = 0; t = 0;} + PeerData & operator =(const PeerData & ) {dt_in = dt_out = 0; t = 0; return *this;} + ~PeerData(); + void initDT(const PIString & name); PIByteArray data; - int msg_count; - int msg_rec; + PIThread * t; + PIDataTransfer * dt_in, * dt_out; }; public: @@ -134,11 +134,18 @@ protected: private: EVENT_HANDLER2(void, timerEvent, void * , data, int, delim); + EVENT_HANDLER1(void, dtSendRequestIn, PIByteArray &, data) {dtSendRequest(2, data);} + EVENT_HANDLER1(void, dtSendRequestOut, PIByteArray &, data) {dtSendRequest(3, data);} + EVENT_HANDLER1(void, dtReceiveFinished, bool, ok); + EVENT_HANDLER(void, dtThread); bool hasPeer(const PIString & name) {piForeachC (PeerInfo & i, peers) if (i.name == name) return true; return false;} bool removePeer(const PIString & name); void removeNeighbour(const PIString & name); + void addPeer(const PeerInfo & pd); + bool sendInternal(const PIString & to, const PIByteArray & data); + void dtSendRequest(uchar type, PIByteArray & data); void sendPeerInfo(const PeerInfo & info); void sendPeerRemove(const PIString & peer); void sendSelfInfo() {sendPeerInfo(self_info);} diff --git a/utils/system_daemon/daemon.h b/utils/system_daemon/daemon.h index 6993d893..511c2a41 100644 --- a/utils/system_daemon/daemon.h +++ b/utils/system_daemon/daemon.h @@ -11,7 +11,7 @@ extern PISystemMonitor sys_mon; class Daemon: public PIPeer { - PIOBJECT(Daemon) + PIOBJECT_SUBCLASS(Daemon, PIPeer) public: Daemon(); ~Daemon();