diff --git a/src/containers/piqueue.h b/src/containers/piqueue.h index cf6cc2e2..bca83ad6 100755 --- a/src/containers/piqueue.h +++ b/src/containers/piqueue.h @@ -25,17 +25,17 @@ #ifndef PIQUEUE_H #define PIQUEUE_H -#include "pivector.h" +#include "pideque.h" template -class PIP_EXPORT PIQueue: public PIVector { +class PIP_EXPORT PIQueue: public PIDeque { public: PIQueue() {;} - PIVector & enqueue(const T & v) {PIVector::push_front(v); return *this;} - T dequeue() {return PIVector::take_back();} - T & head() {return PIVector::back();} - const T & head() const {return PIVector::back();} - PIVector toVector() {PIVector v(PIVector::size()); for (uint i = 0; i < PIVector::size(); ++i) v[i] = PIVector::at(i); return v;} + PIDeque & enqueue(const T & v) {PIDeque::push_front(v); return *this;} + T dequeue() {return PIDeque::take_back();} + T & head() {return PIDeque::back();} + const T & head() const {return PIDeque::back();} + PIVector toVector() {PIVector v(PIDeque::size()); for (uint i = 0; i < PIDeque::size(); ++i) v[i] = PIDeque::at(i); return v;} }; #endif // PIQUEUE_H diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index 5bb2bcf9..9de29b98 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -10,12 +10,12 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) { is_sending = is_receiving = is_pause = false; break_ = true; bytes_all = bytes_cur = 0; - replies_cnt = send_queue = 0; + send_queue = 0; timeout_ = 10.; diag.setDisconnectTimeout(timeout_); //CONNECTU(&diag, qualityChanged, this, diagChanged); - diag.start(100); - packets_count = 32; + diag.start(50); + packets_count = 100; setPacketSize(4096); randomize(); } @@ -63,12 +63,14 @@ void PIBaseTransfer::received(PIByteArray data) { diag.received(data.size(), false); return; } else diag.received(data.size(), true); - //piCoutObj << "receive" << h.session_id << h.type << h.id; +// piCoutObj << "receive" << h.session_id << h.type << h.id; switch (pt) { case pt_Unknown: break; case pt_Data: + mutex_header.lock(); if (h.session_id != header.session_id || !is_receiving) { sendBreak(h.session_id); + mutex_header.unlock(); return; } else { uint rcrc = h.crc; @@ -76,32 +78,47 @@ void PIBaseTransfer::received(PIByteArray data) { if (rcrc != ccrc) { header.id = h.id; sendReply(pt_ReplyInvalid); - //piCoutObj << "invalid CRC"; + piCoutObj << "invalid CRC"; } else { + mutex_session.lock(); processData(h.id, data); + mutex_session.unlock(); } if (is_pause) sendReply(pt_Pause); } + mutex_header.unlock(); break; case pt_ReplySuccess: case pt_ReplyInvalid: - if (h.session_id != header.session_id) return; + mutex_header.lock(); + if (h.session_id != header.session_id) { + mutex_header.unlock(); + return; + } if (is_pause) sendReply(pt_Pause); + mutex_header.unlock(); if (pt == pt_ReplySuccess) { + mutex_send.lock(); send_queue--; if (send_queue < 0) send_queue = 0; send_tm.reset(); + mutex_send.unlock(); } - replies_cnt++; if (is_sending) { + mutex_session.lock(); if (h.id < replies.size()) replies[h.id] = pt; + mutex_session.unlock(); +// piCoutObj << "Done Packet" << h.id; } if (is_receiving && h.id == 0) { // if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); - if (checkSession() == 0 && pt == pt_ReplySuccess) { + if (pt == pt_ReplySuccess) { + mutex_session.lock(); + int cs = checkSession(); + mutex_session.unlock(); //piCoutObj << "Success receive"; - finish_receive(true); + if (cs == 0) finish_receive(true); } } break; @@ -118,15 +135,18 @@ void PIBaseTransfer::received(PIByteArray data) { } break; case pt_Start: + mutex_header.lock(); if (is_pause && (is_sending || is_receiving)) { if (header.session_id == h.session_id) { is_pause = false; + mutex_header.unlock(); resumed(); return; } } if (is_sending && header.session_id != h.session_id) { sendBreak(h.session_id); + mutex_header.unlock(); return; } if (is_receiving) { @@ -134,12 +154,19 @@ void PIBaseTransfer::received(PIByteArray data) { //sendBreak(h.session_id); //return; //piCoutObj << "restart receive"; + mutex_header.unlock(); finish_receive(false, true); - } else return; + } else { + mutex_header.unlock(); + return; + } } + mutex_header.unlock(); if (data.size() == sizeof(StartRequest)) { StartRequest sr; data >> sr; + mutex_session.lock(); + mutex_header.lock(); bytes_cur = 0; state_string = "start request"; bytes_all = sr.size; @@ -155,19 +182,25 @@ void PIBaseTransfer::received(PIByteArray data) { //piCoutObj << "receiveStarted()"; is_receiving = true; break_ = false; + mutex_send.lock(); + send_queue = 0; + mutex_send.unlock(); receiveStarted(); - replies_cnt = send_queue = 0; state_string = "receiving"; replies[0] = pt_ReplySuccess; + mutex_session.unlock(); sendReply(pt_ReplySuccess); + mutex_header.unlock(); } break; case pt_Pause: + mutex_header.lock(); if (header.session_id == h.session_id) { //piCout << "receive pause"; if (!is_pause && pause_tm.elapsed_s() < timeout_/10) { //piCout << "resume"; sendReply(pt_Start); + mutex_header.unlock(); return; } if (!is_pause) paused(); @@ -179,6 +212,7 @@ void PIBaseTransfer::received(PIByteArray data) { if (is_sending) send_tm.reset(); pause_tm.reset(); } + mutex_header.unlock(); break; default: break; } @@ -186,48 +220,73 @@ void PIBaseTransfer::received(PIByteArray data) { bool PIBaseTransfer::send_process() { + mutex_session.lock(); packet_header_size = sizeof(PacketHeader) + customHeader().size(); break_ = false; diag.reset(); // diag.start(100); sendStarted(); is_sending = true; - replies.resize(session.size() + 1); + int session_size = session.size(); + replies.resize(session_size + 1); replies.fill(pt_Unknown); + mutex_session.unlock(); PIByteArray ba; if (!getStartRequest()) return finish_send(false); - replies_cnt = send_queue = 0; state_string = "sending"; PITimeMeasurer stm; + mutex_send.lock(); + send_queue = 0; send_tm.reset(); + mutex_send.unlock(); //int ltm = 0; - for (int i = 0; i < session.size_s(); i++) { - if (send_queue >= packets_count || is_pause) { + for (int i = 0; i < session_size; i++) { + mutex_send.lock(); + int sq = send_queue; + mutex_send.unlock(); + if (sq >= packets_count || is_pause) { --i; - piMSleep(1); + //piMSleep(1); if (is_pause) { piMSleep(40); //piCout << "send pause"; + mutex_header.lock(); sendReply(pt_Pause); + mutex_header.unlock(); if (pause_tm.elapsed_s() > timeout())return finish_send(false); } - if (send_tm.elapsed_s() > timeout_) return finish_send(false); - if (stm.elapsed_s() > timeout_ / 10.) + mutex_send.lock(); + if (send_tm.elapsed_s() > timeout_) { + mutex_send.unlock(); + return finish_send(false); + } + if (stm.elapsed_s() > timeout_ / 10.) { send_queue = 0; + } + mutex_send.unlock(); continue; } stm.reset(); ba = build_packet(i); diag.sended(ba.size()); sendRequest(ba); + mutex_send.lock(); send_queue++; + mutex_send.unlock(); if (break_) return finish_send(false); } - PITimeMeasurer tm2, tm; +// piCoutObj << "send done, checking"; + PITimeMeasurer tm; int prev_chk = 0; + mutex_send.lock(); send_queue = 0; + mutex_send.unlock(); + piMSleep(10); while (tm.elapsed_s() < timeout_) { + //piCoutObj << "recovering..."; + mutex_session.lock(); int chk = checkSession(); + mutex_session.unlock(); if (chk == 0) return finish_send(true); if (chk != prev_chk) tm.reset(); else if (tm.elapsed_m() < 100) { @@ -237,27 +296,32 @@ bool PIBaseTransfer::send_process() { if (is_pause) { piMSleep(40); //piCout << "send pause"; + mutex_header.lock(); sendReply(pt_Pause); + mutex_header.unlock(); if (pause_tm.elapsed_s() > timeout())return finish_send(false); else continue; } prev_chk = chk; if (chk > 0) { - if (tm2.elapsed_s() > timeout_ / 10.) { - //piCoutObj << "recovery packet" << chk; - if (send_queue >= packets_count) { - piMSleep(10); - continue; - } - ba = build_packet(chk - 1); - diag.sended(ba.size()); - sendRequest(ba); - send_queue++; + //piCoutObj << "recovery packet" << chk; + mutex_send.lock(); + int sq = send_queue; + mutex_send.unlock(); + if (sq >= packets_count) { + piMSleep(10); + continue; } + ba = build_packet(chk - 1); + diag.sended(ba.size()); + sendRequest(ba); + mutex_send.lock(); + send_queue++; + mutex_send.unlock(); } // if (chk == -1) return finish_send(false); if (break_) return finish_send(false); - piMSleep(1); + //piMSleep(1); } return finish_send(false); } @@ -281,6 +345,8 @@ int PIBaseTransfer::checkSession() { void PIBaseTransfer::buildSession(PIVector parts) { + mutex_session.lock(); + mutex_header.lock(); state_string = "calculating parts ... "; session.clear(); header.session_id = rand(); @@ -334,6 +400,8 @@ void PIBaseTransfer::buildSession(PIVector parts) { } } if (cur_size > min_size) session << lfi; + mutex_header.unlock(); + mutex_session.unlock(); } @@ -358,6 +426,7 @@ void PIBaseTransfer::sendReply(PacketType reply) { bool PIBaseTransfer::getStartRequest() { PITimeMeasurer tm; + mutex_header.lock(); header.type = pt_Start; header.id = 0; PIByteArray ba; @@ -365,6 +434,7 @@ bool PIBaseTransfer::getStartRequest() { st.packets = (uint)session.size(); st.size = bytes_all; ba << header; + mutex_header.unlock(); ba << st; state_string = "send request"; while (tm.elapsed_s() < timeout_) { @@ -372,24 +442,29 @@ bool PIBaseTransfer::getStartRequest() { sendRequest(ba); if (break_) return false; // piCoutObj << replies[0]; + mutex_session.lock(); if (replies[0] == pt_ReplySuccess) { state_string = "send permited!"; + piCoutObj << "ping " << tm.elapsed_m(); + mutex_session.unlock(); return true; } - piMSleep(100); + mutex_session.unlock(); + piMSleep(1); } return false; } void PIBaseTransfer::processData(int id, PIByteArray & data) { - //piCoutObj << "received packet" << id << ", size" << data.size(); +// piCoutObj << "received packet" << id << ", size" << data.size(); if (id < 1 || id > replies.size_s()) return; if (!session[id - 1].isEmpty()) { header.id = id; replies[id] = pt_ReplySuccess; sendReply(pt_ReplySuccess); - if (checkSession() == 0) state_string = "receive ok"; + if (replies[replies.size()-1] == pt_ReplySuccess) + if (checkSession() == 0) state_string = "receive ok"; return; } Part fi; @@ -413,19 +488,18 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) { } header.id = id; replies[id] = pt_ReplySuccess; - if (checkSession() == 0) state_string = "receive ok"; sendReply(pt_ReplySuccess); + if (checkSession() == 0) state_string = "receive ok"; } PIByteArray PIBaseTransfer::build_packet(int id) { PIByteArray ret; PIByteArray ba; - header.id = id + 1; - header.type = pt_Data; //piCoutObj << "session id" << header.session_id; //ret << header; ret.append(customHeader()); + mutex_session.lock(); for (int i = 0; i < session[id].size_s(); i++) { Part fi = session[id][i]; ret << fi; @@ -435,10 +509,16 @@ PIByteArray PIBaseTransfer::build_packet(int id) { if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size"; ret << ba; } + mutex_session.unlock(); + PIByteArray hdr; + mutex_header.lock(); + header.id = id + 1; + header.type = pt_Data; header.crc = crc.calculate(ret); - PIByteArray hdr; hdr << header; + hdr << header; + mutex_header.unlock(); ret.insert(0, hdr); - //piCoutObj << "Packet" << header.id << ret.size(); +// piCoutObj << "Send Packet" << header.id << ret.size(); return ret; } @@ -448,9 +528,11 @@ bool PIBaseTransfer::finish_send(bool ok) { if (ok) state_string = "send done"; else state_string = "send failed"; //piCoutObj << state_string << PIString::readableSize(bytes_all); + mutex_header.lock(); header.id = 0; if (!ok) sendBreak(header.session_id); else sendReply(pt_ReplySuccess); + mutex_header.unlock(); sendFinished(ok); // diag.stop(); bytes_all = bytes_cur = 0; @@ -463,24 +545,13 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) { if (ok) state_string = "receive done"; else state_string = "receive failed"; //piCoutObj << state_string << PIString::readableSize(bytes_all); - if (!ok && !quet) sendBreak(header.session_id); + if (!ok && !quet) { + mutex_header.lock(); + sendBreak(header.session_id); + mutex_header.unlock(); + } receiveFinished(ok); // diag.stop(); bytes_all = bytes_cur = 0; } - -void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) { - if (is_receiving) { - if (new_quality == PIDiagnostics::Failure) { - //piCout << "disconnected!"; - break_ = true; - is_receiving = false; - state_string = "receive failed"; - receiveFinished(false); - bytes_all = bytes_cur = 0; - //stopReceive(); - } - } -} - diff --git a/src/io/pibasetransfer.h b/src/io/pibasetransfer.h index 6797c0c4..3208d768 100644 --- a/src/io/pibasetransfer.h +++ b/src/io/pibasetransfer.h @@ -121,8 +121,11 @@ private: PITimeMeasurer send_tm, pause_tm; PacketHeader header; CRC_16 crc; - int replies_cnt, send_queue; + int send_queue; PIDiagnostics diag; + PIMutex mutex_session; + PIMutex mutex_send; + PIMutex mutex_header; void processData(int id, PIByteArray &data); PIByteArray build_packet(int id); @@ -132,7 +135,6 @@ private: bool getStartRequest(); bool finish_send(bool ok); void finish_receive(bool ok, bool quet = false); - EVENT_HANDLER2(void, diagChanged, PIDiagnostics::Quality, new_quality, PIDiagnostics::Quality, old_quality); }; inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::PacketHeader & v) {s << v.sig << v.type << v.session_id << v.id << v.crc; return s;} diff --git a/src/io/pidiagnostics.cpp b/src/io/pidiagnostics.cpp index e91c892a..e56eb65e 100755 --- a/src/io/pidiagnostics.cpp +++ b/src/io/pidiagnostics.cpp @@ -39,87 +39,77 @@ PIDiagnostics::PIDiagnostics(bool start_): PITimer(PITimer::Pool) { reset(); - if (start_) start(); + if (start_) start(100); } void PIDiagnostics::reset() { - setDisconnectTimeout(3.); lock(); qual = PIDiagnostics::Unknown; - speedIn = speedOut = PIString::readableSize(0) + "/s"; - ifreq = immediate_freq = integral_freq = 0.f; - cur_pckt = rec_once = 0; - wrong_count = receive_count = send_count = wrong_bytes = receive_bytes = send_bytes = 0; - packets_in_sec = packets_out_sec = bytes_in_sec = bytes_out_sec = 0; + speedRecv = speedSend = PIString::readableSize(0) + "/s"; + immediate_freq = integral_freq = 0.f; + count_wrong = count_recv = count_send = bytes_wrong = bytes_recv = bytes_send = 0; + packets_recv_sec = packets_send_sec = bytes_recv_sec = bytes_send_sec = 0; + history_rec.clear(); + history_send.clear(); + history_rec << Entry(); + history_send << Entry(); unlock(); + setDisconnectTimeout(3.); } void PIDiagnostics::received(int size, bool correct) { lock(); - rec_once = 1; + Entry &e(history_send.back()); if (correct) { - float el = tm.elapsed_s(); - tm.reset(); - if (el > 0.f) immediate_freq = ifreq = 1.f / el; - else immediate_freq = ifreq = 0.f; - receive_count++; - receive_bytes += size; + e.cnt_ok++; + e.bytes_ok += size; + count_recv++; + bytes_recv += size; } else { - immediate_freq = ifreq = 0.f; - wrong_count++; - wrong_bytes += size; + e.cnt_fail++; + e.bytes_fail += size; + count_wrong++; + bytes_wrong += size; } - addToHistory(history_rec, size, correct); + e.empty = false; unlock(); } void PIDiagnostics::sended(int size) { lock(); - send_count++; - send_bytes += size; - addToHistory(history_send, size); + Entry &e(history_send.back()); + e.cnt_ok++; + e.bytes_ok += size; + count_send++; + bytes_send += size; + e.empty = false; unlock(); } void PIDiagnostics::tick(void * data, int delimiter) { lock(); - checkHistory(history_rec); - checkHistory(history_send); - PIDiagnostics::Quality diag; - immediate_freq = ifreq; - ifreq = 0.f; - int bps[2]; - int cpckt[2]; - bps[0] = bps[1] = 0; - cpckt[0] = cpckt[1] = 0; - packets_in_sec = packets_out_sec = 0; - piForeachC (Entry & e, history_rec) { - if (e.ok) { - bps[0] += e.bytes; - packets_in_sec++; - } - cpckt[e.ok ? 1 : 0]++; - } - piForeachC (Entry & e, history_send) { - bps[1] += e.bytes; - packets_out_sec++; - } - bytes_in_sec = bps[0] / disconn_; - bytes_out_sec = bps[1] / disconn_; - packets_in_sec /= disconn_; - packets_out_sec /= disconn_; - speedIn = PIString::readableSize(bytes_in_sec) + "/s"; - speedOut = PIString::readableSize(bytes_out_sec) + "/s"; - int arc = cpckt[0] + cpckt[1]; + int tcnt = 0; + Entry send = calcHistory(history_send, &tcnt); + Entry recv = calcHistory(history_rec, &tcnt); + float it = disconn_ * (float(tcnt) / history_rec.size()); + float hz = interval() / 1000.f; + integral_freq = recv.cnt_ok / it; + packets_recv_sec = ullong(float(recv.cnt_ok) / it); + packets_send_sec = ullong(float(send.cnt_ok) / it); + bytes_recv_sec = ullong(double(recv.bytes_ok) / it); + bytes_send_sec = ullong(double(send.bytes_ok) / it); + immediate_freq = double(history_rec.back().cnt_ok) / hz; + speedRecv = PIString::readableSize(ullong(double(history_rec.back().bytes_ok) / hz)) + "/s"; + speedSend = PIString::readableSize(ullong(double(history_send.back().bytes_ok) / hz)) + "/s"; + int arc = recv.cnt_ok + recv.cnt_fail; float good_percents = 0.f; - if (arc > 0) good_percents = (float)cpckt[1] / arc * 100.f; - if (disconn_ > 0.) integral_freq = cpckt[1] / disconn_; - else integral_freq = 0.; - if (rec_once == 0) { + if (arc > 0) good_percents = (float)recv.cnt_ok / arc * 100.f; + PIDiagnostics::Quality diag; + if (tcnt == 0) { diag = PIDiagnostics::Unknown; } else { if (good_percents == 0.f) diag = PIDiagnostics::Failure; @@ -131,39 +121,41 @@ void PIDiagnostics::tick(void * data, int delimiter) { qualityChanged(diag, qual); qual = diag; } + history_rec.dequeue(); + history_send.dequeue(); + Entry e; + if (diag != PIDiagnostics::Unknown) e.empty = false; + history_rec.enqueue(e); + history_send.enqueue(e); unlock(); } -void PIDiagnostics::addToHistory(PIVector & hist, int bytes, bool ok) { +PIDiagnostics::Entry PIDiagnostics::calcHistory(PIQueue &hist, int * cnt) { Entry e; - e.time = PISystemTime::current(true); - e.bytes = bytes; - e.ok = ok; - checkHistory(hist); - hist << e; -} - - -void PIDiagnostics::checkHistory(PIVector< PIDiagnostics::Entry > & hist) { - PISystemTime ctm = PISystemTime::current(true); + *cnt = 0; for (int i = 0; i < hist.size_s(); ++i) { - if ((ctm - hist[i].time).abs() > disconn_st) { - hist.remove(i); - --i; - } + e.bytes_ok += hist[i].bytes_ok; + e.bytes_fail += hist[i].bytes_fail; + e.cnt_ok += hist[i].cnt_ok; + e.cnt_fail += hist[i].cnt_fail; + if (!e.empty) *cnt++; } + return e; } void PIDiagnostics::propertyChanged(const PIString &) { - disconn_ = property("disconnectTimeout").toFloat(); - changeDisconnectTimeout(); + float disct = property("disconnectTimeout").toFloat(); + changeDisconnectTimeout(disct); } -void PIDiagnostics::changeDisconnectTimeout() { +void PIDiagnostics::changeDisconnectTimeout(float disct) { lock(); - disconn_st = PISystemTime::fromSeconds(disconn_); + disconn_ = piMaxf(disct, interval()); + int hist_size = piMaxi(int(disconn_ * 1000 / interval()), 1); + history_rec.resize(hist_size); + history_send.resize(hist_size); unlock(); } diff --git a/src/io/pidiagnostics.h b/src/io/pidiagnostics.h index 8a6ba83e..251a749d 100755 --- a/src/io/pidiagnostics.h +++ b/src/io/pidiagnostics.h @@ -51,7 +51,7 @@ public: float disconnectTimeout() const {return disconn_;} //! Returns period of full disconnect in seconds and period of averaging frequency - void setDisconnectTimeout(float s) {setProperty("disconnectTimeout", s); disconn_ = s; changeDisconnectTimeout();} + void setDisconnectTimeout(float s) {setProperty("disconnectTimeout", s); changeDisconnectTimeout(s);} //! Returns immediate receive frequency, packets/s @@ -61,43 +61,43 @@ public: float integralFrequency() const {return integral_freq;} //! Returns correct received packets per second - ullong receiveCountPerSec() const {return packets_in_sec;} + ullong receiveCountPerSec() const {return packets_recv_sec;} //! Returns sended packets per second - ullong sendCountPerSec() const {return packets_out_sec;} + ullong sendCountPerSec() const {return packets_send_sec;} //! Returns correct received bytes per second - ullong receiveBytesPerSec() const {return bytes_in_sec;} + ullong receiveBytesPerSec() const {return bytes_recv_sec;} //! Returns sended bytes per second - ullong sendBytesPerSec() const {return bytes_out_sec;} + ullong sendBytesPerSec() const {return bytes_send_sec;} //! Returns overall correct received bytes - ullong receiveBytes() const {return receive_bytes;} + ullong receiveBytes() const {return bytes_recv;} //! Returns overall wrong received bytes - ullong wrongBytes() const {return wrong_bytes;} + ullong wrongBytes() const {return bytes_wrong;} //! Returns overall sended bytes - ullong sendBytes() const {return send_bytes;} + ullong sendBytes() const {return bytes_send;} //! Returns overall correct received packets count - ullong receiveCount() const {return receive_count;} + ullong receiveCount() const {return count_recv;} //! Returns overall wrong received packets count - ullong wrongCount() const {return wrong_count;} + ullong wrongCount() const {return count_wrong;} //! Returns overall sended packets count - ullong sendCount() const {return send_count;} + ullong sendCount() const {return count_send;} //! Returns connection quality PIDiagnostics::Quality quality() const {return qual;} //! Returns receive speed in format "n {B|kB|MB|GB|TB}/s" - PIString receiveSpeed() const {return speedIn;} + PIString receiveSpeed() const {return speedRecv;} //! Returns send speed in format "n {B|kB|MB|GB|TB}/s" - PIString sendSpeed() const {return speedOut;} + PIString sendSpeed() const {return speedSend;} //! Returns immediate receive frequency pointer, packets/s. Useful for output to PIConsole @@ -107,46 +107,46 @@ public: const float * integralFrequency_ptr() const {return &integral_freq;} //! Returns correct received packets per second pointer. Useful for output to PIConsole - const ullong * receiveCountPerSec_ptr() const {return &packets_in_sec;} + const ullong * receiveCountPerSec_ptr() const {return &packets_recv_sec;} //! Returns sended packets per second pointer. Useful for output to PIConsole - const ullong * sendCountPerSec_ptr() const {return &packets_out_sec;} + const ullong * sendCountPerSec_ptr() const {return &packets_send_sec;} //! Returns correct received bytes per second pointer. Useful for output to PIConsole - const ullong * receiveBytesPerSec_ptr() const {return &bytes_in_sec;} + const ullong * receiveBytesPerSec_ptr() const {return &bytes_recv_sec;} //! Returns sended bytes per second pointer. Useful for output to PIConsole - const ullong * sendBytesPerSec_ptr() const {return &bytes_out_sec;} + const ullong * sendBytesPerSec_ptr() const {return &bytes_send_sec;} //! Returns overall correct received bytes pointer. Useful for output to PIConsole - const ullong * receiveBytes_ptr() const {return &receive_bytes;} + const ullong * receiveBytes_ptr() const {return &bytes_recv;} //! Returns overall wrong received bytes pointer. Useful for output to PIConsole - const ullong * wrongBytes_ptr() const {return &wrong_bytes;} + const ullong * wrongBytes_ptr() const {return &bytes_wrong;} //! Returns overall sended bytes pointer. Useful for output to PIConsole - const ullong * sendBytes_ptr() const {return &send_bytes;} + const ullong * sendBytes_ptr() const {return &bytes_send;} //! Returns overall correct received packets count pointer. Useful for output to PIConsole - const ullong * receiveCount_ptr() const {return &receive_count;} + const ullong * receiveCount_ptr() const {return &count_recv;} //! Returns overall wrong received packets count pointer. Useful for output to PIConsole - const ullong * wrongCount_ptr() const {return &wrong_count;} + const ullong * wrongCount_ptr() const {return &count_wrong;} //! Returns overall sended packets count pointer. Useful for output to PIConsole - const ullong * sendCount_ptr() const {return &send_count;} + const ullong * sendCount_ptr() const {return &count_send;} //! Returns connection quality pointer. Useful for output to PIConsole const int * quality_ptr() const {return (int * )&qual;} //! Returns receive speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole - const PIString * receiveSpeed_ptr() const {return &speedIn;} + const PIString * receiveSpeed_ptr() const {return &speedRecv;} //! Returns send speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole - const PIString * sendSpeed_ptr() const {return &speedOut;} + const PIString * sendSpeed_ptr() const {return &speedSend;} - EVENT_HANDLER0(void, start) {start(1000.); changeDisconnectTimeout();} - EVENT_HANDLER1(void, start, double, msecs) {if (msecs > 0.) {PITimer::start(msecs); changeDisconnectTimeout();}} + EVENT_HANDLER0(void, start) {start(100.); changeDisconnectTimeout(disconn_);} + EVENT_HANDLER1(void, start, double, msecs) {if (msecs > 0.) {PITimer::start(msecs); changeDisconnectTimeout(disconn_);}} EVENT_HANDLER0(void, stop) {PITimer::stop();} EVENT_HANDLER0(void, reset); @@ -182,27 +182,26 @@ public: private: struct Entry { - Entry() {ok = true; bytes = 0;} - PISystemTime time; - bool ok; - int bytes; + Entry() {bytes_ok = bytes_fail = 0; cnt_ok = cnt_fail = 0; empty = true;} + ullong bytes_ok; + ullong bytes_fail; + uint cnt_ok; + uint cnt_fail; + bool empty; }; void tick(void * data, int delimiter); - void addToHistory(PIVector & hist, int bytes, bool ok = true); - void checkHistory(PIVector & hist); + Entry calcHistory(PIQueue & hist, int * cnt); void propertyChanged(const PIString & ); - void changeDisconnectTimeout(); + void changeDisconnectTimeout(float disct); PIDiagnostics::Quality qual; - PIString speedIn, speedOut; - float ifreq, immediate_freq, integral_freq, disconn_; - PIVector history_rec, history_send; - PISystemTime disconn_st; - PITimeMeasurer tm; - char cur_pckt, rec_once; - ullong wrong_count, receive_count, send_count, wrong_bytes, receive_bytes, send_bytes; - ullong packets_in_sec, packets_out_sec, bytes_in_sec, bytes_out_sec; + PIString speedRecv, speedSend; + float ifreq, immediate_freq, integral_freq; + PIQueue history_rec, history_send; + float disconn_; + ullong count_wrong, count_recv, count_send, bytes_wrong, bytes_recv, bytes_send; + ullong packets_recv_sec, packets_send_sec, bytes_recv_sec, bytes_send_sec; }; diff --git a/src/io/pidir.cpp b/src/io/pidir.cpp index 1a09d434..ab39af13 100755 --- a/src/io/pidir.cpp +++ b/src/io/pidir.cpp @@ -228,7 +228,7 @@ PIVector PIDir::entries() { PIString p(dp); if (dp == ".") dp.clear(); else if (!dp.endsWith(separator)) dp += separator; - //piCout << "entries from" << p; +// piCout << "start entries from" << p; #ifdef WINDOWS if (dp == separator) { char letters[1024]; @@ -287,6 +287,7 @@ PIVector PIDir::entries() { } free(list); #endif +// piCout << "end entries from" << p; return l; } diff --git a/src/io/piethernet.h b/src/io/piethernet.h index 79bd2762..e3e7bd3f 100755 --- a/src/io/piethernet.h +++ b/src/io/piethernet.h @@ -42,7 +42,7 @@ public: UDP /** UDP - User Datagram Protocol */ , TCP_Client /** TCP client - allow connection to TCP server */ , TCP_Server /** TCP server - receive connections from TCP clients */ , - TCP_SingleTCP + TCP_SingleTCP /** TCP client single mode - connect & send & disconnect, on each packet */ }; //! \brief Parameters of %PIEthernet diff --git a/src/io/pifiletransfer.cpp b/src/io/pifiletransfer.cpp index 03d8d31a..7d63420f 100644 --- a/src/io/pifiletransfer.cpp +++ b/src/io/pifiletransfer.cpp @@ -211,7 +211,7 @@ PIByteArray PIFileTransfer::buildPacket(Part p) { bytes_file_cur = p.start; return ba; } - return PIByteArray(); + return ba; } @@ -286,11 +286,12 @@ void PIFileTransfer::receive_finished(bool ok) { if (!ok || !user_ok) { pftheader.session_id--; started_ = false; + piCoutObj << "file cancel"; receiveFilesFinished(false); } else resume(); } if (pftheader.step == pft_Data) { - //piCout << "file close"; + //piCoutObj << "file close"; work_file.close(); started_ = false; receiveFilesFinished(ok); @@ -299,7 +300,9 @@ void PIFileTransfer::receive_finished(bool ok) { void PIFileTransfer::send_finished(bool ok) { - //piCout << "file close"; - work_file.close(); - started_ = false; +// piCoutObj << "file close"; + if (pftheader.step == pft_Data) { + work_file.close(); + started_ = false; + } } diff --git a/src/thread/pipipelinethread.h b/src/thread/pipipelinethread.h new file mode 100644 index 00000000..c655d5cb --- /dev/null +++ b/src/thread/pipipelinethread.h @@ -0,0 +1,129 @@ +#ifndef PIPIPELINETHREAD_H +#define PIPIPELINETHREAD_H + +#include "pithread.h" + +template +class PIPipelineThread : public PIThread +{ + PIOBJECT_SUBCLASS(PIPipelineThread, PIThread) +public: + PIPipelineThread() { + cnt = 0; + max_size = 0; + wait_next_pipe = true; + next_overload = false; + } + ~PIPipelineThread() { + stop(); + if (!waitForFinish(1000)) terminate(); + } + template + void connectTo(PIPipelineThread * next) { + CONNECT2(void, T2, bool *, this, calculated, next, enqueue) + } + EVENT2(calculated, const T2 &, v, bool *, overload) + EVENT_HANDLER1(void, enqueue, const T1 &, v) {enqueue(v, 0);} + EVENT_HANDLER2(void, enqueue, const T1 &, v, bool *, overload) { + mutex.lock(); + if (max_size > 0 && in.size() < max_size) { + in.enqueue(v); + if (overload) *overload = false; + } else { + if (overload) *overload = true; + } + mutex.unlock(); + } + const ullong * counterPtr() const {return &cnt;} + ullong counter() const {return cnt;} + bool isEmpty() { + bool ret; + mutex.lock(); + ret = in.isEmpty(); + mutex.unlock(); + return ret; + } + int queSize() { + int ret; + mutex.lock(); + ret = in.size(); + mutex.unlock(); + return ret; + } + void clear() { + mutex.lock(); + in.clear(); + next_overload = false; + mutex.unlock(); + } + void stopCalc() { + if (isRunning()) { + stop(); + if (!waitForFinish(100)) terminate(); + mutex.unlock(); + mutex_l.unlock(); + } + } + T2 getLast() { + T2 ret; + mutex_l.lock(); + ret = last; + mutex_l.unlock(); + return ret; + } + + uint maxQueSize() { + uint ret; + mutex.lock(); + ret = max_size; + mutex.unlock(); + return ret; + } + + void setMaxQueSize(uint count) { + mutex.lock(); + count = max_size; + if (max_size > 0 && in.size() > max_size) in.resize(max_size); + mutex.unlock(); + } + + bool isWaitNextPipe() {return wait_next_pipe;} + void setWaitNextPipe(bool wait) {wait_next_pipe = wait;} + +protected: + virtual T2 calc(const T1 &v, bool &ok) = 0; + +private: + void begin() {cnt = 0;} + void run() { + mutex.lock(); + if (in.isEmpty()) { + mutex.unlock(); + piMSleep(1); + return; + } + if (next_overload) { + calculated(last, &next_overload); + } else { + T1 t = in.dequeue(); + bool ok = true; + mutex.unlock(); + T2 r = calc(t, ok); + mutex_l.lock(); + last = r; + mutex_l.unlock(); + cnt++; + if (ok) calculated(r, &next_overload); + } + } + PIQueue in; + T2 last; + PIMutex mutex; + PIMutex mutex_l; + ullong cnt; + uint max_size; + bool wait_next_pipe; + bool next_overload; +}; + +#endif // PIPIPELINETHREAD_H diff --git a/utils/udp_file_transfer/main.cpp b/utils/udp_file_transfer/main.cpp index 7cef756a..629c561a 100644 --- a/utils/udp_file_transfer/main.cpp +++ b/utils/udp_file_transfer/main.cpp @@ -8,17 +8,25 @@ using namespace PICoutManipulators; class UDPFileTransfer: public PITimer { PIOBJECT_SUBCLASS(UDPFileTransfer, PITimer) public: - UDPFileTransfer(const PIString &src_ip_port, const PIString &dst_ip_port) { + UDPFileTransfer(const PIString &src_ip_port, const PIString &dst_ip_port, bool test = false) { quet_ = false; + test_ = test; eth.setReadAddress(src_ip_port); eth.setSendAddress(dst_ip_port); - ft.setPacketSize(8192); - ft.setName("PIFT"); - CONNECTU(&ft, sendRequest, this, ftsend); - CONNECTU(&ft, sendFilesStarted, this, ftevent); - CONNECTU(&ft, receiveFilesStarted, this, ftevent); - CONNECTU(&ft, sendFilesFinished, this, ftevent); - CONNECTU(&ft, receiveFilesFinished, this, ftevent); + if (test_) { + testt.setPacketSize(1280); + testt.setName("TEST"); + //testt.setTimeout(0.1); + CONNECTU(&testt, sendRequest, this, ftsend); + } else { + ft.setPacketSize(1280); + ft.setName("PIFT"); + CONNECTU(&ft, sendRequest, this, ftsend); + CONNECTU(&ft, sendFilesStarted, this, ftevent); + CONNECTU(&ft, receiveFilesStarted, this, ftevent); + CONNECTU(&ft, sendFilesFinished, this, ftevent); + CONNECTU(&ft, receiveFilesFinished, this, ftevent); + } CONNECTU(ð, threadedReadEvent, this, received); start(50); eth.setParameter(PIEthernet::SeparateSockets); @@ -31,7 +39,14 @@ public: ft.send(file); } + void startTest() { + PIByteArray ba(1024*1024*64); + testt.send(ba); + } + PIFileTransfer ft; + PIDataTransfer testt; + bool test_; private: PIEthernet eth; @@ -44,30 +59,50 @@ private: ft.stopSend(); ft.stopReceive(); } -// piCout << (int)ft.diagnostic().quality(); + // piCout << (int)ft.diagnostic().quality(); } + if (testt.isSending() || testt.isReceiving()) ftevent(); } EVENT_HANDLER(void, ftevent) { + if (test_) { + +#ifdef WINDOWS + piCout + #else + PICout(AddSpaces) << ClearLine + #endif + << testt.stateString() + << testt.diagnostic().receiveSpeed() + << testt.diagnostic().sendSpeed() + << "(" + << PIString::readableSize(testt.bytesCur()) << "/" << PIString::readableSize(testt.bytesAll()) + << ")" + #ifndef WINDOWS + << Flush + #endif + ; + return; + } if (quet_) return; #ifdef WINDOWS - piCout -#else + piCout + #else PICout(AddSpaces) << ClearLine -#endif + #endif << ft.stateString() << ft.curFile() - << PIString::readableSize(ft.diagnostic().receiveBytesPerSec()) + "/s" - << PIString::readableSize(ft.diagnostic().sendBytesPerSec()) + "/s" + << ft.diagnostic().receiveSpeed() + << ft.diagnostic().sendSpeed() << "(" << PIString::readableSize(ft.bytesFileCur()) << "/" << PIString::readableSize(ft.bytesFileAll()) << ", " << PIString::readableSize(ft.bytesCur()) << "/" << PIString::readableSize(ft.bytesAll()) << ")" << "ETA" << (ft.diagnostic().receiveBytesPerSec() > 0 ? PIString::fromNumber(PISystemTime::fromSeconds((ft.bytesAll() - ft.bytesCur()) / ft.diagnostic().receiveBytesPerSec()).toSeconds()) : PIString("unknown")) -#ifndef WINDOWS + #ifndef WINDOWS << Flush -#endif + #endif ; } @@ -77,6 +112,10 @@ private: EVENT_HANDLER2(void, received, uchar * , readed, int, size) { PIByteArray ba(readed, size); + if(test_) { + testt.received(ba); + return; + } ft.received(ba); } }; @@ -95,13 +134,14 @@ void usage() { piCout << "-p " << Green << "- UDP port for transfer, by default 50005"; piCout << "-d " << Green << "- directory, where place received files"; piCout << " " << Green << "- add path to send, if no path, then \"pift\" working in receive mode"; + piCout << "-t " << Green << "- test mode"; } int main (int argc, char * argv[]) { PICLI cli(argc, argv); PIINTROSPECTION_START - cli.setOptionalArgumentsCount(-1); + cli.setOptionalArgumentsCount(-1); cli.addArgument("send", true); cli.addArgument("receive", true); cli.addArgument("dir", true); @@ -109,6 +149,28 @@ int main (int argc, char * argv[]) { cli.addArgument("help"); cli.addArgument("quet"); cli.addArgument("fullpath"); + cli.addArgument("test", true); + if (cli.hasArgument("test")) { + PIString src; + PIString dst; + bool send_ = false; + if (cli.argumentValue("test") == "1") { + src = "127.0.0.1:50005"; + dst = "127.0.0.1:50006"; + send_ = true; + } else { + src = "127.0.0.1:50006"; + dst = "127.0.0.1:50005"; + } + if (cli.hasArgument("send") && cli.hasArgument("receive") && cli.hasArgument("fullpath")) { + src = cli.argumentValue("receive"); + dst = cli.argumentValue("send"); + } + UDPFileTransfer tuf(src, dst, true); + if (send_) tuf.startTest(); + WAIT_FOREVER; + return 0; + } if ((!cli.hasArgument("send") || !cli.hasArgument("receive")) || cli.hasArgument("help")) { usage(); return 0; @@ -137,8 +199,9 @@ int main (int argc, char * argv[]) { } else { piCout << "wait for receiving"; } + kbd.start(); WAIT_FOR_EXIT - PICout(0) << "\n"; + PICout(0) << "\n"; return 0; }