From 79aef34c247bcf305d1cd7f607616b372f8670a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Tue, 10 Mar 2015 13:23:16 +0000 Subject: [PATCH] PIFT fix git-svn-id: svn://db.shs.com.ru/pip@13 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5 --- main_udp_filetransfer.cpp | 2 +- src/io/pibasetransfer.cpp | 67 ++++++++++++++++++++++++--------------- src/io/pibasetransfer.h | 3 +- 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/main_udp_filetransfer.cpp b/main_udp_filetransfer.cpp index 1a895669..89e14467 100644 --- a/main_udp_filetransfer.cpp +++ b/main_udp_filetransfer.cpp @@ -88,7 +88,7 @@ public: CONNECTU(&ft, receiveFilesFinished, this, ftevent); CONNECTU(ð, threadedReadEvent, this, received); start(50); - eth.open(); + eth.setParameter(PIEthernet::SeparateSockets); eth.startThreadedRead(); } diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index a2479abb..7b983b0b 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -10,7 +10,9 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()) { is_sending = is_receiving = false; break_ = true; bytes_all = bytes_cur = 0; - timeout_ = 1.; + replies_cnt = 0; + timeout_ = 10.; + packets_count = 32; setPacketSize(4096); srand(PISystemTime::current().toMilliseconds()); } @@ -38,15 +40,14 @@ void PIBaseTransfer::stopReceive() { void PIBaseTransfer::received(PIByteArray& data) { packet_header_size = sizeof(PacketHeader) + customHeader().size(); -// piCoutObj << "r" << data.size() << sizeof(PacketHeader); +// piCoutObj << "receive" << data.size(); if (data.size() < sizeof(PacketHeader)) return; PacketHeader h; data >> h; PacketType pt = (PacketType)h.type; // piCoutObj << "receive" << h.session_id << h.type << h.id; switch (pt) { - case pt_Unknown: - break; + case pt_Unknown: break; case pt_Data: if (h.session_id != header.session_id || !is_receiving) { sendBreak(h.session_id); @@ -62,10 +63,11 @@ void PIBaseTransfer::received(PIByteArray& data) { processData(h.id, data); } } - break; + break; case pt_ReplySuccess: case pt_ReplyInvalid: if (h.session_id != header.session_id) return; + replies_cnt++; if (is_sending) { if (h.id >= 0 && h.id < replies.size()) replies[h.id] = pt; @@ -73,7 +75,7 @@ void PIBaseTransfer::received(PIByteArray& data) { if (is_receiving && h.id == 0) { if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); } - break; + break; case pt_Break: break_ = true; // piCoutObj << "BREAK"; @@ -85,7 +87,7 @@ void PIBaseTransfer::received(PIByteArray& data) { stopSend(); return; } - break; + break; case pt_Start: if (is_sending) { sendBreak(h.session_id); @@ -111,13 +113,13 @@ void PIBaseTransfer::received(PIByteArray& data) { is_receiving = true; break_ = false; receiveStarted(); + replies_cnt = 0; state_string = "receiving"; replies[0] = pt_ReplySuccess; sendReply(pt_ReplySuccess); } - break; - default: - break; + break; + default: break; } } @@ -131,23 +133,39 @@ bool PIBaseTransfer::send_process() { replies.fill(pt_Unknown); PIByteArray ba; if (!getStartRequest()) return finish_send(false); + replies_cnt = 0; state_string = "sending"; + PITimeMeasurer stm; + int ltm = 0; for (int i = 0; i < session.size_s(); i++) { + if (i - replies_cnt >= packets_count) { + --i; + piMSleep(1); + if (ltm > 10) return finish_send(false); + if (stm.elapsed_s() > timeout_/10) { + replies_cnt = i; + ltm++; + } + continue; + } + stm.reset(); ba = build_packet(i); sendRequest(ba); if (break_) return finish_send(false); } - PITimeMeasurer tm; + PITimeMeasurer tm2, tm; int prev_chk = 0; while (tm.elapsed_s() < timeout_) { int chk = checkSession(); if (chk != prev_chk) tm.reset(); if (chk == 0) return finish_send(true); if (chk > 0) { -// piCoutObj << "recovery packet" << chk; - piMSleep(1); - ba = build_packet(chk - 1); - sendRequest(ba); + if (tm2.elapsed_s() > timeout_/10) { + // piCoutObj << "recovery packet" << chk; + piMSleep(1); + ba = build_packet(chk - 1); + sendRequest(ba); + } } // if (chk == -1) return finish_send(false); if (break_) return finish_send(false); @@ -258,18 +276,15 @@ bool PIBaseTransfer::getStartRequest() { ba << header; ba << st; state_string = "send request"; - for (int i = 0; i < 5; i++) { - tm.reset(); + while (tm.elapsed_s() < timeout_) { sendRequest(ba); - while (tm.elapsed_s() < timeout_) { - if (break_) return false; - //piCoutObj << send_replyes[0]; - if (replies[0] == pt_ReplySuccess) { - state_string = "send permited!"; - return true; - } - piMSleep(10); + if (break_) return false; + //piCoutObj << send_replyes[0]; + if (replies[0] == pt_ReplySuccess) { + state_string = "send permited!"; + return true; } + piMSleep(100); } return false; } @@ -315,7 +330,6 @@ PIByteArray PIBaseTransfer::build_packet(int id) { PIByteArray ba; header.id = id + 1; header.type = pt_Data; -// piCoutObj << "Packet" << header.id; // piCoutObj << "session id" << header.session_id; //ret << header; ret.append(customHeader()); @@ -331,6 +345,7 @@ PIByteArray PIBaseTransfer::build_packet(int id) { header.crc = crc.calculate(ret); PIByteArray hdr; hdr << header; ret.insert(0, hdr); +// piCoutObj << "Packet" << header.id << ret.size(); return ret; } diff --git a/src/io/pibasetransfer.h b/src/io/pibasetransfer.h index acef8f9f..6b233c76 100644 --- a/src/io/pibasetransfer.h +++ b/src/io/pibasetransfer.h @@ -82,12 +82,13 @@ private: friend PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::StartRequest & v); static const uint signature; - int packet_size; + int packet_size, packets_count; double timeout_; PIVector > session; PIVector replies; PacketHeader header; CRC_16 crc; + int replies_cnt; void processData(int id, PIByteArray &data); PIByteArray build_packet(int id);