diff --git a/src_main/console/pikbdlistener.cpp b/src_main/console/pikbdlistener.cpp index 64dd1bf1..d4469faa 100644 --- a/src_main/console/pikbdlistener.cpp +++ b/src_main/console/pikbdlistener.cpp @@ -300,7 +300,7 @@ void PIKbdListener::readKeyboard() { tm_dbl.reset(); } else if (mb < me.buttons) me.action = MouseButtonRelease; - else {piCoutObj << "WTF"; break;} + else {if (mb != 0) piCoutObj << "WTF"; break;} } me.buttons = mb; if (piCompareBinary(&prev_me, &me, sizeof(me))) diff --git a/src_main/io/pibasetransfer.cpp b/src_main/io/pibasetransfer.cpp index 21968db3..e1518f12 100644 --- a/src_main/io/pibasetransfer.cpp +++ b/src_main/io/pibasetransfer.cpp @@ -13,12 +13,13 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) { break_ = true; bytes_all = bytes_cur = 0; send_queue = 0; + send_up = 0; timeout_ = 10.; diag.setDisconnectTimeout(timeout_ / 10); //CONNECTU(&diag, qualityChanged, this, diagChanged); diag.setName("PIBaseTransfer"); diag.start(50); - packets_count = 100; + packets_count = 10; setPacketSize(4096); randomize(); } @@ -112,9 +113,25 @@ void PIBaseTransfer::received(PIByteArray data) { } if (is_sending) { mutex_session.lock(); - if (h.id < replies.size()) + if (h.id < replies.size()) { replies[h.id] = pt; - else + pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-'; + int s = pm_string.find('+'), s1 = send_queue; + while (s <= (int)h.id && s > 0) { + send_queue--; + if (s >= 0) pm_string[s] = '-'; + send_up = 0; + if (send_queue < 0) { + send_queue = 0; + break; + } + s = pm_string.find('+'); + } + if (s1-send_queue > 1 && packets_count > 2) packets_count-= piMaxi(packets_count/10,1); + if (s1 == send_queue && s1 < piMaxi(packets_count/2, 2) && packets_count < 100) send_up++; + if (send_up > 20 && send_up > packets_count*2) packets_count+= piMaxi(packets_count/10,1); + //piCoutObj << packets_count; + } else piCoutObj << "invalid reply id"; mutex_session.unlock(); // piCoutObj << "Done Packet" << h.id; @@ -182,11 +199,13 @@ void PIBaseTransfer::received(PIByteArray data) { bytes_all = sr.size; header.session_id = h.session_id; header.id = 0; + packets_count = 10; session.clear(); replies.clear(); session.resize(sr.packets); replies.resize(sr.packets + 1); replies.fill(pt_Unknown); + pm_string.resize(replies.size(), '-'); diag.reset(); // diag.start(100); //piCoutObj << "receiveStarted()"; @@ -198,6 +217,7 @@ void PIBaseTransfer::received(PIByteArray data) { receiveStarted(); state_string = "receiving"; replies[0] = pt_ReplySuccess; + pm_string[0] = '#'; mutex_session.unlock(); sendReply(pt_ReplySuccess); mutex_header.unlock(); @@ -240,6 +260,7 @@ bool PIBaseTransfer::send_process() { int session_size = session.size(); replies.resize(session_size + 1); replies.fill(pt_Unknown); + pm_string.resize(replies.size(), '-'); mutex_session.unlock(); PIByteArray ba; if (!getStartRequest()) return finish_send(false); @@ -247,6 +268,7 @@ bool PIBaseTransfer::send_process() { PITimeMeasurer stm; mutex_send.lock(); send_queue = 0; + send_up = 0; send_tm.reset(); mutex_send.unlock(); //int ltm = 0; @@ -272,8 +294,10 @@ bool PIBaseTransfer::send_process() { mutex_send.unlock(); return finish_send(false); } - if (stm.elapsed_s() > timeout_ / 10.) { + if (stm.elapsed_s() > timeout_ / 2.) { + send_up = 0; send_queue = 0; + pm_string.replaceAll("+", "-"); } mutex_send.unlock(); continue; @@ -282,26 +306,28 @@ bool PIBaseTransfer::send_process() { ba = build_packet(i); diag.sended(ba.size_s()); sendRequest(ba); + pm_string[i+1] = '+'; mutex_send.lock(); send_queue++; mutex_send.unlock(); if (break_) return finish_send(false); } // piCoutObj << "send done, checking"; - PITimeMeasurer tm; + PITimeMeasurer rtm; int prev_chk = 0; mutex_send.lock(); send_queue = 0; mutex_send.unlock(); piMSleep(10); - while (tm.elapsed_s() < timeout_) { + state_string = "sending+"; + while (rtm.elapsed_s() < timeout_) { //piCoutObj << "recovering..."; mutex_session.lock(); int chk = checkSession(); mutex_session.unlock(); if (chk == 0) return finish_send(true); - if (chk != prev_chk) tm.reset(); - else if (tm.elapsed_m() < 100) { + if (chk != prev_chk) rtm.reset(); + else if (rtm.elapsed_m() < 100) { piMSleep(1); continue; } @@ -322,11 +348,15 @@ bool PIBaseTransfer::send_process() { mutex_send.unlock(); if (sq >= packets_count) { piMSleep(10); + send_queue = 0; + send_up = 0; + pm_string.replaceAll("+", "-"); continue; } ba = build_packet(chk - 1); diag.sended(ba.size_s()); sendRequest(ba); + pm_string[chk] = '+'; mutex_send.lock(); send_queue++; mutex_send.unlock(); @@ -457,7 +487,8 @@ bool PIBaseTransfer::getStartRequest() { mutex_session.lock(); if (replies[0] == pt_ReplySuccess) { state_string = "send permited!"; - //piCoutObj << "ping " << tm.elapsed_m(); + //packets_count = piClampi(10 / tm.elapsed_m(), 2, 100); + //piCoutObj << "ping " << tm.elapsed_m() << packets_count; mutex_session.unlock(); return true; } @@ -474,6 +505,7 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) { if (!session[id - 1].isEmpty()) { header.id = id; replies[id] = pt_ReplySuccess; + pm_string[id] = '#'; sendReply(pt_ReplySuccess); if (replies[replies.size()-1] == pt_ReplySuccess) if (checkSession() == 0) state_string = "receive ok"; @@ -500,6 +532,7 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) { } header.id = id; replies[id] = pt_ReplySuccess; + pm_string[id] = '#'; sendReply(pt_ReplySuccess); if (checkSession() == 0) state_string = "receive ok"; } diff --git a/src_main/io/pibasetransfer.h b/src_main/io/pibasetransfer.h index 4670f6c7..be64686c 100644 --- a/src_main/io/pibasetransfer.h +++ b/src_main/io/pibasetransfer.h @@ -69,6 +69,7 @@ public: bool isCRCEnabled() const {return crc_enabled;} PIString stateString() const {return state_string;} + PIString packetMap() const {return pm_string;} llong bytesAll() const {return bytes_all;} llong bytesCur() const {return bytes_cur;} const PIDiagnostics &diagnostic() {return diag;} @@ -131,11 +132,13 @@ private: PacketHeader header; CRC_16 crc; int send_queue; + int send_up; PIDiagnostics diag; PIMutex mutex_session; PIMutex mutex_send; PIMutex mutex_header; bool crc_enabled; + PIString pm_string; }; inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::PacketHeader & v) {s << v.sig << v.type << v.session_id << v.id << v.crc; return s;} diff --git a/utils/udp_file_transfer/main.cpp b/utils/udp_file_transfer/main.cpp index a5baee3e..0e2955e6 100644 --- a/utils/udp_file_transfer/main.cpp +++ b/utils/udp_file_transfer/main.cpp @@ -1,6 +1,11 @@ #include "pip.h" #include "pifiletransfer.h" #include "pidatatransfer.h" +#include "piscreen.h" +#include "piscreentiles.h" + + +TileSimple pmt; using namespace PICoutManipulators; @@ -20,7 +25,7 @@ public: //testt.setTimeout(0.1); CONNECTU(&testt, sendRequest, this, ftsend); } else { - ft.setPacketSize(12800); + ft.setPacketSize(65000); ft.setName("PIFT"); CONNECTU(&ft, sendRequest, this, ftsend); CONNECTU(&ft, sendFilesStarted, this, ftevent); @@ -41,7 +46,7 @@ public: } void startTest() { - PIByteArray ba(1024*1024*256); + PIByteArray ba(1024*1024*8); testt.send(ba); } @@ -56,13 +61,17 @@ private: void tick(void *, int) { if (ft.isStarted()) { ftevent(); + updatePMT(); if (PIKbdListener::exiting) { ft.stopSend(); ft.stopReceive(); } // piCout << (int)ft.diagnostic().quality(); } - if (testt.isSending() || testt.isReceiving()) ftevent(); + if (testt.isSending() || testt.isReceiving()) { + ftevent(); + updatePMT(); + } } EVENT_HANDLER(void, ftevent) { @@ -119,6 +128,18 @@ private: } ft.received(ba); } + + + void updatePMT() { + PIString pm; + if (test_) pm = testt.packetMap(); + else pm = ft.packetMap(); + int wd = 110; + pmt.content.resize(pm.size() / wd + 1); + for (int i=0; iaddTile(new TilePICout()); + screen.rootTile()->addTile(&pmt); UDPFileTransfer tuf(src, dst, true); + screen.start(); if (send_) tuf.startTest(); - WAIT_FOREVER; + screen.waitForFinish(); return 0; } if ((!cli.hasArgument("send") || !cli.hasArgument("receive")) || cli.hasArgument("help")) { @@ -184,8 +210,13 @@ int main (int argc, char * argv[]) { src += ":"+PIString::fromNumber(port); dst += ":"+PIString::fromNumber(port); } - PIKbdListener kbd; - kbd.enableExitCapture(); +// PIKbdListener kbd; +// kbd.enableExitCapture(); + PIScreen screen(false); + screen.enableExitCapture(); + screen.rootTile()->addTile(new TilePICout()); + screen.rootTile()->addTile(&pmt); + screen.start(); UDPFileTransfer f(src, dst); if (cli.hasArgument("dir")) f.ft.setDirectory(cli.argumentValue("dir")); if (cli.hasArgument("quet")) f.setQuet(true); @@ -199,9 +230,8 @@ int main (int argc, char * argv[]) { } else { piCout << "wait for receiving"; } - kbd.start(); - WAIT_FOR_EXIT PICout(0) << "\n"; + screen.waitForFinish(); return 0; }