diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index 91e3095b..1cae4672 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -7,7 +7,7 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) { header.session_id = 0; packet_header_size = sizeof(PacketHeader) + customHeader().size(); part_header_size = sizeof(Part) + sizeof(int); - is_sending = is_receiving = false; + is_sending = is_receiving = is_pause = false; break_ = true; bytes_all = bytes_cur = 0; replies_cnt = send_queue = 0; @@ -36,11 +36,20 @@ void PIBaseTransfer::stopSend() { void PIBaseTransfer::stopReceive() { if (!is_receiving) return; break_ = true; -// piCoutObj << "stopReceive()"; + //piCoutObj << "stopReceive()"; finish_receive(false); } +void PIBaseTransfer::setPause(bool pause_) { + if (is_pause == pause_) return; + pause_tm.reset(); + is_pause = pause_; + if (pause_) paused(); + else resumed(); +} + + void PIBaseTransfer::received(PIByteArray data) { packet_header_size = sizeof(PacketHeader) + customHeader().size(); if (data.size() < sizeof(PacketHeader)) { @@ -54,94 +63,115 @@ void PIBaseTransfer::received(PIByteArray data) { diag.received(data.size(), false); return; } else diag.received(data.size(), true); -// piCoutObj << "receive" << h.session_id << h.type << h.id; + //piCoutObj << "receive" << h.session_id << h.type << h.id; switch (pt) { - case pt_Unknown: break; - case pt_Data: - if (h.session_id != header.session_id || !is_receiving) { - sendBreak(h.session_id); - return; - } else { - uint rcrc = h.crc; - uint ccrc = crc.calculate(data.data(), data.size_s()); - if (rcrc != ccrc) { - header.id = h.id; - sendReply(pt_ReplyInvalid); - piCoutObj << "invalid CRC"; + case pt_Unknown: break; + case pt_Data: + if (h.session_id != header.session_id || !is_receiving) { + sendBreak(h.session_id); + return; } else { - processData(h.id, data); + uint rcrc = h.crc; + uint ccrc = crc.calculate(data.data(), data.size_s()); + if (rcrc != ccrc) { + header.id = h.id; + sendReply(pt_ReplyInvalid); + piCoutObj << "invalid CRC"; + } else { + processData(h.id, data); + } + if (is_pause) sendReply(pt_Pause); } - } break; - case pt_ReplySuccess: - case pt_ReplyInvalid: - if (h.session_id != header.session_id) return; - if (pt == pt_ReplySuccess) { - send_queue--; - if (send_queue < 0) send_queue = 0; - send_tm.reset(); - } - replies_cnt++; - if (is_sending) { - if (h.id >= 0 && h.id < replies.size()) - replies[h.id] = pt; - } - if (is_receiving && h.id == 0) { - if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); -// if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);} - } + case pt_ReplySuccess: + case pt_ReplyInvalid: + if (h.session_id != header.session_id) return; + if (is_pause) sendReply(pt_Pause); + if (pt == pt_ReplySuccess) { + send_queue--; + if (send_queue < 0) send_queue = 0; + send_tm.reset(); + } + replies_cnt++; + if (is_sending) { + if (h.id >= 0 && h.id < replies.size()) + replies[h.id] = pt; + } + if (is_receiving && h.id == 0) { + if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); + //if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);} + } break; - case pt_Break: - break_ = true; -// piCoutObj << "BREAK"; - if (is_receiving) { - stopReceive(); - return; - } - if (is_sending) { - stopSend(); - return; - } + case pt_Break: + break_ = true; + // piCoutObj << "BREAK"; + if (is_receiving) { + stopReceive(); + return; + } + if (is_sending) { + stopSend(); + return; + } break; - case pt_Start: - if (is_sending) { - sendBreak(h.session_id); - return; - } - if (is_receiving) { - if (header.session_id != h.session_id) { - // sendBreak(h.session_id); - // return; -// piCoutObj << "restart receive"; - finish_receive(false, true); - } else return; - } - if (data.size() == sizeof(StartRequest)) { - StartRequest sr; - data >> sr; - bytes_cur = 0; - state_string = "start request"; - bytes_all = sr.size; - header.session_id = h.session_id; - header.id = 0; - session.clear(); - replies.clear(); - session.resize(sr.packets); - replies.resize(sr.packets + 1); - replies.fill(pt_Unknown); - diag.reset(); - diag.start(100); -// piCoutObj << "receiveStarted()"; - is_receiving = true; - break_ = false; - receiveStarted(); - replies_cnt = send_queue = 0; - state_string = "receiving"; - replies[0] = pt_ReplySuccess; - sendReply(pt_ReplySuccess); - } + case pt_Start: + if (is_pause && (is_sending || is_receiving)) { + if (header.session_id == h.session_id) { + resumed(); + is_pause = false; + return; + } + } + if (is_sending && header.session_id != h.session_id) { + sendBreak(h.session_id); + return; + } + if (is_receiving) { + if (header.session_id != h.session_id) { + //sendBreak(h.session_id); + //return; + //piCoutObj << "restart receive"; + finish_receive(false, true); + } else return; + } + if (data.size() == sizeof(StartRequest)) { + StartRequest sr; + data >> sr; + bytes_cur = 0; + state_string = "start request"; + bytes_all = sr.size; + header.session_id = h.session_id; + header.id = 0; + session.clear(); + replies.clear(); + session.resize(sr.packets); + replies.resize(sr.packets + 1); + replies.fill(pt_Unknown); + diag.reset(); + diag.start(100); + //piCoutObj << "receiveStarted()"; + is_receiving = true; + break_ = false; + receiveStarted(); + replies_cnt = send_queue = 0; + state_string = "receiving"; + replies[0] = pt_ReplySuccess; + sendReply(pt_ReplySuccess); + } break; - default: break; + case pt_Pause: + if (header.session_id == h.session_id) { + if (!is_pause && pause_tm.elapsed_s() < timeout_) { + sendReply(pt_Start); + return; + } + if (is_pause && pause_tm.elapsed_s() > timeout_/10) sendReply(pt_Pause); + pause_tm.reset(); + if (!is_pause) paused(); + is_pause = true; + } + break; + default: break; } } @@ -163,9 +193,10 @@ bool PIBaseTransfer::send_process() { send_tm.reset(); //int ltm = 0; for (int i = 0; i < session.size_s(); i++) { - if (send_queue >= packets_count) { + if (send_queue >= packets_count || is_pause) { --i; piMSleep(1); + if (is_pause && pause_tm.elapsed_s() > timeout())return finish_send(false); if (send_tm.elapsed_s() > timeout_) return finish_send(false); if (stm.elapsed_s() > timeout_ / 10.) send_queue = 0; @@ -189,6 +220,11 @@ bool PIBaseTransfer::send_process() { piMSleep(1); continue; } + if (is_pause) { + piMSleep(10); + if (pause_tm.elapsed_s() > timeout())return finish_send(false); + else continue; + } prev_chk = chk; if (chk > 0) { if (tm2.elapsed_s() > timeout_ / 10.) { @@ -203,7 +239,7 @@ bool PIBaseTransfer::send_process() { send_queue++; } } -// if (chk == -1) return finish_send(false); + // if (chk == -1) return finish_send(false); if (break_) return finish_send(false); piMSleep(1); } @@ -241,12 +277,12 @@ void PIBaseTransfer::buildSession(PIVector parts) { for (int i = 0; i < parts.size_s(); i++) { state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size()); fi.id = parts[i].id; -// piCout << fi.id << state_string; + //piCout << fi.id << state_string; bytes_all += parts[i].size; -// fi.size = fi.entry.size; + //fi.size = fi.entry.size; fi.start = 0; llong rest = parts[i].size - (packet_size - cur_size); -// piCout << i << fi.size << rest << bytes_all; + //piCout << i << fi.size << rest << bytes_all; if (rest <= 0) { fi.size = parts[i].size; lfi << fi; @@ -255,7 +291,7 @@ void PIBaseTransfer::buildSession(PIVector parts) { fi.size = parts[i].size - rest; fi_index = 1; fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size)); -// piCout << fi_prts; + //piCout << fi_prts; lfi << fi; session << lfi; lfi.clear(); @@ -329,7 +365,7 @@ bool PIBaseTransfer::getStartRequest() { void PIBaseTransfer::processData(int id, PIByteArray & data) { -// piCoutObj << "received packet" << id << ", size" << data.size(); + //piCoutObj << "received packet" << id << ", size" << data.size(); if (id < 1 || id > replies.size_s()) return; if (!session[id - 1].isEmpty()) { header.id = id; @@ -348,10 +384,11 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) { while (!data.isEmpty()) { ba.clear(); data >> fi; - /*if (fi.size > 0) */data >> ba; -// fi.fsize = ba.size(); + //if (fi.size > 0) + data >> ba; + //fi.fsize = ba.size(); bytes_cur += fi.size; -// piCoutObj << "recv" << fi; + //piCoutObj << "recv" << fi; session[id - 1] << fi; state_string = "receiving..."; receivePart(fi, ba, pheader); @@ -368,13 +405,13 @@ PIByteArray PIBaseTransfer::build_packet(int id) { PIByteArray ba; header.id = id + 1; header.type = pt_Data; -// piCoutObj << "session id" << header.session_id; + //piCoutObj << "session id" << header.session_id; //ret << header; ret.append(customHeader()); for (int i = 0; i < session[id].size_s(); i++) { Part fi = session[id][i]; ret << fi; -// piCout << "biuld" << fi; + //piCout << "biuld" << fi; ba = buildPacket(fi); bytes_cur += ba.size(); if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size"; @@ -383,7 +420,7 @@ PIByteArray PIBaseTransfer::build_packet(int id) { header.crc = crc.calculate(ret); PIByteArray hdr; hdr << header; ret.insert(0, hdr); -// piCoutObj << "Packet" << header.id << ret.size(); + //piCoutObj << "Packet" << header.id << ret.size(); return ret; } @@ -392,7 +429,7 @@ bool PIBaseTransfer::finish_send(bool ok) { is_sending = false; if (ok) state_string = "send done"; else state_string = "send failed"; -// piCoutObj << state_string << PIString::readableSize(bytes_all); + //piCoutObj << state_string << PIString::readableSize(bytes_all); header.id = 0; if (!ok) sendBreak(header.session_id); else sendReply(pt_ReplySuccess); @@ -407,7 +444,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) { is_receiving = false; if (ok) state_string = "receive done"; else state_string = "receive failed"; -// piCoutObj << state_string;// << PIString::readableSize(bytes_all); + //piCoutObj << state_string;// << PIString::readableSize(bytes_all); if (!ok && !quet) sendBreak(header.session_id); receiveFinished(ok); diag.stop(); @@ -418,7 +455,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) { void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) { if (is_receiving) { if (new_quality == PIDiagnostics::Failure) { - piCout << "disconnected!"; + //piCout << "disconnected!"; stopReceive(); } } diff --git a/src/io/pibasetransfer.h b/src/io/pibasetransfer.h index 8d5278fa..f1a994a1 100644 --- a/src/io/pibasetransfer.h +++ b/src/io/pibasetransfer.h @@ -35,6 +35,8 @@ public: bool isSending() const {return is_sending;} bool isReceiving() const {return is_receiving;} + bool isPause() const {return is_pause;} + void setPause(bool pause_); void setPacketSize(int size) {packet_size = size;} int packetSize() const {return packet_size;} @@ -51,15 +53,19 @@ public: const PIDiagnostics &diagnostic() {return diag;} EVENT(receiveStarted) + EVENT(paused) + EVENT(resumed) EVENT1(receiveFinished, bool, ok) EVENT(sendStarted) EVENT1(sendFinished, bool, ok) EVENT1(sendRequest, PIByteArray &, data) EVENT_HANDLER1(void, received, PIByteArray, data); + EVENT_HANDLER(void, pause) {setPause(true);} + EVENT_HANDLER(void, resume) {setPause(false);} protected: uint packet_header_size, part_header_size; - bool break_, is_sending, is_receiving; + bool break_, is_sending, is_receiving, is_pause; PIString state_string; llong bytes_all, bytes_cur; @@ -71,7 +77,7 @@ protected: private: - enum PacketType {pt_Unknown, pt_Data, pt_ReplySuccess, pt_ReplyInvalid, pt_Break, pt_Start}; + enum PacketType {pt_Unknown, pt_Data, pt_ReplySuccess, pt_ReplyInvalid, pt_Break, pt_Start, pt_Pause}; # pragma pack(push,1) struct StartRequest { @@ -88,7 +94,7 @@ private: double timeout_; PIVector > session; PIVector replies; - PITimeMeasurer send_tm; + PITimeMeasurer send_tm, pause_tm; PacketHeader header; CRC_16 crc; int replies_cnt, send_queue;