From 229e237b2b92ff674ad2c25a795787a806abde60 Mon Sep 17 00:00:00 2001 From: peri4 Date: Mon, 14 Jun 2021 16:17:05 +0300 Subject: [PATCH] transfers patch --- libs/main/io_utils/pibasetransfer.cpp | 1 + libs/main/io_utils/pibasetransfer.h | 1 + libs/main/io_utils/pidatatransfer.cpp | 5 +++ libs/main/io_utils/pidatatransfer.h | 1 + libs/main/io_utils/pifiletransfer.cpp | 3 +- libs/main/io_utils/pifiletransfer.h | 2 +- main.cpp | 63 ++++++++++++++++++++++++++- 7 files changed, 72 insertions(+), 4 deletions(-) diff --git a/libs/main/io_utils/pibasetransfer.cpp b/libs/main/io_utils/pibasetransfer.cpp index 0d3cef9a..1b2f9bf2 100644 --- a/libs/main/io_utils/pibasetransfer.cpp +++ b/libs/main/io_utils/pibasetransfer.cpp @@ -234,6 +234,7 @@ void PIBaseTransfer::received(PIByteArray data) { mutex_send.lock(); send_queue = 0; mutex_send.unlock(); + beginReceive(); receiveStarted(); state_string = "receiving"; replies[0] = pt_ReplySuccess; diff --git a/libs/main/io_utils/pibasetransfer.h b/libs/main/io_utils/pibasetransfer.h index d5f54ac2..988cdef9 100644 --- a/libs/main/io_utils/pibasetransfer.h +++ b/libs/main/io_utils/pibasetransfer.h @@ -93,6 +93,7 @@ protected: void buildSession(PIVector parts); virtual PIByteArray buildPacket(Part fi) = 0; virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader) = 0; + virtual void beginReceive() {;} virtual PIByteArray customHeader() {return PIByteArray();} bool send_process(); diff --git a/libs/main/io_utils/pidatatransfer.cpp b/libs/main/io_utils/pidatatransfer.cpp index 9a5aff4a..db401ef5 100644 --- a/libs/main/io_utils/pidatatransfer.cpp +++ b/libs/main/io_utils/pidatatransfer.cpp @@ -34,6 +34,11 @@ void PIDataTransfer::receivePart(Part fi, PIByteArray ba, PIByteArray pheader) { } +void PIDataTransfer::beginReceive() { + data_.clear(); +} + + bool PIDataTransfer::send(const PIByteArray& ba) { data_ = ba; buildSession(PIVector() << Part(0, data_.size())); diff --git a/libs/main/io_utils/pidatatransfer.h b/libs/main/io_utils/pidatatransfer.h index f1cddf18..dd8480d7 100644 --- a/libs/main/io_utils/pidatatransfer.h +++ b/libs/main/io_utils/pidatatransfer.h @@ -39,6 +39,7 @@ public: private: virtual PIByteArray buildPacket(Part p); virtual void receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader); + virtual void beginReceive(); PIByteArray data_; }; diff --git a/libs/main/io_utils/pifiletransfer.cpp b/libs/main/io_utils/pifiletransfer.cpp index d5ec8c9c..475aa814 100644 --- a/libs/main/io_utils/pifiletransfer.cpp +++ b/libs/main/io_utils/pifiletransfer.cpp @@ -30,7 +30,6 @@ PIFileTransfer::PIFileTransfer() { dir = PIDir::current(); started_ = scanning = false; bytes_file_all = bytes_file_cur = 0; - CONNECT(void, this, receiveStarted, this, receive_started); CONNECT1(void, bool, this, sendFinished, this, send_finished); CONNECT1(void, bool, this, receiveFinished, this, receive_finished); } @@ -286,7 +285,7 @@ PIByteArray PIFileTransfer::customHeader() { } -void PIFileTransfer::receive_started() { +void PIFileTransfer::beginReceive() { if (pftheader.step == pft_None) { files_.clear(); // piCoutObj << "start receive" diff --git a/libs/main/io_utils/pifiletransfer.h b/libs/main/io_utils/pifiletransfer.h index b74b9eef..254454a0 100644 --- a/libs/main/io_utils/pifiletransfer.h +++ b/libs/main/io_utils/pifiletransfer.h @@ -103,7 +103,7 @@ private: virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader); virtual PIByteArray buildPacket(Part fi); virtual PIByteArray customHeader(); - EVENT_HANDLER(void, receive_started); + virtual void beginReceive(); EVENT_HANDLER1(void, send_finished, bool, ok); EVENT_HANDLER1(void, receive_finished, bool, ok); }; diff --git a/main.cpp b/main.cpp index c83f9350..a26a37b3 100644 --- a/main.cpp +++ b/main.cpp @@ -1,9 +1,70 @@ #include "pip.h" - +PIPeer p0("p0"), p1("p1"); int main() { + + PIMap sends, recs; + PISet errors, missed; + + CONNECTL(&p0, dataReceivedEvent, ([&](const PIString & from, const PIByteArray & data){ + uint cnt = *(uint*)(data.data()); + piCout << "rec " << cnt << data.size_s(); + recs[cnt] = data.size_s(); + if (sends[cnt] != data.size_s()) { + piCout << " " << cnt << "ERROR"; + errors << cnt; + } + })); + + p0.start(); + p1.start(); + + piSleep(2); + + int count = 10; + for (int i = 0; i < count; ++i) { + PIByteArray msg; + msg << (uint)i; + msg.enlarge(randomi() % 3800 + 4000); + sends[i] = msg.size_s(); + piCout << "send" << i << msg.size_s(); + p1.send(p0.name(), msg); + piMSleep(100); + } + + piSleep(1); + for (int i = 0; i < count; ++i) + if (!recs.contains(i)) missed << i; + piCout << "errors" << errors; + piCout << "missed" << missed; + + + /*PIDataTransfer tr0, tr1; + CONNECTL(&tr0, receiveFinished, ([&](bool ok){ + PIByteArray ba = tr0.data(); + uint cnt = *(uint*)(ba.data()); + piCout << "rec " << cnt << ok << ba.size_s(); + })); + + CONNECTL(&tr1, sendRequest, ([&](PIByteArray & data){tr0.received(data);})); + CONNECTL(&tr0, sendRequest, ([&](PIByteArray & data){tr1.received(data);})); + + int count = 10; + for (int i = 0; i < count; ++i) { + PIByteArray msg; + msg << (uint)i; + msg.enlarge(randomi() % 3800 + 4000); + //sends[i] = msg.size_s(); + piCout << "send" << i << msg.size_s(); + tr1.send(msg); + //piMSleep(100); + }*/ + + + return 0; + const int iters = 4; const int sz = 10000000;