Files
pip/libs/main/io_utils/pibasetransfer.cpp
peri4 e6c8714857 version 4.1.0
maybe fix hang on PIEthernet::interrupt()
replace piLetobe with piChangeEndian:
 * piChangeEndianBinary
 * piChangeBinary
 * piChangedBinary
PIDiagnostics::start now accept PISystemTime instead of number
add PITimer::start(PISystemTime, std::function<void()>) overload
2024-08-28 11:56:36 +03:00

638 lines
16 KiB
C++

/*
PIP - Platform Independent Primitives
Base class for reliable send and receive data in fixed packets with error correction, pause and resume
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pibasetransfer.h"
#include "piliterals_time.h"
#include "pitime.h"
// Packet signature: the constructor copies it into header.sig, and received() rejects packets
// whose header fails check_sig(). 0x54424950 is the byte sequence 'P','I','B','T' when stored
// little-endian.
const uint PIBaseTransfer::signature = 0x54424950;
// Creates an idle transfer object: no session, CRC checking on, 10 second timeout,
// an initial window of 10 packets, and a running diagnostics object.
PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
	// Protocol header defaults.
	crc_enabled = true;
	header.session_id = 0;
	header.sig = signature;

	// NOTE(review): customHeader() looks like an overridable hook; if it is virtual, a call from
	// the constructor cannot reach a derived override. received() and send_process() recompute
	// packet_header_size, so this value is only the initial one.
	packet_header_size = sizeof(PacketHeader) + customHeader().size();
	part_header_size = sizeof(Part) + sizeof(int);

	// Nothing is in progress yet.
	is_pause = false;
	is_receiving = false;
	is_sending = false;
	break_ = true;
	bytes_cur = 0;
	bytes_all = 0;
	send_queue = 0;
	send_up = 0;

	// Diagnostics: disconnect timeout is one tenth of the transfer timeout here.
	timeout_ = 10.;
	diag.setDisconnectTimeout(PISystemTime::fromSeconds(timeout_ / 10.));
	diag.setName("PIBaseTransfer");
	diag.start(20_Hz);

	packets_count = 10;
	// Smaller default packet on MICRO_PIP builds.
#ifdef MICRO_PIP
	const int default_packet_size = 512;
#else
	const int default_packet_size = 4096;
#endif
	setPacketSize(default_packet_size);
	randomize();
}
// Stops the diagnostics object (waiting for it to finish) and raises the break flag,
// which send_process() and getStartRequest() poll to abort.
PIBaseTransfer::~PIBaseTransfer() {
diag.stopAndWait();
break_ = true;
}
// Requests abort of a running send by raising the break flag; does nothing when no send is active.
// The flag is polled by send_process(), which then finishes the send as failed.
void PIBaseTransfer::stopSend() {
	if (is_sending) break_ = true;
}
// Aborts a running receive: raises the break flag and finishes the receive as failed.
// Does nothing when no receive is active.
void PIBaseTransfer::stopReceive() {
	if (is_receiving) {
		break_ = true;
		finish_receive(false);
	}
}
// Switches the local pause state. A call that does not change the state is ignored;
// otherwise the pause timer is restarted and paused() or resumed() is raised.
void PIBaseTransfer::setPause(bool pause_) {
	const bool unchanged = (is_pause == pause_);
	if (unchanged) return;
	pause_tm.reset();
	is_pause = pause_;
	if (!pause_) {
		resumed();
		return;
	}
	paused();
}
void PIBaseTransfer::setTimeout(double sec) {
timeout_ = sec;
diag.setDisconnectTimeout(PISystemTime::fromSeconds(sec));
}
// Entry point for every packet that arrives from the peer.
// Checks size and signature, reports the packet to diag, then dispatches on the packet type
// taken from the header: data, success/invalid reply, break, start or pause.
void PIBaseTransfer::received(PIByteArray data) {
// The header size is recomputed on every packet from the current customHeader().
packet_header_size = sizeof(PacketHeader) + customHeader().size();
// Too short to hold a PacketHeader: report it to diag with the `false` flag and drop it.
if (data.size() < sizeof(PacketHeader)) {
diag.received(data.size(), false);
return;
}
PacketHeader h;
// Presumably this consumes the header bytes and leaves only the payload in `data`; the
// `data.size() == sizeof(StartRequest)` check in the pt_Start branch relies on that - confirm.
data >> h;
PacketType pt = (PacketType)h.type;
if (!h.check_sig()) {
piCoutObj << "invalid packet signature";
diag.received(data.size(), false);
return;
} else
diag.received(data.size(), true);
// piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) {
case pt_Unknown: break;
case pt_Data:
mutex_header.lock();
// Data for another session, or while not receiving: answer with a break that carries the
// sender's session id.
if (h.session_id != header.session_id || !is_receiving) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
} else {
uint rcrc = h.crc;
uint ccrc;
// With CRC disabled the expected value is 0 (build_packet() also writes 0 in that case).
if (crc_enabled)
ccrc = crc.calculate(data.data(), data.size_s());
else
ccrc = 0;
if (rcrc != ccrc) {
// Checksum mismatch: reply pt_ReplyInvalid for this packet id so the sender re-sends it.
header.id = h.id;
piCoutObj << "invalid CRC";
sendReply(pt_ReplyInvalid);
} else {
// NOTE(review): mutex_session is taken here while mutex_header is held (header -> session),
// whereas the pt_Start branch below and buildSession() lock in the order session -> header.
mutex_session.lock();
processData(h.id, data);
mutex_session.unlock();
}
// While paused locally, keep telling the sender so.
if (is_pause) sendReply(pt_Pause);
}
mutex_header.unlock();
break;
case pt_ReplySuccess:
case pt_ReplyInvalid:
mutex_header.lock();
// Replies that belong to another session are ignored.
if (h.session_id != header.session_id) {
mutex_header.unlock();
return;
}
if (is_pause) sendReply(pt_Pause);
mutex_header.unlock();
if (pt == pt_ReplySuccess) {
// One in-flight packet was acknowledged: shrink the queue (never below 0) and restart the
// timer that send_process() uses for its reply timeout.
mutex_send.lock();
send_queue--;
if (send_queue < 0) send_queue = 0;
send_tm.reset();
mutex_send.unlock();
}
if (is_sending) {
mutex_session.lock();
if (h.id < replies.size()) {
replies[h.id] = pt;
// pm_string holds one mark per packet: '#' for a success reply, '-' otherwise,
// '+' for a packet that send_process() has sent and that has no reply yet.
pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-';
int s = pm_string.find('+'), s1 = send_queue;
// Packets still marked '+' at an index up to h.id are taken out of the in-flight set:
// each is marked '-', removed from send_queue and has packet_size taken off bytes_cur.
// NOTE(review): `s >= 0` is always true inside this loop (the loop condition requires s > 0),
// and send_queue is modified here without holding mutex_send.
while (s <= (int)h.id && s > 0) {
send_queue--;
if (s >= 0) {
pm_string[s] = '-';
bytes_cur -= packet_size;
}
send_up = 0;
if (send_queue < 0) {
send_queue = 0;
break;
}
s = pm_string.find('+');
}
// Window adaptation: shrink packets_count by about 10% (at least 1) when the loop above
// dropped more than one packet; otherwise count quiet replies in send_up and grow the window
// by about 10% once send_up exceeds both 20 and 2 * packets_count.
if (s1 - send_queue > 1 && packets_count > 2) packets_count -= piMaxi(packets_count / 10, 1);
if (s1 == send_queue && s1 < piMaxi(packets_count / 2, 2) && packets_count < 100) send_up++;
if (send_up > 20 && send_up > packets_count * 2) packets_count += piMaxi(packets_count / 10, 1);
// piCoutObj << packets_count;
} else
piCoutObj << "invalid reply id";
mutex_session.unlock();
// piCoutObj << "Done Packet" << h.id;
}
// A success reply with id 0 while receiving is what finish_send(true) emits on the sending side:
// when checkSession() reports nothing outstanding, the receive is finished successfully.
if (is_receiving && h.id == 0) {
if (pt == pt_ReplySuccess) {
mutex_session.lock();
int cs = checkSession();
mutex_session.unlock();
// piCoutObj << "Success receive";
if (cs == 0) finish_receive(true);
}
}
break;
case pt_Break:
// The peer aborted: raise the break flag and stop whichever direction is active.
break_ = true;
// piCoutObj << "BREAK";
if (is_receiving) {
stopReceive();
return;
}
if (is_sending) {
stopSend();
return;
}
break;
case pt_Start:
mutex_header.lock();
// A start packet for the current session while paused acts as "resume".
if (is_pause && (is_sending || is_receiving)) {
if (header.session_id == h.session_id) {
is_pause = false;
mutex_header.unlock();
resumed();
return;
}
}
// Busy sending a different session: refuse with a break.
if (is_sending && header.session_id != h.session_id) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
}
if (is_receiving) {
if (header.session_id != h.session_id) {
// A new session arrives while another is being received: finish the old one as failed
// without sending a break (quet = true), then continue below to start the new one.
piCoutObj << "restart receive";
mutex_header.unlock();
finish_receive(false, true);
} else {
// Repeated start request for the session already in progress: acknowledge it again.
header.id = 0;
sendReply(pt_ReplySuccess);
mutex_header.unlock();
return;
}
}
mutex_header.unlock();
// Only a payload of exactly sizeof(StartRequest) starts a session; anything else is ignored.
if (data.size() == sizeof(StartRequest)) {
StartRequest sr;
data >> sr;
mutex_session.lock();
mutex_header.lock();
bytes_cur = 0;
state_string = "start request";
bytes_all = sr.size;
header.session_id = h.session_id;
header.id = 0;
packets_count = 10;
session.clear();
replies.clear();
// replies has one extra slot: index 0 is the start request, packets use 1..sr.packets.
// NOTE(review): sr.packets comes from the peer and is used for these resizes unchecked.
session.resize(sr.packets);
replies.resize(sr.packets + 1);
replies.fill(pt_Unknown);
pm_string.resize(replies.size(), '-');
diag.reset();
// piCoutObj << "receiveStarted()";
is_receiving = true;
break_ = false;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
// NOTE(review): beginReceive() and receiveStarted() run with both mutex_session and
// mutex_header held.
beginReceive();
receiveStarted();
state_string = "receiving";
replies[0] = pt_ReplySuccess;
pm_string[0] = '#';
mutex_session.unlock();
sendReply(pt_ReplySuccess);
mutex_header.unlock();
}
break;
case pt_Pause:
mutex_header.lock();
if (header.session_id == h.session_id) {
// piCout << "receive pause";
// Not paused locally and pause_tm was reset less than timeout_ / 10 seconds ago: answer with
// pt_Start, which the peer treats as "resume" (see the pt_Start branch above).
if (!is_pause && pause_tm.elapsed_s() < timeout_ / 10) {
// piCout << "resume";
sendReply(pt_Start);
mutex_header.unlock();
return;
}
if (!is_pause) paused();
is_pause = true;
// While receiving, echo the pause back only when more than 40 elapsed_m() units
// (presumably milliseconds) have passed since pause_tm was last reset.
if (is_receiving && pause_tm.elapsed_m() > 40) {
// piCout << "send pause";
sendReply(pt_Pause);
}
// Keep the sender's reply timeout from expiring while paused.
if (is_sending) send_tm.reset();
pause_tm.reset();
}
mutex_header.unlock();
break;
default: break;
}
}
// Runs one complete, blocking send of the session prepared by buildSession().
// Steps:
//   1. reset per-send state and announce the session with getStartRequest();
//   2. send every packet once, keeping at most packets_count packets unacknowledged;
//   3. keep re-sending whatever checkSession() reports as outstanding until everything is
//      acknowledged or timeout_ seconds pass without the outstanding index changing.
// Returns the result passed to finish_send(): true only when every packet was acknowledged.
// The send is aborted (false) when break_ is raised, a reply timeout expires, or a pause
// lasts longer than timeout().
bool PIBaseTransfer::send_process() {
	mutex_session.lock();
	packet_header_size = sizeof(PacketHeader) + customHeader().size();
	break_ = false;
	diag.reset();
	sendStarted();
	is_sending = true;
	int session_size = session.size();
	// One extra reply slot: index 0 belongs to the start request, packets use 1..session_size.
	replies.resize(session_size + 1);
	replies.fill(pt_Unknown);
	pm_string.resize(replies.size(), '-');
	mutex_session.unlock();
	PIByteArray ba;
	if (!getStartRequest()) return finish_send(false);
	state_string = "sending";
	PITimeMeasurer stm; // time since the last packet was actually sent
	mutex_send.lock();
	send_queue = 0;
	send_up = 0;
	send_tm.reset();
	mutex_send.unlock();
	for (int i = 0; i < session_size; i++) {
		mutex_send.lock();
		int sq = send_queue;
		mutex_send.unlock();
		if (sq >= packets_count || is_pause) {
			// Window full or paused: stay on the same packet index and wait.
			--i;
			if (is_pause) {
				piMSleep(40);
				// piCout << "send pause";
				mutex_header.lock();
				sendReply(pt_Pause);
				mutex_header.unlock();
				if (pause_tm.elapsed_s() > timeout()) {
					return finish_send(false);
				}
			}
			mutex_send.lock();
			// No success reply for a whole timeout: give up.
			if (send_tm.elapsed_s() > timeout_) {
				mutex_send.unlock();
				return finish_send(false);
			}
			// Nothing sent for half a timeout: forget the in-flight packets so sending can continue.
			if (stm.elapsed_s() > timeout_ / 2.) {
				send_up = 0;
				send_queue = 0;
				pm_string.replaceAll("+", "-");
			}
			mutex_send.unlock();
			continue;
		}
		stm.reset();
		ba = build_packet(i);
		diag.sended(ba.size_s());
		sendRequest(ba);
		// Packet i is reported under index i + 1; '+' marks "sent, no reply yet".
		pm_string[i + 1] = '+';
		mutex_send.lock();
		send_queue++;
		mutex_send.unlock();
		if (break_) return finish_send(false);
	}
	// piCoutObj << "send done, checking";
	PITimeMeasurer rtm; // time since the outstanding packet index last changed
	int prev_chk = 0;
	mutex_send.lock();
	send_queue = 0;
	mutex_send.unlock();
	piMSleep(10);
	state_string = "sending more";
	while (rtm.elapsed_s() < timeout_) {
		// piCoutObj << "recovering...";
		mutex_session.lock();
		int chk = checkSession();
		mutex_session.unlock();
		if (chk == 0) return finish_send(true);
		if (chk != prev_chk)
			rtm.reset();
		else if (rtm.elapsed_m() < 100) {
			// Same packet still outstanding: wait before sending it again.
			piMinSleep();
			continue;
		}
		if (is_pause) {
			piMSleep(40);
			// piCout << "send pause";
			mutex_header.lock();
			sendReply(pt_Pause);
			mutex_header.unlock();
			if (pause_tm.elapsed_s() > timeout())
				return finish_send(false);
			else
				continue;
		}
		prev_chk = chk;
		if (chk > 0) {
			// piCoutObj << "recovery packet" << chk;
			mutex_send.lock();
			int sq = send_queue;
			mutex_send.unlock();
			if (sq >= packets_count) {
				piMSleep(10);
				// Reset the window under mutex_send, like every other send_queue access in this
				// function; received() updates the same counter from the receiving side.
				mutex_send.lock();
				send_queue = 0;
				send_up = 0;
				pm_string.replaceAll("+", "-");
				mutex_send.unlock();
				continue;
			}
			// checkSession() returns a 1-based index; build_packet() takes a 0-based one.
			ba = build_packet(chk - 1);
			diag.sended(ba.size_s());
			sendRequest(ba);
			pm_string[chk] = '+';
			mutex_send.lock();
			send_queue++;
			mutex_send.unlock();
		}
		if (break_) return finish_send(false);
	}
	return finish_send(false);
}
// Reports which packet, if any, still needs attention.
// Returns:
//   -1  neither sending nor receiving;
//    0  every packet slot (index >= 1) holds pt_ReplySuccess;
//   >0  index of the first slot holding pt_ReplyInvalid or, when no slot is invalid,
//       index of the first slot that is not pt_ReplySuccess.
// Slot 0 (the start request) is not inspected. The callers in this file hold mutex_session.
// The former "missing packets" counter was removed: its negative result could never be
// returned, because any non-success slot already produced a positive index.
int PIBaseTransfer::checkSession() {
	if (!(is_receiving || is_sending)) return -1;
	int first_pending = 0;
	for (int i = 1; i < replies.size_s(); i++) {
		// An invalid reply takes priority over slots that merely have no success reply yet.
		if (replies[i] == pt_ReplyInvalid) return i;
		if (first_pending == 0 && replies[i] != pt_ReplySuccess) first_pending = i;
	}
	return first_pending;
}
// Plans how the given parts are distributed over packets and stores the plan in `session`
// (one PIVector<Part> per packet). Also picks a new random session id, accumulates the total
// byte count in bytes_all and reports progress through state_string.
// mutex_session and mutex_header are both held for the whole computation.
void PIBaseTransfer::buildSession(PIVector<Part> parts) {
mutex_session.lock();
mutex_header.lock();
state_string = "calculating parts ... ";
session.clear();
header.session_id = randomi();
bytes_all = 0;
Part fi;
int fi_index, fi_prts;
// Parts collected for the packet that is currently being filled.
PIVector<Part> lfi;
// Overhead of a packet that carries a single part: packet header plus one part header.
int min_size = packet_header_size + part_header_size;
// Bytes accounted for so far in the packet being filled.
int cur_size = min_size;
for (int i = 0; i < parts.size_s(); i++) {
state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
fi.id = parts[i].id;
// piCout << fi.id << state_string;
bytes_all += parts[i].size;
fi.start = 0;
// Bytes of this part that do not fit into the room left in the current packet.
llong rest = parts[i].size - (packet_size - cur_size);
// piCout << i << fi.size << rest << bytes_all;
if (rest <= 0) {
// The whole part fits: add it to the current packet.
fi.size = parts[i].size;
lfi << fi;
cur_size += fi.size + part_header_size;
} else {
// The part is split: the first fragment fills the current packet, which is then closed.
fi.size = parts[i].size - rest;
fi_index = 1;
// Total fragment count: the first one plus the fragments needed for `rest`.
// NOTE(review): when `rest` is not larger than, or is an exact multiple of,
// (packet_size - min_size) this appears to yield a trailing fragment of size 0 - confirm
// whether that is intended.
// NOTE(review): packet_size == min_size would divide by zero here.
fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size));
// piCout << fi_prts;
lfi << fi;
session << lfi;
lfi.clear();
cur_size = min_size;
// Offset inside the part where the next fragment starts.
llong fs = fi.size;
for (int j = 1; j < fi_prts; j++) {
fi_index++;
fi.start = fs;
fi.size = piMin<ullong>(parts[i].size - fs, packet_size - min_size);
lfi << fi;
cur_size += fi.size + part_header_size;
// Every fragment except the last one closes its packet; the last one stays in the open
// packet so that following parts can share it.
if (fi_index != fi_prts) {
session << lfi;
lfi.clear();
cur_size = min_size;
fs += fi.size;
}
}
}
// Close the packet when the room left is smaller than min_size.
if (packet_size - cur_size < min_size) {
session << lfi;
lfi.clear();
cur_size = min_size;
}
}
// Store the last, partially filled packet.
if (cur_size > min_size) session << lfi;
mutex_header.unlock();
mutex_session.unlock();
}
void PIBaseTransfer::sendBreak(int session_id) {
// piCoutObj << "sendBreak";
uint psid = header.session_id;
header.session_id = session_id;
sendReply(pt_Break);
header.session_id = psid;
}
// Sends a header-only packet of the given type, built from the current `header`.
// The packet size is reported to diag only while a send or receive is in progress.
void PIBaseTransfer::sendReply(PacketType reply) {
	header.type = reply;
	PIByteArray packet;
	packet << header;
	const bool transfer_active = is_sending || is_receiving;
	if (transfer_active) diag.sended(packet.size_s());
	sendRequest(packet);
}
bool PIBaseTransfer::getStartRequest() {
mutex_header.lock();
header.type = pt_Start;
header.id = 0;
PIByteArray ba;
StartRequest st;
st.packets = (uint)session.size();
st.size = bytes_all;
ba << header;
mutex_header.unlock();
ba << st;
state_string = "send request";
PITimeMeasurer tm;
while (tm.elapsed_s() < timeout_) {
diag.sended(ba.size_s());
sendRequest(ba);
if (break_) return false;
// piCoutObj << replies[0];
mutex_session.lock();
if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!";
// piCoutObj << "ping " << tm.elapsed_m();
mutex_session.unlock();
return true;
}
mutex_session.unlock();
piMinSleep();
}
return false;
}
void PIBaseTransfer::processData(int id, PIByteArray & data) {
// piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) {
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (replies[replies.size() - 1] == pt_ReplySuccess)
if (checkSession() == 0) state_string = "receive ok";
return;
}
Part fi;
PIByteArray ba, pheader;
pheader.resize(packet_header_size - sizeof(PacketHeader));
if (!pheader.isEmpty()) {
memcpy(pheader.data(), data.data(), pheader.size());
data.remove(0, pheader.size_s());
}
while (!data.isEmpty()) {
ba.clear();
data >> fi;
data >> ba;
bytes_cur += fi.size;
// piCoutObj << "recv" << fi;
session[id - 1] << fi;
state_string = "receiving...";
receivePart(fi, ba, pheader);
}
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
}
// Assembles the complete data packet for session entry `id` (0-based).
// Layout: PacketHeader, custom header, then for every part of the entry the Part descriptor
// followed by the bytes returned by buildPacket(). The packet number written to the header is
// id + 1, and the CRC covers everything after the PacketHeader (0 when CRC is disabled).
PIByteArray PIBaseTransfer::build_packet(int id) {
	PIByteArray packet;
	// piCoutObj << "session id" << header.session_id;
	packet.append(customHeader());
	mutex_session.lock();
	for (int part_idx = 0; part_idx < session[id].size_s(); ++part_idx) {
		Part part = session[id][part_idx];
		packet << part;
		// piCout << "build" << part;
		PIByteArray chunk = buildPacket(part);
		bytes_cur += chunk.size();
		if (chunk.size() != part.size) piCoutObj << "***error while build packet, wrong part size";
		packet << chunk;
	}
	mutex_session.unlock();
	PIByteArray header_bytes;
	mutex_header.lock();
	header.id = id + 1;
	header.type = pt_Data;
	if (!crc_enabled)
		header.crc = 0;
	else
		header.crc = crc.calculate(packet);
	header_bytes << header;
	mutex_header.unlock();
	// The header goes in front of the payload assembled above.
	packet.insert(0, header_bytes);
	// piCoutObj << "Send Packet" << header.id << packet.size();
	return packet;
}
// Ends the current send and tells the peer how it went: a pt_ReplySuccess with id 0 on success,
// a break for our session on failure. Then raises sendFinished(ok) and clears the byte counters.
// Returns `ok` unchanged so callers can write `return finish_send(...)`.
bool PIBaseTransfer::finish_send(bool ok) {
	is_sending = false;
	state_string = ok ? "send done" : "send failed";
	mutex_header.lock();
	header.id = 0;
	if (ok)
		sendReply(pt_ReplySuccess);
	else
		sendBreak(header.session_id);
	mutex_header.unlock();
	// piCoutObj << state_string << PIString::readableSize(bytes_all);
	sendFinished(ok);
	bytes_cur = 0;
	bytes_all = 0;
	return ok;
}
// Ends the current receive. On failure a break for our session is sent to the peer unless
// `quet` is set. Then raises receiveFinished(ok) and clears the byte counters.
void PIBaseTransfer::finish_receive(bool ok, bool quet) {
	is_receiving = false;
	state_string = ok ? "receive done" : "receive failed";
	const bool notify_peer = !(ok || quet);
	if (notify_peer) {
		mutex_header.lock();
		sendBreak(header.session_id);
		mutex_header.unlock();
	}
	// piCoutObj << state_string << PIString::readableSize(bytes_all);
	receiveFinished(ok);
	bytes_cur = 0;
	bytes_all = 0;
}