Files
pip/libs/io_utils/pistreampacker.cpp
peri4 1c7fc39b6c version 4.0.0_alpha
in almost all methods removed timeouts in milliseconds, replaced to PISystemTime
PITimer rewrite, remove internal impl, now only thread implementation, API similar to PIThread
PITimer API no longer pass void*
PIPeer, PIConnection improved stability on reinit and exit
PISystemTime new methods
pisd now exit without hanging
2024-07-30 14:18:02 +03:00

217 lines
5.9 KiB
C++

/*
PIP - Platform Independent Primitives
Simple packet wrap around any PIIODevice
Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef __GNUC__
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wnonnull"
#endif
#include "pistreampacker.h"
#include "piiodevice.h"
#include "piliterals_bytes.h"
#ifdef __GNUC__
# pragma GCC diagnostic pop
#endif
/** \class PIStreamPacker
* \brief Simple packet wrap around any PIIODevice
*
* \section PIStreamPacker_synopsis Synopsis
* %PIStreamPacker provides simple pack/unpack logic for any data packets.
*
* When you call the \a send() function, the data is split into several
* parts, \a packetSign() is prepended to the first part and the \a sendRequest()
* event is raised once for each part.
*
* When your device receives some data, call the \a received() function.
* The \a packetReceiveEvent() event will be raised once a complete packet
* has been collected.
*
* Use \a assignDevice() to connect device to this %PIStreamPacker.
*
*/
/**
 * \brief Create a packer with default settings and hand \a dev to \a assignDevice().
 *
 * Defaults set here: encryption of the size field and fragment-wise
 * encryption are both off, aggressive optimization is on, the packet
 * signature is 0xAFBE, emitted parts are at most 1400 bytes and an
 * encrypted fragment is 1 MiB.
 *
 * \param dev device passed to \a assignDevice(), which ignores a null pointer
 */
PIStreamPacker::PIStreamPacker(PIIODevice * dev): PIObject() {
    // Wire-format parameters.
    packet_sign = 0xAFBE;
    max_packet_size = 1400;
    crypt_frag_size = 1_MiB;
    size_crypted_size = sizeof(int);

    // Feature switches.
    crypt_size = false;
    crypt_frag = false;
    aggressive_optimization = true;

    // Receiver state: a negative size means no packet header has been parsed yet.
    packet_size = -1;

    assignDevice(dev);
}
void PIStreamPacker::setCryptSizeEnabled(bool on) {
crypt_size = on;
if (crypt_size) {
PIByteArray ba;
ba << int(0);
size_crypted_size = cryptData(ba).size_s();
} else
size_crypted_size = sizeof(int);
}
/** \brief Reset the receiver: drop the buffered input and any partially collected packet. */
void PIStreamPacker::clear() {
    stream.clear();
    packet.clear();
    packet_size = -1; // negative size = waiting for the next packet header
}
void PIStreamPacker::send(const PIByteArray & data) {
if (data.isEmpty()) return;
PIByteArray cd;
if (crypt_frag) {
int fcnt = (data.size_s() - 1) / crypt_frag_size + 1, fst = 0;
// piCout << "crypt_frag send" << fcnt << "frags";
PIByteArray frag;
for (int i = 0; i < fcnt; ++i) {
if (i == fcnt - 1)
frag = PIByteArray(data.data(fst), data.size_s() - fst);
else
frag = PIByteArray(data.data(fst), crypt_frag_size);
fst += crypt_frag_size;
cd << cryptData(frag);
}
} else {
cd = cryptData(data);
}
// piCout << "crypt" << data.size() << "->" << cd.size() << key().size();
PIByteArray hdr, part;
hdr << packet_sign;
if (crypt_size) {
PIByteArray crsz;
crsz << int(cd.size_s());
hdr.append(cryptData(crsz));
} else
hdr << int(cd.size_s());
cd.insert(0, hdr);
int pcnt = (cd.size_s() - 1) / max_packet_size + 1, pst = 0;
for (int i = 0; i < pcnt; ++i) {
if (i == pcnt - 1)
part = PIByteArray(cd.data(pst), cd.size_s() - pst);
else
part = PIByteArray(cd.data(pst), max_packet_size);
// piCout << "send" << part.size();
sendRequest(part);
pst += max_packet_size;
}
}
/**
 * \brief Feed a raw chunk of received bytes into the packer.
 *
 * This overload is the handler that \a assignDevice() connects to the
 * device's threadedReadEvent. It copies the chunk into a PIByteArray and
 * forwards it to \a received(const PIByteArray &).
 *
 * A null buffer or a non-positive \a size (for example a failed read
 * reported as -1) is ignored rather than being turned into a byte array;
 * an empty chunk would add nothing to the input stream anyway.
 *
 * \param readed pointer to the received bytes
 * \param size   number of valid bytes at \a readed
 */
void PIStreamPacker::received(const uchar * readed, ssize_t size) {
    // NOTE(review): PIByteArray's length parameter appears to be unsigned
    // (see the (uint) cast used when sizing arrays in this file), so a
    // negative size must never reach it.
    if (!readed || size <= 0) return;
    received(PIByteArray(readed, size));
}
void PIStreamPacker::received(const PIByteArray & data) {
stream.append(data);
// piCout << "rec" << data.size();
while (!stream.isEmpty()) {
int hdr_size = sizeof(packet_sign) + size_crypted_size;
if (packet_size < 0) {
if (stream.size_s() < hdr_size) return;
ushort sign(0);
memcpy(&sign, stream.data(), 2);
if (sign != packet_sign) {
if (aggressive_optimization)
stream.clear();
else
stream.pop_front();
continue;
}
int sz = -1;
if (crypt_size) {
PIByteArray crsz((uint)size_crypted_size);
memcpy(crsz.data(), stream.data(2), size_crypted_size);
crsz = decryptData(crsz);
if (crsz.size() < sizeof(sz)) {
if (aggressive_optimization)
stream.clear();
else
stream.pop_front();
continue;
}
crsz >> sz;
} else {
memcpy(&sz, stream.data(2), size_crypted_size);
}
if (sz < 0) {
if (aggressive_optimization)
stream.clear();
else
stream.pop_front();
continue;
}
stream.remove(0, hdr_size);
packet.clear();
packet_size = sz;
if (packet_size == 0) packet_size = -1;
continue;
} else {
int ps = piMini(stream.size_s(), packet_size - packet.size_s());
packet.append(stream.data(), ps);
stream.remove(0, ps);
if (packet.size_s() == packet_size) {
PIByteArray cd;
if (crypt_frag) {
// piCout << "decrypt frags ..." << packet_size;
while (packet.size_s() >= 4) {
// piCout << "decrypt frags take data ...";
PIByteArray frag;
// piCout << "decrypt frags take data done" << frag.size_s();
packet >> frag;
if (frag.isEmpty()) {
// piCout << "decrypt frags corrupt, break";
cd.clear();
break;
}
cd.append(decryptData(frag));
// piCout << "decrypt frags add" << frag.size_s();
}
// piCout << "decrypt frags done" << cd.size();
} else {
cd = decryptData(packet);
}
// piCout << "decrypt" << packet.size() << "->" << cd.size() << key().size();
if (!cd.isEmpty()) {
packetReceived(cd);
packetReceiveEvent(cd);
}
packet.clear();
packet_size = -1;
}
}
}
}
/**
 * \brief Connect \a dev to this packer in both directions.
 *
 * The device's threadedReadEvent is connected to \a received() and this
 * object's \a sendRequest() event is connected to the device's write.
 * A warning is printed when the device's info flags do not include
 * PIIODevice::Reliable. A null pointer is ignored.
 *
 * \param dev device to attach, may be null
 */
void PIStreamPacker::assignDevice(PIIODevice * dev) {
    if (dev == nullptr) return;

    const bool is_reliable = dev->infoFlags()[PIIODevice::Reliable];
    if (!is_reliable) {
        piCoutObj << "Warning! Not recommended to use with non-reliable" << dev;
    }

    // Incoming bytes: device -> packer.
    CONNECT2(void, const uchar *, ssize_t, dev, threadedReadEvent, this, received);
    // Outgoing parts: packer -> device.
    CONNECT1(void, PIByteArray, this, sendRequest, dev, write);
}