From b1519ea4af94bdad5daa8385f282b05a201d62bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D0=B5=D0=BB=D0=B8=D0=BF=D0=B5=D0=BD=D0=BA=D0=BE=20?= =?UTF-8?q?=D0=98=D0=B2=D0=B0=D0=BD?= Date: Tue, 31 Mar 2015 10:01:49 +0000 Subject: [PATCH] PIDataTransferansfers fixnsfer git-svn-id: svn://db.shs.com.ru/pip@51 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5 --- src/io/pibasetransfer.cpp | 44 ++++++++++++++++++++++++++------------- src/io/pibasetransfer.h | 3 ++- src/io/pipeer.cpp | 2 +- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index 34850df9..9ed3d919 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -10,7 +10,7 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()) { is_sending = is_receiving = false; break_ = true; bytes_all = bytes_cur = 0; - replies_cnt = 0; + replies_cnt = send_queue = 0; timeout_ = 10.; packets_count = 32; setPacketSize(4096); @@ -67,6 +67,11 @@ void PIBaseTransfer::received(PIByteArray data) { case pt_ReplySuccess: case pt_ReplyInvalid: if (h.session_id != header.session_id) return; + if (pt == pt_ReplySuccess) { + send_queue--; + if (send_queue < 0) send_queue = 0; + send_tm.reset(); + } replies_cnt++; if (is_sending) { if (h.id >= 0 && h.id < replies.size()) @@ -113,7 +118,7 @@ void PIBaseTransfer::received(PIByteArray data) { is_receiving = true; break_ = false; receiveStarted(); - replies_cnt = 0; + replies_cnt = send_queue = 0; state_string = "receiving"; replies[0] = pt_ReplySuccess; sendReply(pt_ReplySuccess); @@ -133,43 +138,52 @@ bool PIBaseTransfer::send_process() { replies.fill(pt_Unknown); PIByteArray ba; if (!getStartRequest()) return finish_send(false); - replies_cnt = 0; + replies_cnt = send_queue = 0; state_string = "sending"; PITimeMeasurer stm; + send_tm.reset(); int ltm = 0; for (int i = 0; i < session.size_s(); i++) { - if (i - replies_cnt >= packets_count) { + if (send_queue >= packets_count) { --i; - piMSleep(1); - if (ltm > 10) return finish_send(false); - if (stm.elapsed_s() > timeout_/10) { - replies_cnt = i; - ltm++; - } + piMSleep(10); + if (send_tm.elapsed_s() > timeout_) return finish_send(false); + if (stm.elapsed_s() > timeout_ / 10.) + send_queue = 0; continue; } stm.reset(); ba = build_packet(i); sendRequest(ba); + send_queue++; if (break_) return finish_send(false); } PITimeMeasurer tm2, tm; int prev_chk = 0; + send_queue = 0; while (tm.elapsed_s() < timeout_) { int chk = checkSession(); - if (chk != prev_chk) tm.reset(); if (chk == 0) return finish_send(true); + if (chk != prev_chk) tm.reset(); + else if (tm.elapsed_m() < 100) { + piMSleep(1); + continue; + } + prev_chk = chk; if (chk > 0) { - if (tm2.elapsed_s() > timeout_/10) { - // piCoutObj << "recovery packet" << chk; - piMSleep(1); + if (tm2.elapsed_s() > timeout_ / 10.) { + //piCoutObj << "recovery packet" << chk; + if (send_queue >= packets_count) { + piMSleep(10); + continue; + } ba = build_packet(chk - 1); sendRequest(ba); + send_queue++; } } // if (chk == -1) return finish_send(false); if (break_) return finish_send(false); - prev_chk = chk; piMSleep(1); } return finish_send(false); diff --git a/src/io/pibasetransfer.h b/src/io/pibasetransfer.h index 15c961c8..8dda0037 100644 --- a/src/io/pibasetransfer.h +++ b/src/io/pibasetransfer.h @@ -86,9 +86,10 @@ private: double timeout_; PIVector > session; PIVector replies; + PITimeMeasurer send_tm; PacketHeader header; CRC_16 crc; - int replies_cnt; + int replies_cnt, send_queue; void processData(int id, PIByteArray &data); PIByteArray build_packet(int id); diff --git a/src/io/pipeer.cpp b/src/io/pipeer.cpp index db1ee919..35ee5d2d 100755 --- a/src/io/pipeer.cpp +++ b/src/io/pipeer.cpp @@ -424,7 +424,7 @@ bool PIPeer::dataRead(uchar * readed, int size) { cnt++; if (cnt > _PIPEER_MSG_TTL || from == dp->name) return true; sba << type << from << to << cnt << pba; - //piCoutObj << "Translate data packet" << type << from << to << cnt << rec_size; + piCout << "translate packet" << from << "->" << to << ", ttl =" << cnt; sendToNeighbour(dp, sba); } return true;