#include "pibasetransfer.h"


/// Value written to header.sig by the constructor and therefore carried by
/// every packet header this object serializes.
const uint PIBaseTransfer::signature = 0x54444950;


/// Initializes an idle transfer object: no active session, break flag raised,
/// 1 second timeout, 4096-byte packets, and seeds rand() (used by
/// buildSession() to pick a session id) from the current time.
PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()) {
    header.sig = signature;
    header.session_id = 0;
    packet_header_size = sizeof(PacketHeader) + customHeader().size();
    part_header_size = sizeof(Part) + sizeof(int);
    is_sending = is_receiving = false;
    break_ = true;
    bytes_all = bytes_cur = 0;
    timeout_ = 1.;
    setPacketSize(4096);
    srand(PISystemTime::current().toMilliseconds());
}


/// Raises the break flag and drops the session layout and reply table.
PIBaseTransfer::~PIBaseTransfer() {
    break_ = true;
    session.clear();
    replies.clear();
}


/// Requests cancellation of a running send; send_process() and
/// getStartRequest() poll break_ and finish with failure. No-op when idle.
void PIBaseTransfer::stopSend() {
    if (!is_sending) return;
    break_ = true;
}


/// Aborts a running receive immediately: raises the break flag and finishes
/// the receive as failed (which also sends a break packet). No-op when idle.
void PIBaseTransfer::stopReceive() {
    if (!is_receiving) return;
    break_ = true;
    finish_receive(false);
}


/// Entry point for every incoming packet.
///
/// @param data raw packet bytes; the PacketHeader is extracted from the front,
///             so on return from the header parsing `data` holds only the
///             payload (which is then handed to processData() for data packets).
///
/// Packets shorter than a PacketHeader are ignored. Dispatch is by h.type:
///  - pt_Data:  accepted only while receiving the current session and only if
///              the CRC over the payload matches; otherwise a break
///              (wrong session / not receiving) or pt_ReplyInvalid (bad CRC)
///              is sent back.
///  - pt_ReplySuccess / pt_ReplyInvalid: recorded in `replies` while sending;
///              while receiving, a success reply with id 0 finishes the
///              receive once every packet has been acknowledged.
///  - pt_Break: stops the active receive or send.
///  - pt_Start: begins a new receive session if the payload is exactly a
///              StartRequest.
///
/// NOTE(review): h.sig is never compared with `signature` here — confirm
/// whether foreign packets are filtered out before this call.
void PIBaseTransfer::received(PIByteArray& data) {
    // customHeader() may change between calls, so refresh the cached size.
    packet_header_size = sizeof(PacketHeader) + customHeader().size();
    if (data.size() < sizeof(PacketHeader)) return;

    PacketHeader h;
    data >> h;
    PacketType pt = static_cast<PacketType>(h.type);

    switch (pt) {
        case pt_Unknown:
            break;

        case pt_Data:
            if (h.session_id != header.session_id || !is_receiving) {
                // Data for a session we are not receiving: tell the peer to stop.
                sendBreak(h.session_id);
                return;
            } else {
                uint rcrc = h.crc;
                uint ccrc = crc.calculate(data.data(), data.size_s());
                if (rcrc != ccrc) {
                    header.id = h.id;
                    sendReply(pt_ReplyInvalid);
                    piCoutObj << "invalid CRC";
                } else {
                    processData(h.id, data);
                }
            }
            break;

        case pt_ReplySuccess:
        case pt_ReplyInvalid:
            if (h.session_id != header.session_id) return;
            if (is_sending) {
                if (h.id >= 0 && h.id < replies.size()) replies[h.id] = pt;
            }
            if (is_receiving && h.id == 0) {
                // finish_send(true) sends a success reply with id 0; treat it
                // as "transfer complete" once nothing is missing locally.
                if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true);
            }
            break;

        case pt_Break:
            break_ = true;
            if (is_receiving) {
                stopReceive();
                return;
            }
            if (is_sending) {
                stopSend();
                return;
            }
            break;

        case pt_Start:
            if (is_sending) {
                sendBreak(h.session_id);
                return;
            }
            if (header.session_id != h.session_id && is_receiving) {
                sendBreak(h.session_id);
                return;
            }
            if (data.size() == sizeof(StartRequest)) {
                StartRequest sr;
                data >> sr;
                bytes_cur = 0;
                state_string = "start request";
                bytes_all = sr.size;
                header.session_id = h.session_id;
                header.id = 0;
                session.clear();
                replies.clear();
                // One slot per data packet, plus slot 0 for the start handshake.
                session.resize(sr.packets);
                replies.resize(sr.packets + 1);
                replies.fill(pt_Unknown);
                is_receiving = true;
                break_ = false;
                receiveStarted();
                state_string = "receiving";
                replies[0] = pt_ReplySuccess;
                sendReply(pt_ReplySuccess);
            }
            break;

        default:
            break;
    }
}


/// Runs a complete send of the session prepared by buildSession().
///
/// Performs the start handshake, sends every packet once, then keeps
/// re-sending the packet reported by checkSession() until all packets are
/// acknowledged, the break flag is raised, or `timeout_` seconds pass without
/// checkSession() changing its result.
///
/// @return true if every packet was acknowledged, false otherwise.
bool PIBaseTransfer::send_process() {
    packet_header_size = sizeof(PacketHeader) + customHeader().size();
    break_ = false;
    is_sending = true;
    sendStarted();
    replies.resize(session.size() + 1);
    replies.fill(pt_Unknown);

    PIByteArray ba;
    if (!getStartRequest()) return finish_send(false);

    state_string = "sending";
    for (int i = 0; i < session.size_s(); i++) {
        ba = build_packet(i);
        sendRequest(ba);
        if (break_) return finish_send(false);
    }

    PITimeMeasurer tm;
    int prev_chk = 0;
    while (tm.elapsed_s() < timeout_) {
        int chk = checkSession();
        // Any change in the first outstanding packet counts as progress.
        if (chk != prev_chk) tm.reset();
        if (chk == 0) return finish_send(true);
        if (chk > 0) {
            // Re-send the outstanding packet (reply index chk -> session index chk - 1).
            piMSleep(1);
            ba = build_packet(chk - 1);
            sendRequest(ba);
        }
        if (break_) return finish_send(false);
        prev_chk = chk;
        piMSleep(1);
    }
    return finish_send(false);
}


/// Finds the next packet that still needs attention.
///
/// Slot 0 of `replies` (the start handshake) is not inspected.
///
/// @return the index of the first slot marked pt_ReplyInvalid if there is one;
///         otherwise the index of the first slot that is not pt_ReplySuccess;
///         0 when every slot from 1 upward is pt_ReplySuccess.
int PIBaseTransfer::checkSession() {
    // Explicitly rejected packets take priority over merely unanswered ones.
    for (int i = 1; i < replies.size_s(); i++) {
        if (replies[i] == pt_ReplyInvalid) return i;
    }
    for (int i = 1; i < replies.size_s(); i++) {
        if (replies[i] != pt_ReplySuccess) return i;
    }
    return 0;
}


/// Lays out `parts` into packets and stores the result in `session`
/// (one vector of Part descriptors per packet).
///
/// Also picks a new random session id and accumulates the total byte count
/// in `bytes_all`. A part that does not fit into the space left in the current
/// packet is split: its first fragment fills the current packet and the rest
/// is cut into fragments of at most `packet_size - min_size` bytes.
///
/// @param parts descriptors (id and size are read) of the data to transfer.
void PIBaseTransfer::buildSession(PIVector<Part> parts) {
    state_string = "calculating files... ";
    session.clear();
    header.session_id = rand();
    bytes_all = 0;

    Part fi;
    int fi_index, fi_prts;
    PIVector<Part> lfi;                                    // parts of the packet being filled
    int min_size = packet_header_size + part_header_size; // overhead of a packet with one part
    int cur_size = min_size;                               // bytes used in the current packet

    for (int i = 0; i < parts.size_s(); i++) {
        state_string = "calculating files... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
        fi.id = parts[i].id;
        bytes_all += parts[i].size;
        fi.start = 0;
        // Bytes of this part that do not fit into the current packet.
        int rest = parts[i].size - (packet_size - cur_size);
        if (rest <= 0) {
            // Whole part fits into the current packet.
            fi.size = parts[i].size;
            lfi << fi;
            cur_size += fi.size + part_header_size;
        } else {
            // First fragment fills up and closes the current packet.
            fi.size = parts[i].size - rest;
            fi_index = 1;
            fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size));
            lfi << fi;
            session << lfi;
            lfi.clear();
            cur_size = min_size;
            llong fs = fi.size;
            for (int j = 1; j < fi_prts; j++) {
                fi_index++;
                fi.start = fs;
                fi.size = piMin(parts[i].size - fs, packet_size - min_size);
                lfi << fi;
                cur_size += fi.size + part_header_size;
                // Every fragment except the last closes its own packet; the
                // last one stays open so following parts can share it.
                if (fi_index != fi_prts) {
                    session << lfi;
                    lfi.clear();
                    cur_size = min_size;
                    fs += fi.size;
                }
            }
        }
        // Close the packet when there is no room left for another part header.
        if (packet_size - cur_size < min_size) {
            session << lfi;
            lfi.clear();
            cur_size = min_size;
        }
    }
    if (cur_size > min_size) session << lfi;
}


/// Sends a pt_Break packet carrying `session_id`, then restores the header's
/// own session id.
void PIBaseTransfer::sendBreak(int session_id) {
    uint psid = header.session_id;
    header.session_id = session_id;
    sendReply(pt_Break);
    header.session_id = psid;
}


/// Sends a header-only packet of type `reply` using the current contents of
/// `header` (callers set header.id beforehand).
void PIBaseTransfer::sendReply(PacketType reply) {
    header.type = reply;
    PIByteArray ba;
    ba << header;
    sendRequest(ba);
}


/// Performs the start handshake for a send.
///
/// Sends a pt_Start packet (packet count and total size) up to 5 times,
/// waiting up to `timeout_` seconds after each attempt for replies[0] to
/// become pt_ReplySuccess.
///
/// @return true once the peer acknowledged the start request; false if the
///         break flag was raised or all attempts timed out.
bool PIBaseTransfer::getStartRequest() {
    PITimeMeasurer tm;
    header.type = pt_Start;
    header.id = 0;

    PIByteArray ba;
    StartRequest st;
    st.packets = (uint)session.size();
    st.size = bytes_all;
    ba << header;
    ba << st;

    state_string = "send request";
    for (int i = 0; i < 5; i++) {
        tm.reset();
        sendRequest(ba);
        while (tm.elapsed_s() < timeout_) {
            if (break_) return false;
            if (replies[0] == pt_ReplySuccess) {
                state_string = "send permited!";
                return true;
            }
            piMSleep(10);
        }
    }
    return false;
}


/// Handles the payload of a CRC-verified data packet.
///
/// @param id   1-based packet number taken from the packet header.
/// @param data payload: custom header bytes followed by (Part, bytes) pairs.
///
/// Packets with an id outside the current session are ignored. A packet that
/// was already stored is only re-acknowledged. Otherwise every part is handed
/// to receivePart(), the packet is marked received and acknowledged.
void PIBaseTransfer::processData(int id, PIByteArray & data) {
    // Valid ids are 1..session.size(); replies has one extra slot (index 0)
    // for the start handshake, so id must stay strictly below replies.size().
    if (id < 1 || id > session.size_s() || id >= replies.size_s()) return;

    if (!session[id - 1].isEmpty()) {
        // Duplicate of an already processed packet: just acknowledge again.
        header.id = id;
        replies[id] = pt_ReplySuccess;
        sendReply(pt_ReplySuccess);
        if (checkSession() == 0) state_string = "receive ok";
        return;
    }

    Part fi;
    PIByteArray ba, pheader;
    pheader.resize(packet_header_size - sizeof(PacketHeader));
    if (!pheader.isEmpty()) {
        // A payload shorter than the expected custom header cannot be valid;
        // drop it instead of reading past the end of the buffer.
        if (data.size() < pheader.size()) return;
        memcpy(pheader.data(), data.data(), pheader.size());
        data.remove(0, pheader.size_s());
    }

    while (!data.isEmpty()) {
        ba.clear();
        data >> fi;
        data >> ba;
        bytes_cur += fi.size;
        session[id - 1] << fi;
        state_string = "receiving...";
        receivePart(fi, ba, pheader);
    }

    header.id = id;
    replies[id] = pt_ReplySuccess;
    if (checkSession() == 0) state_string = "receive ok";
    sendReply(pt_ReplySuccess);
}


/// Builds the complete data packet for session entry `id` (0-based).
///
/// Layout: PacketHeader, custom header, then for each part its Part descriptor
/// followed by the bytes returned by buildPacket(). The CRC stored in the
/// header covers everything after the PacketHeader. Also advances `bytes_cur`
/// by the size of the data produced.
///
/// @return the serialized packet, ready for sendRequest().
PIByteArray PIBaseTransfer::build_packet(int id) {
    PIByteArray ret;
    PIByteArray ba;
    header.id = id + 1;  // packet numbers on the wire are 1-based
    header.type = pt_Data;

    ret.append(customHeader());
    for (int i = 0; i < session[id].size_s(); i++) {
        Part fi = session[id][i];
        ret << fi;
        ba = buildPacket(fi);
        bytes_cur += ba.size();
        if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size";
        ret << ba;
    }

    // The CRC must be known before the header can be serialized, so the
    // header is prepended after the payload has been assembled.
    header.crc = crc.calculate(ret);
    PIByteArray hdr;
    hdr << header;
    ret.insert(0, hdr);
    return ret;
}


/// Finalizes a send: updates the state string, clears the sending flag,
/// notifies the peer (break on failure, success reply with id 0 otherwise),
/// calls sendFinished() and resets the byte counters.
///
/// @return `ok`, so callers can write `return finish_send(...)`.
bool PIBaseTransfer::finish_send(bool ok) {
    if (ok) state_string = "send done";
    else state_string = "send failed";
    is_sending = false;
    header.id = 0;
    if (!ok) sendBreak(header.session_id);
    else sendReply(pt_ReplySuccess);
    sendFinished(ok);
    bytes_all = bytes_cur = 0;
    return ok;
}


/// Finalizes a receive: updates the state string, clears the receiving flag,
/// sends a break to the peer on failure, calls receiveFinished() and resets
/// the byte counters.
void PIBaseTransfer::finish_receive(bool ok) {
    if (ok) state_string = "receive done";
    else state_string = "receive failed";
    is_receiving = false;
    if (!ok) sendBreak(header.session_id);
    receiveFinished(ok);
    bytes_all = bytes_cur = 0;
}