From 38fd1b5dc4e850ed00bc86669a641629a57aec15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Thu, 28 Jul 2022 17:02:33 +0300 Subject: [PATCH] =?UTF-8?q?PIPacketExtractor=20=D1=82=D0=B5=D0=BF=D0=B5?= =?UTF-8?q?=D1=80=D1=8C=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=D0=B5=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libs/main/io_devices/piiodevice.cpp | 2 +- libs/main/io_utils/piconnection.cpp | 4 - libs/main/io_utils/pipacketextractor.cpp | 104 ++++++++++------------- libs/main/io_utils/pipacketextractor.h | 32 ++----- main.cpp | 34 ++++++-- 5 files changed, 81 insertions(+), 95 deletions(-) diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index daae327c..5f88c8cb 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -597,7 +597,7 @@ PIMap & PIIODevice::fabrics() { bool PIIODevice::threadedRead(const uchar *readed, int size) { // piCout << "iodevice threaded read"; - if (func_read != 0) return func_read(readed, size, ret_data_); + if (func_read) return func_read(readed, size, ret_data_); return true; } diff --git a/libs/main/io_utils/piconnection.cpp b/libs/main/io_utils/piconnection.cpp index d307e39a..adc19c2b 100644 --- a/libs/main/io_utils/piconnection.cpp +++ b/libs/main/io_utils/piconnection.cpp @@ -224,9 +224,7 @@ bool PIConnection::configure(PIConfig & conf, const PIString & name_) { } PIDiagnostics * diag = diags_.value(pe, nullptr); if (diag) diag->setDisconnectTimeout(e->getValue("disconnectTimeout", diag->disconnectTimeout()).toFloat()); - pe->setBufferSize(e->getValue("bufferSize", pe->bufferSize()).toInt()); pe->setPayloadSize(e->getValue("payloadSize", pe->payloadSize()).toInt()); - pe->setPacketSize(e->getValue("packetSize", pe->packetSize()).toInt()); pe->setTimeout(e->getValue("timeout", pe->timeout()).toDouble()); pe->setHeader(PIByteArray::fromUserInput(e->getValue("header", "").toString())); pe->setFooter(PIByteArray::fromUserInput(e->getValue("footer", "").toString())); @@ -287,7 +285,6 @@ PIString PIConnection::makeConfig() const { ts << prefix << ".device." << i << " = " << dname << " #s\n"; } PIDiagnostics * diag = diags_.value(ite.value()->extractor, nullptr); - ts << prefix << ".bufferSize = " << ite.value()->extractor->bufferSize() << " #n\n"; if (diag) ts << prefix << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n"; ts << prefix << ".splitMode = "; switch (ite.value()->extractor->splitMode()) { @@ -300,7 +297,6 @@ PIString PIConnection::makeConfig() const { } ts << " #s\n"; ts << prefix << ".payloadSize = " << ite.value()->extractor->payloadSize() << " #n\n"; - ts << prefix << ".packetSize = " << ite.value()->extractor->packetSize() << " #n\n"; ts << prefix << ".timeout = " << ite.value()->extractor->timeout() << " #f\n"; ts << prefix << ".header = " << ite.value()->extractor->header().toString() << " #s\n"; ts << prefix << ".footer = " << ite.value()->extractor->footer().toString() << " #s\n"; diff --git a/libs/main/io_utils/pipacketextractor.cpp b/libs/main/io_utils/pipacketextractor.cpp index d42cbcbb..e3c8d30c 100644 --- a/libs/main/io_utils/pipacketextractor.cpp +++ b/libs/main/io_utils/pipacketextractor.cpp @@ -97,26 +97,23 @@ void PIPacketExtractor::construct() { setPayloadSize(0); setTimeout(100); #ifdef MICRO_PIP - setBufferSize(512); + setThreadedReadBufferSize(512); #else - setBufferSize(65536); + setThreadedReadBufferSize(65536); #endif - setDevice(0); - setPacketSize(0); + setDevice(nullptr); setSplitMode(None); - missed = missed_packets = footerInd = 0; + missed = footerInd = 0; header_found = false; } void PIPacketExtractor::propertyChanged(const char *) { - packetSize_ = property("packetSize").toInt(); mode_ = (SplitMode)(property("splitMode").toInt()); dataSize = property("payloadSize").toInt(); src_header = property("header").toByteArray(); src_footer = property("footer").toByteArray(); time_ = property("timeout").toDouble(); - packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize(); } @@ -143,32 +140,21 @@ ssize_t PIPacketExtractor::bytesAvailable() const { } -void PIPacketExtractor::setBufferSize(int new_size) { - buffer_size = new_size; - buffer.resize(buffer_size); - memset(buffer.data(), 0, buffer.size()); - setThreadedReadBufferSize(new_size); -} - - void PIPacketExtractor::setPayloadSize(int size) { setProperty("payloadSize", size); dataSize = size; - packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize(); } void PIPacketExtractor::setHeader(const PIByteArray & data) { setProperty("header", data); src_header = data; - packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize(); } void PIPacketExtractor::setFooter(const PIByteArray & data) { setProperty("footer", data); src_footer = data; - packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize(); } @@ -199,69 +185,76 @@ bool PIPacketExtractor::validatePayload(const uchar * rec, int size) { bool PIPacketExtractor::threadedRead(const uchar * readed, int size_) { //piCoutObj << "readed" << size_; int ss; + tmpbuf.append(readed, size_); switch (mode_) { case PIPacketExtractor::None: + tmpbuf.clear(); if (validatePayload(readed, size_)) { packetReceived(readed, size_); + } else { + missed += size_; } break; case PIPacketExtractor::Header: - tmpbuf.append(readed, size_); - ss = src_header.size_s(); - while (tmpbuf.size_s() >= ss) { - int ns = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()); - while (ns < 0) { + if (src_header.isEmpty()) return PIIODevice::threadedRead(readed, size_); + while (tmpbuf.size() >= src_header.size()) { + if (!header_found) { + packetSize_pending = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()); + while (packetSize_pending < 0) { + tmpbuf.pop_front(); + ++missed; + if (tmpbuf.size() < src_header.size()) return PIIODevice::threadedRead(readed, size_); + packetSize_pending = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()); + } + header_found = true; + } + if (tmpbuf.size_s() < src_header.size_s() + packetSize_pending) return PIIODevice::threadedRead(readed, size_); + if (!validatePayload(tmpbuf.data(src_header.size_s()), packetSize_pending)) { tmpbuf.pop_front(); ++missed; - if (tmpbuf.size() < src_header.size()) return true; - ns = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()); + header_found = false; + continue; } - ss = src_header.size_s() + ns; - while (!validatePayload(tmpbuf.data(src_header.size_s()), dataSize)) { - tmpbuf.pop_front(); - ++missed; - if (tmpbuf.size_s() < ss) return true; - } - packetReceived(tmpbuf.data(), ss); - tmpbuf.remove(0, ss); + packetReceived(tmpbuf.data(), src_header.size_s() + packetSize_pending); + tmpbuf.remove(0, src_header.size_s() + packetSize_pending); + header_found = false; } break; case PIPacketExtractor::Footer: - tmpbuf.append(readed, size_); + if (src_footer.isEmpty()) return PIIODevice::threadedRead(readed, size_); ss = src_footer.size_s() + dataSize; while (tmpbuf.size_s() >= ss) { while (!validateFooter(src_footer.data(), tmpbuf.data(dataSize), src_footer.size_s())) { tmpbuf.pop_front(); ++missed; - if (tmpbuf.size_s() < ss) return true; + if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_); } - while (!validatePayload(tmpbuf.data(), dataSize)) { + if (!validatePayload(tmpbuf.data(), dataSize)) { tmpbuf.pop_front(); ++missed; - if (tmpbuf.size_s() < ss) return true; + continue; } packetReceived(tmpbuf.data(), ss); tmpbuf.remove(0, ss); } break; case PIPacketExtractor::HeaderAndFooter: - tmpbuf.append(readed, size_); ss = src_header.size_s() + src_footer.size_s(); - while (tmpbuf.size_s() >= ss) { + while (tmpbuf.size_s() >= ss && ss > 0) { if (!header_found) { - if (tmpbuf.size_s() < ss) return true; + if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_); while (validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()) < 0) { tmpbuf.pop_front(); ++missed; - if (tmpbuf.size_s() < ss) return true; + if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_); } header_found = true; footerInd = src_header.size_s(); } else { - if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true; + if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return PIIODevice::threadedRead(readed, size_); while (!validateFooter(src_footer.data(), tmpbuf.data(footerInd), src_footer.size_s())) { ++footerInd; - if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true; + if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return PIIODevice::threadedRead(readed, size_); } //piCout << "footer found at" << footerInd; header_found = false; @@ -277,31 +270,28 @@ bool PIPacketExtractor::threadedRead(const uchar * readed, int size_) { } break; case PIPacketExtractor::Size: - tmpbuf.append(readed, size_); - if (packetSize_ <= 0) { + if (dataSize <= 0) { tmpbuf.clear(); - return true; + return PIIODevice::threadedRead(readed, size_); } - while (tmpbuf.size_s() >= packetSize_) { - if (!validatePayload(tmpbuf.data(), packetSize_)) { + while (tmpbuf.size_s() >= dataSize) { + if (!validatePayload(tmpbuf.data(), dataSize)) { tmpbuf.pop_front(); ++missed; - missed_packets = missed / packetSize_; continue; } - packetReceived(tmpbuf.data(), packetSize_); - tmpbuf.remove(0, packetSize_); + packetReceived(tmpbuf.data(), dataSize); + tmpbuf.remove(0, dataSize); } break; case PIPacketExtractor::Timeout: - memcpy(buffer.data(), readed, size_); - trbuf = dev->readForTime(time_); - memcpy(buffer.data(size_), trbuf.data(), trbuf.size()); - if (size_ + trbuf.size() > 0) - packetReceived(buffer.data(), size_ + trbuf.size()); + tmpbuf.append(dev->readForTime(time_)); + if (tmpbuf.size() > 0) { + packetReceived(tmpbuf.data(), tmpbuf.size()); + } break; }; - return true; + return PIIODevice::threadedRead(readed, size_); } diff --git a/libs/main/io_utils/pipacketextractor.h b/libs/main/io_utils/pipacketextractor.h index 1aa49edb..4fd95725 100644 --- a/libs/main/io_utils/pipacketextractor.h +++ b/libs/main/io_utils/pipacketextractor.h @@ -31,7 +31,7 @@ /// TODO: написать документацию, тут ничего не понятно // Pass data, recHeaderPtr, received_data, recHeaderSize. Return true if packet is correct nor return false. typedef int (*PacketExtractorHeaderFunc)(const uchar *, const uchar *, int); -typedef bool (*PacketExtractorPayloadFunc)(const uchar *, int ); +typedef bool (*PacketExtractorPayloadFunc)(const uchar *, int); typedef bool (*PacketExtractorFooterFunc)(const uchar *, const uchar *, int); class PIP_EXPORT PIPacketExtractor: public PIIODevice @@ -64,12 +64,6 @@ public: ssize_t bytesAvailable() const override; - //! Returns buffer size - int bufferSize() const {return buffer_size;} - - //! Set buffer size to "new_size" bytes, should be at least greater than whole packet size - void setBufferSize(int new_size); - void setHeaderCheckSlot(PacketExtractorHeaderFunc f) {func_header = f;} void setPayloadCheckSlot(PacketExtractorPayloadFunc f) {func_payload = f;} void setFooterCheckSlot(PacketExtractorFooterFunc f) {func_footer = f;} @@ -87,9 +81,6 @@ public: //! Set footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms void setFooter(const PIByteArray & data); - //! Set packet size, used for PIPacketExtractor::Size algorithm - void setPacketSize(int size) {setProperty("packetSize", size);} - //! Set timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm void setTimeout(double msecs) {setProperty("timeout", msecs);} @@ -97,7 +88,7 @@ public: //! Returns current extract algorithm SplitMode splitMode() const {return mode_;} - //! Returns current payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer algorithms + //! Returns current payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer and PIPacketExtractor::Size algorithms int payloadSize() const {return dataSize;} //! Returns current header data, used for PIPacketExtractor::Header and PIPacketExtractor::HeaderAndFooter algorithms @@ -106,25 +97,12 @@ public: //! Returns current footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms PIByteArray footer() const {return src_footer;} - //! Returns current packet size, used for PIPacketExtractor::Size algorithm - int packetSize() const {return packetSize_;} - //! Returns current timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm double timeout() const {return time_;} - //! Returns missed by validating functions bytes count ullong missedBytes() const {return missed;} - //! Returns missed by validating functions packets count, = missedBytes() / packetSize - ullong missedPackets() const {return missed_packets;} - - //! Returns pointer to \a missedBytes() count. Useful for output to PIConsole - const ullong * missedBytes_ptr() const {return &missed;} - - //! Returns pointer to \a missedPackets() count. Useful for output to PIConsole - const ullong * missedPackets_ptr() const {return &missed_packets;} - //! Add data to extractor, raise \a packetReceived() if packet is ready void appendData(const uchar * d, int s) {threadedRead(d, s);} @@ -175,15 +153,15 @@ private: DeviceInfoFlags deviceInfoFlags() const override; PIIODevice * dev; - PIByteArray buffer, tmpbuf, src_header, src_footer, trbuf; + PIByteArray src_header, src_footer, tmpbuf; PacketExtractorHeaderFunc func_header; PacketExtractorPayloadFunc func_payload; PacketExtractorFooterFunc func_footer; SplitMode mode_; - int buffer_size, dataSize, packetSize_hf, footerInd, packetSize_; + int dataSize, packetSize_pending, footerInd; double time_; bool header_found; - ullong missed, missed_packets; + ullong missed; }; diff --git a/main.cpp b/main.cpp index 158c87ce..779aec40 100644 --- a/main.cpp +++ b/main.cpp @@ -1,14 +1,36 @@ #include "pip.h" -#include "pibinarystream.h" -#include "pitextstream.h" -#include "piiostream.h" -//#include "stream.h" -//#include "ccm_.h" -//#include "pisystemmonitor_.h" using namespace PICoutManipulators; int main(int argc, char * argv[]) { + PIByteArray ba = PIByteArray::fromHex("AA11FFAA22EEAA33FF"); + PIIOByteArray b; + b.open(ba); + PIPacketExtractor p(&b); + p.setSplitMode(PIPacketExtractor::Header); + p.setHeader(PIByteArray::fromHex("AABB")); +// p.setFooter(PIByteArray::fromHex("AA")); + p.setPayloadSize(2); + p.setThreadedReadSlot([](const uchar * data, int size, void * d) { + piCout << size; + return true; + }); + p.setHeaderCheckSlot([](const uchar * src, const uchar * rec, int size) { + if (*src == *rec) { + if (rec[1] == 0x11) return 1; + if (rec[1] == 0x22) return 3; + } + return -1; + }); +// p.setPayloadCheckSlot([](const uchar * data, int size) { +// return data[1] == 0xFF; +// }); + CONNECTL(&p, packetReceived, ([](const uchar * data, int size){ + piCout << PIByteArray(data, size).toHex(); + })); + p.startThreadedRead(); + piMSleep(100); + p.stop(); return 0; }