/* PIP - Platform Independent Primitives Base class for reliable send and receive data in fixed packets with error correction, pause and resume Andrey Bychkov work.a.b@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "pibasetransfer.h" const uint PIBaseTransfer::signature = 0x54424950; PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) { header.sig = signature; crc_enabled = true; header.session_id = 0; packet_header_size = sizeof(PacketHeader) + customHeader().size(); part_header_size = sizeof(Part) + sizeof(int); is_sending = is_receiving = is_pause = false; break_ = true; bytes_all = bytes_cur = 0; send_queue = 0; send_up = 0; timeout_ = 10.; diag.setDisconnectTimeout(timeout_ / 10); diag.setName("PIBaseTransfer"); diag.start(50); packets_count = 10; #ifdef FREERTOS setPacketSize(512); #else setPacketSize(4096); #endif randomize(); } PIBaseTransfer::~PIBaseTransfer() { diag.stop(); break_ = true; } void PIBaseTransfer::stopSend() { if (!is_sending) return; break_ = true; } void PIBaseTransfer::stopReceive() { if (!is_receiving) return; break_ = true; //piCoutObj << "stopReceive()"; finish_receive(false); } void PIBaseTransfer::setPause(bool pause_) { if (is_pause == pause_) return; pause_tm.reset(); is_pause = pause_; if (pause_) paused(); else resumed(); } void PIBaseTransfer::received(PIByteArray data) { packet_header_size = sizeof(PacketHeader) + customHeader().size(); if (data.size() < sizeof(PacketHeader)) { diag.received(data.size(), false); return; } PacketHeader h; data >> h; PacketType pt = (PacketType)h.type; if (!h.check_sig()) { piCoutObj << "invalid packet signature"; diag.received(data.size(), false); return; } else diag.received(data.size(), true); //piCoutObj << "receive" << h.session_id << h.type << h.id; switch (pt) { case pt_Unknown: break; case pt_Data: mutex_header.lock(); if (h.session_id != header.session_id || !is_receiving) { sendBreak(h.session_id); mutex_header.unlock(); return; } else { uint rcrc = h.crc; uint ccrc; if (crc_enabled) ccrc = crc.calculate(data.data(), data.size_s()); else ccrc = 0; if (rcrc != ccrc) { header.id = h.id; piCoutObj << "invalid CRC"; sendReply(pt_ReplyInvalid); } else { mutex_session.lock(); processData(h.id, data); mutex_session.unlock(); } if (is_pause) sendReply(pt_Pause); } mutex_header.unlock(); break; case pt_ReplySuccess: case pt_ReplyInvalid: mutex_header.lock(); if (h.session_id != header.session_id) { mutex_header.unlock(); return; } if (is_pause) sendReply(pt_Pause); mutex_header.unlock(); if (pt == pt_ReplySuccess) { mutex_send.lock(); send_queue--; if (send_queue < 0) send_queue = 0; send_tm.reset(); mutex_send.unlock(); } if (is_sending) { mutex_session.lock(); if (h.id < replies.size()) { replies[h.id] = pt; pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-'; int s = pm_string.find('+'), s1 = send_queue; while (s <= (int)h.id && s > 0) { send_queue--; if (s >= 0) { pm_string[s] = '-'; bytes_cur -= packet_size; } send_up = 0; if (send_queue < 0) { send_queue = 0; break; } s = pm_string.find('+'); } if (s1-send_queue > 1 && packets_count > 2) packets_count-= piMaxi(packets_count/10,1); if (s1 == send_queue && s1 < piMaxi(packets_count/2, 2) && packets_count < 100) send_up++; if (send_up > 20 && send_up > packets_count*2) packets_count+= piMaxi(packets_count/10,1); //piCoutObj << packets_count; } else piCoutObj << "invalid reply id"; mutex_session.unlock(); // piCoutObj << "Done Packet" << h.id; } if (is_receiving && h.id == 0) { if (pt == pt_ReplySuccess) { mutex_session.lock(); int cs = checkSession(); mutex_session.unlock(); //piCoutObj << "Success receive"; if (cs == 0) finish_receive(true); } } break; case pt_Break: break_ = true; //piCoutObj << "BREAK"; if (is_receiving) { stopReceive(); return; } if (is_sending) { stopSend(); return; } break; case pt_Start: mutex_header.lock(); if (is_pause && (is_sending || is_receiving)) { if (header.session_id == h.session_id) { is_pause = false; mutex_header.unlock(); resumed(); return; } } if (is_sending && header.session_id != h.session_id) { sendBreak(h.session_id); mutex_header.unlock(); return; } if (is_receiving) { if (header.session_id != h.session_id) { piCoutObj << "restart receive"; mutex_header.unlock(); finish_receive(false, true); } else { header.id = 0; sendReply(pt_ReplySuccess); mutex_header.unlock(); return; } } mutex_header.unlock(); if (data.size() == sizeof(StartRequest)) { StartRequest sr; data >> sr; mutex_session.lock(); mutex_header.lock(); bytes_cur = 0; state_string = "start request"; bytes_all = sr.size; header.session_id = h.session_id; header.id = 0; packets_count = 10; session.clear(); replies.clear(); session.resize(sr.packets); replies.resize(sr.packets + 1); replies.fill(pt_Unknown); pm_string.resize(replies.size(), '-'); diag.reset(); //piCoutObj << "receiveStarted()"; is_receiving = true; break_ = false; mutex_send.lock(); send_queue = 0; mutex_send.unlock(); beginReceive(); receiveStarted(); state_string = "receiving"; replies[0] = pt_ReplySuccess; pm_string[0] = '#'; mutex_session.unlock(); sendReply(pt_ReplySuccess); mutex_header.unlock(); } break; case pt_Pause: mutex_header.lock(); if (header.session_id == h.session_id) { //piCout << "receive pause"; if (!is_pause && pause_tm.elapsed_s() < timeout_/10) { //piCout << "resume"; sendReply(pt_Start); mutex_header.unlock(); return; } if (!is_pause) paused(); is_pause = true; if (is_receiving && pause_tm.elapsed_m() > 40) { //piCout << "send pause"; sendReply(pt_Pause); } if (is_sending) send_tm.reset(); pause_tm.reset(); } mutex_header.unlock(); break; default: break; } } bool PIBaseTransfer::send_process() { mutex_session.lock(); packet_header_size = sizeof(PacketHeader) + customHeader().size(); break_ = false; diag.reset(); sendStarted(); is_sending = true; int session_size = session.size(); replies.resize(session_size + 1); replies.fill(pt_Unknown); pm_string.resize(replies.size(), '-'); mutex_session.unlock(); PIByteArray ba; if (!getStartRequest()) return finish_send(false); state_string = "sending"; PITimeMeasurer stm; mutex_send.lock(); send_queue = 0; send_up = 0; send_tm.reset(); mutex_send.unlock(); for (int i = 0; i < session_size; i++) { mutex_send.lock(); int sq = send_queue; mutex_send.unlock(); if (sq >= packets_count || is_pause) { --i; if (is_pause) { piMSleep(40); //piCout << "send pause"; mutex_header.lock(); sendReply(pt_Pause); mutex_header.unlock(); if (pause_tm.elapsed_s() > timeout()) { return finish_send(false); } } mutex_send.lock(); if (send_tm.elapsed_s() > timeout_) { mutex_send.unlock(); return finish_send(false); } if (stm.elapsed_s() > timeout_ / 2.) { send_up = 0; send_queue = 0; pm_string.replaceAll("+", "-"); } mutex_send.unlock(); continue; } stm.reset(); ba = build_packet(i); diag.sended(ba.size_s()); sendRequest(ba); pm_string[i+1] = '+'; mutex_send.lock(); send_queue++; mutex_send.unlock(); if (break_) return finish_send(false); } // piCoutObj << "send done, checking"; PITimeMeasurer rtm; int prev_chk = 0; mutex_send.lock(); send_queue = 0; mutex_send.unlock(); piMSleep(10); state_string = "sending more"; while (rtm.elapsed_s() < timeout_) { //piCoutObj << "recovering..."; mutex_session.lock(); int chk = checkSession(); mutex_session.unlock(); if (chk == 0) return finish_send(true); if (chk != prev_chk) rtm.reset(); else if (rtm.elapsed_m() < 100) { piMinSleep(); continue; } if (is_pause) { piMSleep(40); //piCout << "send pause"; mutex_header.lock(); sendReply(pt_Pause); mutex_header.unlock(); if (pause_tm.elapsed_s() > timeout())return finish_send(false); else continue; } prev_chk = chk; if (chk > 0) { //piCoutObj << "recovery packet" << chk; mutex_send.lock(); int sq = send_queue; mutex_send.unlock(); if (sq >= packets_count) { piMSleep(10); send_queue = 0; send_up = 0; pm_string.replaceAll("+", "-"); continue; } ba = build_packet(chk - 1); diag.sended(ba.size_s()); sendRequest(ba); pm_string[chk] = '+'; mutex_send.lock(); send_queue++; mutex_send.unlock(); } if (break_) return finish_send(false); } return finish_send(false); } int PIBaseTransfer::checkSession() { if (!(is_receiving || is_sending)) return -1; int miss = 0; for (int i = 1; i < replies.size_s(); i++) { if (replies[i] != pt_ReplySuccess) miss++; if (replies[i] == pt_ReplyInvalid) return i; } for (int i = 1; i < replies.size_s(); i++) { if (replies[i] != pt_ReplySuccess) return i; } if (miss > 0) { //piCoutObj << "missing" << miss << "packets"; return -miss; } else return 0; } void PIBaseTransfer::buildSession(PIVector parts) { mutex_session.lock(); mutex_header.lock(); state_string = "calculating parts ... "; session.clear(); header.session_id = randomi(); bytes_all = 0; Part fi; int fi_index, fi_prts; PIVector lfi; int min_size = packet_header_size + part_header_size; int cur_size = min_size; for (int i = 0; i < parts.size_s(); i++) { state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size()); fi.id = parts[i].id; //piCout << fi.id << state_string; bytes_all += parts[i].size; fi.start = 0; llong rest = parts[i].size - (packet_size - cur_size); //piCout << i << fi.size << rest << bytes_all; if (rest <= 0) { fi.size = parts[i].size; lfi << fi; cur_size += fi.size + part_header_size; } else { fi.size = parts[i].size - rest; fi_index = 1; fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size)); //piCout << fi_prts; lfi << fi; session << lfi; lfi.clear(); cur_size = min_size; llong fs = fi.size; for (int j = 1; j < fi_prts; j++) { fi_index++; fi.start = fs; fi.size = piMin(parts[i].size - fs, packet_size - min_size); lfi << fi; cur_size += fi.size + part_header_size; if (fi_index != fi_prts) { session << lfi; lfi.clear(); cur_size = min_size; fs += fi.size; } } } if (packet_size - cur_size < min_size) { session << lfi; lfi.clear(); cur_size = min_size; } } if (cur_size > min_size) session << lfi; mutex_header.unlock(); mutex_session.unlock(); } void PIBaseTransfer::sendBreak(int session_id) { //piCoutObj << "sendBreak"; uint psid = header.session_id; header.session_id = session_id; sendReply(pt_Break); header.session_id = psid; } void PIBaseTransfer::sendReply(PacketType reply) { //piCoutObj << "sendReply" << reply; header.type = reply; PIByteArray ba; ba << header; if (is_sending || is_receiving) diag.sended(ba.size_s()); sendRequest(ba); } bool PIBaseTransfer::getStartRequest() { mutex_header.lock(); header.type = pt_Start; header.id = 0; PIByteArray ba; StartRequest st; st.packets = (uint)session.size(); st.size = bytes_all; ba << header; mutex_header.unlock(); ba << st; state_string = "send request"; PITimeMeasurer tm; while (tm.elapsed_s() < timeout_) { diag.sended(ba.size_s()); sendRequest(ba); if (break_) return false; // piCoutObj << replies[0]; mutex_session.lock(); if (replies[0] == pt_ReplySuccess) { state_string = "send permited!"; //piCoutObj << "ping " << tm.elapsed_m(); mutex_session.unlock(); return true; } mutex_session.unlock(); piMinSleep(); } return false; } void PIBaseTransfer::processData(int id, PIByteArray & data) { // piCoutObj << "received packet" << id << ", size" << data.size(); if (id < 1 || id > replies.size_s()) return; if (!session[id - 1].isEmpty()) { header.id = id; replies[id] = pt_ReplySuccess; pm_string[id] = '#'; sendReply(pt_ReplySuccess); if (replies[replies.size()-1] == pt_ReplySuccess) if (checkSession() == 0) state_string = "receive ok"; return; } Part fi; PIByteArray ba, pheader; pheader.resize(packet_header_size - sizeof(PacketHeader)); if (!pheader.isEmpty()) { memcpy(pheader.data(), data.data(), pheader.size()); data.remove(0, pheader.size_s()); } while (!data.isEmpty()) { ba.clear(); data >> fi; data >> ba; bytes_cur += fi.size; //piCoutObj << "recv" << fi; session[id - 1] << fi; state_string = "receiving..."; receivePart(fi, ba, pheader); } header.id = id; replies[id] = pt_ReplySuccess; pm_string[id] = '#'; sendReply(pt_ReplySuccess); if (checkSession() == 0) state_string = "receive ok"; } PIByteArray PIBaseTransfer::build_packet(int id) { PIByteArray ret; PIByteArray ba; //piCoutObj << "session id" << header.session_id; ret.append(customHeader()); mutex_session.lock(); for (int i = 0; i < session[id].size_s(); i++) { Part fi = session[id][i]; ret << fi; //piCout << "build" << fi; ba = buildPacket(fi); bytes_cur += ba.size(); if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size"; ret << ba; } mutex_session.unlock(); PIByteArray hdr; mutex_header.lock(); header.id = id + 1; header.type = pt_Data; if (crc_enabled) header.crc = crc.calculate(ret); else header.crc = 0; hdr << header; mutex_header.unlock(); ret.insert(0, hdr); // piCoutObj << "Send Packet" << header.id << ret.size(); return ret; } bool PIBaseTransfer::finish_send(bool ok) { is_sending = false; if (ok) state_string = "send done"; else state_string = "send failed"; mutex_header.lock(); header.id = 0; if (!ok) sendBreak(header.session_id); else sendReply(pt_ReplySuccess); mutex_header.unlock(); //piCoutObj << state_string << PIString::readableSize(bytes_all); sendFinished(ok); bytes_all = bytes_cur = 0; return ok; } void PIBaseTransfer::finish_receive(bool ok, bool quet) { is_receiving = false; if (ok) state_string = "receive done"; else state_string = "receive failed"; if (!ok && !quet) { mutex_header.lock(); sendBreak(header.session_id); mutex_header.unlock(); } //piCoutObj << state_string << PIString::readableSize(bytes_all); receiveFinished(ok); bytes_all = bytes_cur = 0; }