Files
pip/libs/main/io_utils/pibasetransfer.cpp
peri4 1c7fc39b6c version 4.0.0_alpha
in almost all methods, timeouts in milliseconds were removed and replaced with PISystemTime
PITimer rewrite, remove internal impl, now only thread implementation, API similar to PIThread
PITimer API no longer pass void*
PIPeer, PIConnection improved stability on reinit and exit
PISystemTime new methods
pisd now exit without hanging
2024-07-30 14:18:02 +03:00

637 lines
15 KiB
C++

/*
PIP - Platform Independent Primitives
Base class for reliable send and receive data in fixed packets with error correction, pause and resume
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pibasetransfer.h"
#include "pitime.h"
// Packet signature stored in header.sig by the constructor.
// Read as little-endian bytes (0x50 0x49 0x42 0x54) the value spells "PIBT".
const uint PIBaseTransfer::signature = 0x54424950;
/// Creates an idle transfer object.
///
/// CRC checking is enabled (standard CRC-16), the timeout defaults to
/// 10 seconds, the diagnostics object is named and started, and the packet
/// size is 512 bytes under MICRO_PIP or 4096 bytes otherwise.
PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
	// Outgoing header template.
	header.sig = signature;
	header.session_id = 0;
	crc_enabled = true;

	// Per-packet overhead used when planning a session.
	packet_header_size = sizeof(PacketHeader) + customHeader().size();
	part_header_size = sizeof(Part) + sizeof(int);

	// Nothing is in flight yet.
	is_pause = false;
	is_receiving = false;
	is_sending = false;
	break_ = true;
	bytes_cur = 0;
	bytes_all = 0;
	send_queue = 0;
	send_up = 0;

	// Timeout and diagnostics.
	timeout_ = 10.;
	diag.setDisconnectTimeout(PISystemTime::fromSeconds(timeout_ / 10.));
	diag.setName("PIBaseTransfer");
	diag.start(50);

	// Initial send window and packet size.
	packets_count = 10;
#ifdef MICRO_PIP
	setPacketSize(512);
#else
	setPacketSize(4096);
#endif
	randomize();
}
// Stops the diagnostics object and waits for it, then raises the break flag
// that send_process() polls to abort a transfer.
PIBaseTransfer::~PIBaseTransfer() {
diag.stopAndWait();
break_ = true;
}
/// Requests cancellation of the outgoing transfer by raising the break flag.
/// Has no effect when nothing is being sent.
void PIBaseTransfer::stopSend() {
	if (is_sending) {
		break_ = true;
	}
}
/// Aborts the incoming transfer: raises the break flag and finishes the
/// receive as failed. Has no effect when nothing is being received.
void PIBaseTransfer::stopReceive() {
	if (is_receiving) {
		break_ = true;
		// piCoutObj << "stopReceive()";
		finish_receive(false);
	}
}
/// Switches the local pause state.
///
/// When the state actually changes, the pause timer is restarted and either
/// paused() or resumed() is raised. Setting the current state again is a no-op.
///
/// @param pause_ true to pause the transfer, false to resume it.
void PIBaseTransfer::setPause(bool pause_) {
	if (is_pause != pause_) {
		pause_tm.reset();
		is_pause = pause_;
		if (!pause_) {
			resumed();
		} else {
			paused();
		}
	}
}
/// Sets the transfer timeout and forwards the same value to the diagnostics
/// object as its disconnect timeout.
///
/// NOTE(review): the constructor configures the diagnostics disconnect timeout
/// as timeout_ / 10, whereas this setter passes the full value — confirm which
/// of the two is intended.
///
/// @param sec timeout in seconds.
void PIBaseTransfer::setTimeout(double sec) {
	timeout_ = sec;
	const auto disconnect_after = PISystemTime::fromSeconds(sec);
	diag.setDisconnectTimeout(disconnect_after);
}
/// Handles one packet delivered by the transport.
///
/// The packet is dropped (and counted as bad in the diagnostics) when it is
/// shorter than a PacketHeader or its signature check fails. Otherwise the
/// header is stripped from `data` and the packet is dispatched by type:
///  - pt_Data: payload of the current receive session; CRC is verified and the
///    payload is passed to processData().
///  - pt_ReplySuccess / pt_ReplyInvalid: acknowledgement for a sent packet;
///    updates the reply table and adapts the send window.
///  - pt_Break: the peer aborted; the local transfer is stopped.
///  - pt_Start: the peer starts (or resumes) a session.
///  - pt_Pause: the peer reports a pause.
///
/// @param data raw packet bytes, including the PacketHeader.
void PIBaseTransfer::received(PIByteArray data) {
	packet_header_size = sizeof(PacketHeader) + customHeader().size();
	if (data.size() < sizeof(PacketHeader)) {
		diag.received(data.size(), false);
		return;
	}
	PacketHeader h;
	data >> h;
	PacketType pt = (PacketType)h.type;
	if (!h.check_sig()) {
		piCoutObj << "invalid packet signature";
		diag.received(data.size(), false);
		return;
	} else
		diag.received(data.size(), true);
	// piCoutObj << "receive" << h.session_id << h.type << h.id;
	switch (pt) {
		case pt_Unknown: break;
		case pt_Data:
			mutex_header.lock();
			if (h.session_id != header.session_id || !is_receiving) {
				// Data for a session we are not receiving: tell the peer to stop.
				sendBreak(h.session_id);
				mutex_header.unlock();
				return;
			} else {
				uint rcrc = h.crc;
				uint ccrc;
				if (crc_enabled)
					ccrc = crc.calculate(data.data(), data.size_s());
				else
					ccrc = 0;
				if (rcrc != ccrc) {
					header.id = h.id;
					piCoutObj << "invalid CRC";
					sendReply(pt_ReplyInvalid);
				} else {
					mutex_session.lock();
					processData(h.id, data);
					mutex_session.unlock();
				}
				if (is_pause) sendReply(pt_Pause);
			}
			mutex_header.unlock();
			break;
		case pt_ReplySuccess:
		case pt_ReplyInvalid:
			mutex_header.lock();
			if (h.session_id != header.session_id) {
				mutex_header.unlock();
				return;
			}
			if (is_pause) sendReply(pt_Pause);
			mutex_header.unlock();
			if (pt == pt_ReplySuccess) {
				// One packet fewer in flight; any successful reply also restarts the send timer.
				mutex_send.lock();
				send_queue--;
				if (send_queue < 0) send_queue = 0;
				send_tm.reset();
				mutex_send.unlock();
			}
			if (is_sending) {
				mutex_session.lock();
				if (h.id < replies.size()) {
					replies[h.id] = pt;
					pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-';
					// Packets still marked '+' (sent, unanswered) at or before this id are
					// marked '-' again and their bytes are subtracted from the progress counter.
					int s = pm_string.find('+'), s1 = send_queue;
					while (s <= (int)h.id && s > 0) {
						send_queue--;
						if (s >= 0) {
							pm_string[s] = '-';
							bytes_cur -= packet_size;
						}
						send_up = 0;
						if (send_queue < 0) {
							send_queue = 0;
							break;
						}
						s = pm_string.find('+');
					}
					// Send window adaptation: shrink when more than one packet was dropped
					// from the queue above, grow after a long enough run of clean replies.
					if (s1 - send_queue > 1 && packets_count > 2) packets_count -= piMaxi(packets_count / 10, 1);
					if (s1 == send_queue && s1 < piMaxi(packets_count / 2, 2) && packets_count < 100) send_up++;
					if (send_up > 20 && send_up > packets_count * 2) packets_count += piMaxi(packets_count / 10, 1);
					// piCoutObj << packets_count;
				} else
					piCoutObj << "invalid reply id";
				mutex_session.unlock();
				// piCoutObj << "Done Packet" << h.id;
			}
			if (is_receiving && h.id == 0) {
				if (pt == pt_ReplySuccess) {
					mutex_session.lock();
					int cs = checkSession();
					mutex_session.unlock();
					// piCoutObj << "Success receive";
					if (cs == 0) finish_receive(true);
				}
			}
			break;
		case pt_Break:
			break_ = true;
			// piCoutObj << "BREAK";
			if (is_receiving) {
				stopReceive();
				return;
			}
			if (is_sending) {
				stopSend();
				return;
			}
			break;
		case pt_Start: {
			mutex_header.lock();
			if (is_pause && (is_sending || is_receiving)) {
				if (header.session_id == h.session_id) {
					// A start for the current session while paused means "resume".
					is_pause = false;
					mutex_header.unlock();
					resumed();
					return;
				}
			}
			if (is_sending && header.session_id != h.session_id) {
				sendBreak(h.session_id);
				mutex_header.unlock();
				return;
			}
			// Set when a start for a different session arrives while receiving. The old
			// receive is finished only after mutex_header has been released, so the mutex
			// is unlocked exactly once on this path (previously it was unlocked twice).
			bool restart_receive = false;
			if (is_receiving) {
				if (header.session_id != h.session_id) {
					piCoutObj << "restart receive";
					restart_receive = true;
				} else {
					// Repeated start request for the session already in progress: acknowledge again.
					header.id = 0;
					sendReply(pt_ReplySuccess);
					mutex_header.unlock();
					return;
				}
			}
			mutex_header.unlock();
			if (restart_receive) finish_receive(false, true);
			if (data.size() == sizeof(StartRequest)) {
				StartRequest sr;
				data >> sr;
				mutex_session.lock();
				mutex_header.lock();
				bytes_cur = 0;
				state_string = "start request";
				bytes_all = sr.size;
				header.session_id = h.session_id;
				header.id = 0;
				packets_count = 10;
				session.clear();
				replies.clear();
				session.resize(sr.packets);
				// Slot 0 of replies belongs to the start request itself.
				replies.resize(sr.packets + 1);
				replies.fill(pt_Unknown);
				pm_string.resize(replies.size(), '-');
				diag.reset();
				// piCoutObj << "receiveStarted()";
				is_receiving = true;
				break_ = false;
				mutex_send.lock();
				send_queue = 0;
				mutex_send.unlock();
				beginReceive();
				receiveStarted();
				state_string = "receiving";
				replies[0] = pt_ReplySuccess;
				pm_string[0] = '#';
				mutex_session.unlock();
				sendReply(pt_ReplySuccess);
				mutex_header.unlock();
			}
			break;
		}
		case pt_Pause:
			mutex_header.lock();
			if (header.session_id == h.session_id) {
				// piCout << "receive pause";
				if (!is_pause && pause_tm.elapsed_s() < timeout_ / 10) {
					// We are not paused and the pause timer was reset only recently:
					// answer with a start so the peer resumes.
					// piCout << "resume";
					sendReply(pt_Start);
					mutex_header.unlock();
					return;
				}
				if (!is_pause) paused();
				is_pause = true;
				if (is_receiving && pause_tm.elapsed_m() > 40) {
					// piCout << "send pause";
					sendReply(pt_Pause);
				}
				if (is_sending) send_tm.reset();
				pause_tm.reset();
			}
			mutex_header.unlock();
			break;
		default: break;
	}
}
/// Runs a complete outgoing transfer of the session prepared beforehand.
///
/// Phases:
///  1. Reset the reply table and negotiate the start with getStartRequest().
///  2. Send every packet once, keeping at most packets_count packets
///     unacknowledged and honouring pause.
///  3. Recovery: resend whatever checkSession() reports as not acknowledged
///     until everything is confirmed or timeout_ seconds pass without progress.
///
/// @return the value of finish_send(): true when every packet was acknowledged,
///         false on timeout, break or refused start.
bool PIBaseTransfer::send_process() {
	mutex_session.lock();
	packet_header_size = sizeof(PacketHeader) + customHeader().size();
	break_ = false;
	diag.reset();
	sendStarted();
	is_sending = true;
	int session_size = session.size();
	// One reply slot per packet plus slot 0 for the start request.
	replies.resize(session_size + 1);
	replies.fill(pt_Unknown);
	pm_string.resize(replies.size(), '-');
	mutex_session.unlock();
	PIByteArray ba;
	if (!getStartRequest()) return finish_send(false);
	state_string = "sending";
	PITimeMeasurer stm;
	mutex_send.lock();
	send_queue = 0;
	send_up = 0;
	send_tm.reset();
	mutex_send.unlock();
	for (int i = 0; i < session_size; i++) {
		mutex_send.lock();
		int sq = send_queue;
		mutex_send.unlock();
		if (sq >= packets_count || is_pause) {
			// Window full or paused: stay on the same packet index.
			--i;
			if (is_pause) {
				piMSleep(40);
				// piCout << "send pause";
				mutex_header.lock();
				sendReply(pt_Pause);
				mutex_header.unlock();
				if (pause_tm.elapsed_s() > timeout()) {
					return finish_send(false);
				}
			}
			mutex_send.lock();
			if (send_tm.elapsed_s() > timeout_) {
				mutex_send.unlock();
				return finish_send(false);
			}
			if (stm.elapsed_s() > timeout_ / 2.) {
				// Nothing was sent for half the timeout: forget the in-flight window.
				send_up = 0;
				send_queue = 0;
				pm_string.replaceAll("+", "-");
			}
			mutex_send.unlock();
			continue;
		}
		stm.reset();
		ba = build_packet(i);
		diag.sended(ba.size_s());
		sendRequest(ba);
		// Packet i is reported under reply index i + 1.
		pm_string[i + 1] = '+';
		mutex_send.lock();
		send_queue++;
		mutex_send.unlock();
		if (break_) return finish_send(false);
	}
	// piCoutObj << "send done, checking";
	PITimeMeasurer rtm;
	int prev_chk = 0;
	mutex_send.lock();
	send_queue = 0;
	mutex_send.unlock();
	piMSleep(10);
	state_string = "sending more";
	while (rtm.elapsed_s() < timeout_) {
		// piCoutObj << "recovering...";
		mutex_session.lock();
		int chk = checkSession();
		mutex_session.unlock();
		if (chk == 0) return finish_send(true);
		if (chk != prev_chk)
			rtm.reset();
		else if (rtm.elapsed_m() < 100) {
			// Same packet still pending: give the reply 100 ms before resending.
			piMinSleep();
			continue;
		}
		if (is_pause) {
			piMSleep(40);
			// piCout << "send pause";
			mutex_header.lock();
			sendReply(pt_Pause);
			mutex_header.unlock();
			if (pause_tm.elapsed_s() > timeout())
				return finish_send(false);
			else
				continue;
		}
		prev_chk = chk;
		if (chk > 0) {
			// piCoutObj << "recovery packet" << chk;
			mutex_send.lock();
			int sq = send_queue;
			mutex_send.unlock();
			if (sq >= packets_count) {
				piMSleep(10);
				// Reset the window under mutex_send, like the same reset in the main
				// send loop; received() updates these counters from the reply path.
				mutex_send.lock();
				send_queue = 0;
				send_up = 0;
				pm_string.replaceAll("+", "-");
				mutex_send.unlock();
				continue;
			}
			ba = build_packet(chk - 1);
			diag.sended(ba.size_s());
			sendRequest(ba);
			pm_string[chk] = '+';
			mutex_send.lock();
			send_queue++;
			mutex_send.unlock();
		}
		if (break_) return finish_send(false);
	}
	return finish_send(false);
}
/// Reports the next packet that still needs attention.
///
/// Reply slot 0 (the start request) is not inspected.
///
/// The previous implementation also counted missing packets and had a
/// `return -miss` branch, but that branch could never be reached: whenever a
/// packet was missing, the second scan had already returned its index. The
/// dead counter is removed; the returned values are unchanged.
///
/// @return -1 when no transfer is active;
///         the index of the first packet answered with pt_ReplyInvalid, if any;
///         otherwise the index of the first packet not yet answered with
///         pt_ReplySuccess;
///         0 when every packet has been acknowledged.
int PIBaseTransfer::checkSession() {
	if (!(is_receiving || is_sending)) return -1;
	int first_pending = 0;
	for (int i = 1; i < replies.size_s(); i++) {
		// An explicitly rejected packet takes priority over merely unanswered ones.
		if (replies[i] == pt_ReplyInvalid) return i;
		if (first_pending == 0 && replies[i] != pt_ReplySuccess) first_pending = i;
	}
	return first_pending;
}
// Plans a new outgoing session from a list of parts.
//
// Clears `session`, assigns a fresh random session id to the header and
// recomputes bytes_all as the sum of all part sizes. Parts are then packed
// into packets: every packet starts with a budget of min_size bytes already
// used (packet header + one part header), and every part placed into it costs
// its size plus part_header_size. A part that does not fit into the room left
// is split into fragments; each fragment carries the offset (`start`) inside
// the original part.
//
// Both mutex_session and mutex_header are held for the whole call.
void PIBaseTransfer::buildSession(PIVector<Part> parts) {
mutex_session.lock();
mutex_header.lock();
state_string = "calculating parts ... ";
session.clear();
header.session_id = randomi();
bytes_all = 0;
Part fi;
int fi_index, fi_prts;
// Parts collected for the packet currently being filled.
PIVector<Part> lfi;
int min_size = packet_header_size + part_header_size;
int cur_size = min_size;
for (int i = 0; i < parts.size_s(); i++) {
state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
fi.id = parts[i].id;
// piCout << fi.id << state_string;
bytes_all += parts[i].size;
fi.start = 0;
// Bytes of this part that do NOT fit into the room left in the current packet.
llong rest = parts[i].size - (packet_size - cur_size);
// piCout << i << fi.size << rest << bytes_all;
if (rest <= 0) {
// The whole part fits into the current packet.
fi.size = parts[i].size;
lfi << fi;
cur_size += fi.size + part_header_size;
} else {
// First fragment takes exactly the room that is left (size - rest == packet_size - cur_size).
fi.size = parts[i].size - rest;
fi_index = 1;
// NOTE(review): divides by (packet_size - min_size); this assumes packet_size > min_size — confirm setPacketSize() guarantees it.
fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size));
// piCout << fi_prts;
lfi << fi;
session << lfi;
lfi.clear();
cur_size = min_size;
// Offset inside the part of the next fragment.
llong fs = fi.size;
for (int j = 1; j < fi_prts; j++) {
fi_index++;
fi.start = fs;
fi.size = piMin<ullong>(parts[i].size - fs, packet_size - min_size);
lfi << fi;
cur_size += fi.size + part_header_size;
// Every fragment except the last closes its packet; the last one stays in lfi
// so that following parts can be appended to the same packet.
if (fi_index != fi_prts) {
session << lfi;
lfi.clear();
cur_size = min_size;
fs += fi.size;
}
}
}
// Close the packet when the room left is smaller than min_size.
if (packet_size - cur_size < min_size) {
session << lfi;
lfi.clear();
cur_size = min_size;
}
}
// Flush the trailing, partially filled packet.
if (cur_size > min_size) session << lfi;
mutex_header.unlock();
mutex_session.unlock();
}
void PIBaseTransfer::sendBreak(int session_id) {
// piCoutObj << "sendBreak";
uint psid = header.session_id;
header.session_id = session_id;
sendReply(pt_Break);
header.session_id = psid;
}
/// Sends a header-only packet of the given type.
///
/// The shared `header` is stamped with the type, serialized and passed to
/// sendRequest(). The bytes are counted in the diagnostics only while a
/// transfer is active.
///
/// @param reply packet type to send.
void PIBaseTransfer::sendReply(PacketType reply) {
	// piCoutObj << "sendReply" << reply;
	header.type = reply;
	PIByteArray packet;
	packet << header;
	const bool transfer_active = is_sending || is_receiving;
	if (transfer_active) {
		diag.sended(packet.size_s());
	}
	sendRequest(packet);
}
bool PIBaseTransfer::getStartRequest() {
mutex_header.lock();
header.type = pt_Start;
header.id = 0;
PIByteArray ba;
StartRequest st;
st.packets = (uint)session.size();
st.size = bytes_all;
ba << header;
mutex_header.unlock();
ba << st;
state_string = "send request";
PITimeMeasurer tm;
while (tm.elapsed_s() < timeout_) {
diag.sended(ba.size_s());
sendRequest(ba);
if (break_) return false;
// piCoutObj << replies[0];
mutex_session.lock();
if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!";
// piCoutObj << "ping " << tm.elapsed_m();
mutex_session.unlock();
return true;
}
mutex_session.unlock();
piMinSleep();
}
return false;
}
void PIBaseTransfer::processData(int id, PIByteArray & data) {
// piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) {
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (replies[replies.size() - 1] == pt_ReplySuccess)
if (checkSession() == 0) state_string = "receive ok";
return;
}
Part fi;
PIByteArray ba, pheader;
pheader.resize(packet_header_size - sizeof(PacketHeader));
if (!pheader.isEmpty()) {
memcpy(pheader.data(), data.data(), pheader.size());
data.remove(0, pheader.size_s());
}
while (!data.isEmpty()) {
ba.clear();
data >> fi;
data >> ba;
bytes_cur += fi.size;
// piCoutObj << "recv" << fi;
session[id - 1] << fi;
state_string = "receiving...";
receivePart(fi, ba, pheader);
}
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
}
/// Assembles the wire image of one data packet.
///
/// Layout: PacketHeader, custom header, then for every part of the packet the
/// Part descriptor followed by its bytes obtained from buildPacket(). The CRC
/// stored in the header covers everything after the PacketHeader (or is 0 when
/// CRC is disabled). bytes_cur is advanced by the payload bytes produced.
///
/// @param id zero-based index into `session`; the header carries id + 1.
/// @return the complete packet.
PIByteArray PIBaseTransfer::build_packet(int id) {
	PIByteArray packet;
	// piCoutObj << "session id" << header.session_id;
	packet.append(customHeader());

	mutex_session.lock();
	for (int idx = 0; idx < session[id].size_s(); ++idx) {
		Part part = session[id][idx];
		packet << part;
		// piCout << "build" << part;
		PIByteArray payload = buildPacket(part);
		bytes_cur += payload.size();
		if (payload.size() != part.size) piCoutObj << "***error while build packet, wrong part size";
		packet << payload;
	}
	mutex_session.unlock();

	PIByteArray head_bytes;
	mutex_header.lock();
	header.id = id + 1;
	header.type = pt_Data;
	if (!crc_enabled)
		header.crc = 0;
	else
		header.crc = crc.calculate(packet);
	head_bytes << header;
	mutex_header.unlock();

	packet.insert(0, head_bytes);
	// piCoutObj << "Send Packet" << header.id << packet.size();
	return packet;
}
/// Closes the outgoing transfer.
///
/// Clears the sending flag, updates the state string, notifies the peer
/// (pt_ReplySuccess with id 0 on success, pt_Break on failure), raises
/// sendFinished() and zeroes the byte counters.
///
/// @param ok whether the transfer completed successfully.
/// @return `ok`, unchanged.
bool PIBaseTransfer::finish_send(bool ok) {
	is_sending = false;
	state_string = ok ? "send done" : "send failed";

	mutex_header.lock();
	header.id = 0;
	if (ok) {
		sendReply(pt_ReplySuccess);
	} else {
		sendBreak(header.session_id);
	}
	mutex_header.unlock();

	// piCoutObj << state_string << PIString::readableSize(bytes_all);
	sendFinished(ok);
	bytes_cur = 0;
	bytes_all = 0;
	return ok;
}
/// Closes the incoming transfer.
///
/// Clears the receiving flag, updates the state string, raises
/// receiveFinished() and zeroes the byte counters. On failure a pt_Break is
/// sent to the peer unless `quet` suppresses it.
///
/// @param ok   whether the transfer completed successfully.
/// @param quet when true, a failed receive does not send pt_Break.
void PIBaseTransfer::finish_receive(bool ok, bool quet) {
	is_receiving = false;
	state_string = ok ? "receive done" : "receive failed";

	const bool notify_peer = !(ok || quet);
	if (notify_peer) {
		mutex_header.lock();
		sendBreak(header.session_id);
		mutex_header.unlock();
	}

	// piCoutObj << state_string << PIString::readableSize(bytes_all);
	receiveFinished(ok);
	bytes_cur = 0;
	bytes_all = 0;
}