PIPacketExtractor теперь работает

This commit is contained in:
Бычков Андрей
2022-07-28 17:02:33 +03:00
parent 16c12a2756
commit 38fd1b5dc4
5 changed files with 81 additions and 95 deletions

View File

@@ -597,7 +597,7 @@ PIMap<PIConstChars, PIIODevice::FabricInfo> & PIIODevice::fabrics() {
bool PIIODevice::threadedRead(const uchar *readed, int size) {
// piCout << "iodevice threaded read";
if (func_read != 0) return func_read(readed, size, ret_data_);
if (func_read) return func_read(readed, size, ret_data_);
return true;
}

View File

@@ -224,9 +224,7 @@ bool PIConnection::configure(PIConfig & conf, const PIString & name_) {
}
PIDiagnostics * diag = diags_.value(pe, nullptr);
if (diag) diag->setDisconnectTimeout(e->getValue("disconnectTimeout", diag->disconnectTimeout()).toFloat());
pe->setBufferSize(e->getValue("bufferSize", pe->bufferSize()).toInt());
pe->setPayloadSize(e->getValue("payloadSize", pe->payloadSize()).toInt());
pe->setPacketSize(e->getValue("packetSize", pe->packetSize()).toInt());
pe->setTimeout(e->getValue("timeout", pe->timeout()).toDouble());
pe->setHeader(PIByteArray::fromUserInput(e->getValue("header", "").toString()));
pe->setFooter(PIByteArray::fromUserInput(e->getValue("footer", "").toString()));
@@ -287,7 +285,6 @@ PIString PIConnection::makeConfig() const {
ts << prefix << ".device." << i << " = " << dname << " #s\n";
}
PIDiagnostics * diag = diags_.value(ite.value()->extractor, nullptr);
ts << prefix << ".bufferSize = " << ite.value()->extractor->bufferSize() << " #n\n";
if (diag) ts << prefix << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n";
ts << prefix << ".splitMode = ";
switch (ite.value()->extractor->splitMode()) {
@@ -300,7 +297,6 @@ PIString PIConnection::makeConfig() const {
}
ts << " #s\n";
ts << prefix << ".payloadSize = " << ite.value()->extractor->payloadSize() << " #n\n";
ts << prefix << ".packetSize = " << ite.value()->extractor->packetSize() << " #n\n";
ts << prefix << ".timeout = " << ite.value()->extractor->timeout() << " #f\n";
ts << prefix << ".header = " << ite.value()->extractor->header().toString() << " #s\n";
ts << prefix << ".footer = " << ite.value()->extractor->footer().toString() << " #s\n";

View File

@@ -97,26 +97,23 @@ void PIPacketExtractor::construct() {
setPayloadSize(0);
setTimeout(100);
#ifdef MICRO_PIP
setBufferSize(512);
setThreadedReadBufferSize(512);
#else
setBufferSize(65536);
setThreadedReadBufferSize(65536);
#endif
setDevice(0);
setPacketSize(0);
setDevice(nullptr);
setSplitMode(None);
missed = missed_packets = footerInd = 0;
missed = footerInd = 0;
header_found = false;
}
void PIPacketExtractor::propertyChanged(const char *) {
packetSize_ = property("packetSize").toInt();
mode_ = (SplitMode)(property("splitMode").toInt());
dataSize = property("payloadSize").toInt();
src_header = property("header").toByteArray();
src_footer = property("footer").toByteArray();
time_ = property("timeout").toDouble();
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
@@ -143,32 +140,21 @@ ssize_t PIPacketExtractor::bytesAvailable() const {
}
void PIPacketExtractor::setBufferSize(int new_size) {
buffer_size = new_size;
buffer.resize(buffer_size);
memset(buffer.data(), 0, buffer.size());
setThreadedReadBufferSize(new_size);
}
void PIPacketExtractor::setPayloadSize(int size) {
setProperty("payloadSize", size);
dataSize = size;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
void PIPacketExtractor::setHeader(const PIByteArray & data) {
setProperty("header", data);
src_header = data;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
void PIPacketExtractor::setFooter(const PIByteArray & data) {
setProperty("footer", data);
src_footer = data;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
@@ -199,69 +185,76 @@ bool PIPacketExtractor::validatePayload(const uchar * rec, int size) {
bool PIPacketExtractor::threadedRead(const uchar * readed, int size_) {
//piCoutObj << "readed" << size_;
int ss;
tmpbuf.append(readed, size_);
switch (mode_) {
case PIPacketExtractor::None:
tmpbuf.clear();
if (validatePayload(readed, size_)) {
packetReceived(readed, size_);
} else {
missed += size_;
}
break;
case PIPacketExtractor::Header:
tmpbuf.append(readed, size_);
ss = src_header.size_s();
while (tmpbuf.size_s() >= ss) {
int ns = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s());
while (ns < 0) {
if (src_header.isEmpty()) return PIIODevice::threadedRead(readed, size_);
while (tmpbuf.size() >= src_header.size()) {
if (!header_found) {
packetSize_pending = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s());
while (packetSize_pending < 0) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size() < src_header.size()) return true;
ns = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s());
if (tmpbuf.size() < src_header.size()) return PIIODevice::threadedRead(readed, size_);
packetSize_pending = validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s());
}
ss = src_header.size_s() + ns;
while (!validatePayload(tmpbuf.data(src_header.size_s()), dataSize)) {
header_found = true;
}
if (tmpbuf.size_s() < src_header.size_s() + packetSize_pending) return PIIODevice::threadedRead(readed, size_);
if (!validatePayload(tmpbuf.data(src_header.size_s()), packetSize_pending)) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
header_found = false;
continue;
}
packetReceived(tmpbuf.data(), ss);
tmpbuf.remove(0, ss);
packetReceived(tmpbuf.data(), src_header.size_s() + packetSize_pending);
tmpbuf.remove(0, src_header.size_s() + packetSize_pending);
header_found = false;
}
break;
case PIPacketExtractor::Footer:
tmpbuf.append(readed, size_);
if (src_footer.isEmpty()) return PIIODevice::threadedRead(readed, size_);
ss = src_footer.size_s() + dataSize;
while (tmpbuf.size_s() >= ss) {
while (!validateFooter(src_footer.data(), tmpbuf.data(dataSize), src_footer.size_s())) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_);
}
while (!validatePayload(tmpbuf.data(), dataSize)) {
if (!validatePayload(tmpbuf.data(), dataSize)) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
continue;
}
packetReceived(tmpbuf.data(), ss);
tmpbuf.remove(0, ss);
}
break;
case PIPacketExtractor::HeaderAndFooter:
tmpbuf.append(readed, size_);
ss = src_header.size_s() + src_footer.size_s();
while (tmpbuf.size_s() >= ss) {
while (tmpbuf.size_s() >= ss && ss > 0) {
if (!header_found) {
if (tmpbuf.size_s() < ss) return true;
if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_);
while (validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s()) < 0) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
if (tmpbuf.size_s() < ss) return PIIODevice::threadedRead(readed, size_);
}
header_found = true;
footerInd = src_header.size_s();
} else {
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true;
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return PIIODevice::threadedRead(readed, size_);
while (!validateFooter(src_footer.data(), tmpbuf.data(footerInd), src_footer.size_s())) {
++footerInd;
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true;
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return PIIODevice::threadedRead(readed, size_);
}
//piCout << "footer found at" << footerInd;
header_found = false;
@@ -277,31 +270,28 @@ bool PIPacketExtractor::threadedRead(const uchar * readed, int size_) {
}
break;
case PIPacketExtractor::Size:
tmpbuf.append(readed, size_);
if (packetSize_ <= 0) {
if (dataSize <= 0) {
tmpbuf.clear();
return true;
return PIIODevice::threadedRead(readed, size_);
}
while (tmpbuf.size_s() >= packetSize_) {
if (!validatePayload(tmpbuf.data(), packetSize_)) {
while (tmpbuf.size_s() >= dataSize) {
if (!validatePayload(tmpbuf.data(), dataSize)) {
tmpbuf.pop_front();
++missed;
missed_packets = missed / packetSize_;
continue;
}
packetReceived(tmpbuf.data(), packetSize_);
tmpbuf.remove(0, packetSize_);
packetReceived(tmpbuf.data(), dataSize);
tmpbuf.remove(0, dataSize);
}
break;
case PIPacketExtractor::Timeout:
memcpy(buffer.data(), readed, size_);
trbuf = dev->readForTime(time_);
memcpy(buffer.data(size_), trbuf.data(), trbuf.size());
if (size_ + trbuf.size() > 0)
packetReceived(buffer.data(), size_ + trbuf.size());
tmpbuf.append(dev->readForTime(time_));
if (tmpbuf.size() > 0) {
packetReceived(tmpbuf.data(), tmpbuf.size());
}
break;
};
return true;
return PIIODevice::threadedRead(readed, size_);
}

View File

@@ -64,12 +64,6 @@ public:
ssize_t bytesAvailable() const override;
//! Returns buffer size
int bufferSize() const {return buffer_size;}
//! Set buffer size to "new_size" bytes, should be at least greater than whole packet size
void setBufferSize(int new_size);
void setHeaderCheckSlot(PacketExtractorHeaderFunc f) {func_header = f;}
void setPayloadCheckSlot(PacketExtractorPayloadFunc f) {func_payload = f;}
void setFooterCheckSlot(PacketExtractorFooterFunc f) {func_footer = f;}
@@ -87,9 +81,6 @@ public:
//! Set footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms
void setFooter(const PIByteArray & data);
//! Set packet size, used for PIPacketExtractor::Size algorithm
void setPacketSize(int size) {setProperty("packetSize", size);}
//! Set timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm
void setTimeout(double msecs) {setProperty("timeout", msecs);}
@@ -97,7 +88,7 @@ public:
//! Returns current extract algorithm
SplitMode splitMode() const {return mode_;}
//! Returns current payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer algorithms
//! Returns current payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer and PIPacketExtractor::Size algorithms
int payloadSize() const {return dataSize;}
//! Returns current header data, used for PIPacketExtractor::Header and PIPacketExtractor::HeaderAndFooter algorithms
@@ -106,25 +97,12 @@ public:
//! Returns current footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms
PIByteArray footer() const {return src_footer;}
//! Returns current packet size, used for PIPacketExtractor::Size algorithm
int packetSize() const {return packetSize_;}
//! Returns current timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm
double timeout() const {return time_;}
//! Returns missed by validating functions bytes count
ullong missedBytes() const {return missed;}
//! Returns missed by validating functions packets count, = missedBytes() / packetSize
ullong missedPackets() const {return missed_packets;}
//! Returns pointer to \a missedBytes() count. Useful for output to PIConsole
const ullong * missedBytes_ptr() const {return &missed;}
//! Returns pointer to \a missedPackets() count. Useful for output to PIConsole
const ullong * missedPackets_ptr() const {return &missed_packets;}
//! Add data to extractor, raise \a packetReceived() if packet is ready
void appendData(const uchar * d, int s) {threadedRead(d, s);}
@@ -175,15 +153,15 @@ private:
DeviceInfoFlags deviceInfoFlags() const override;
PIIODevice * dev;
PIByteArray buffer, tmpbuf, src_header, src_footer, trbuf;
PIByteArray src_header, src_footer, tmpbuf;
PacketExtractorHeaderFunc func_header;
PacketExtractorPayloadFunc func_payload;
PacketExtractorFooterFunc func_footer;
SplitMode mode_;
int buffer_size, dataSize, packetSize_hf, footerInd, packetSize_;
int dataSize, packetSize_pending, footerInd;
double time_;
bool header_found;
ullong missed, missed_packets;
ullong missed;
};

View File

@@ -1,14 +1,36 @@
#include "pip.h"
#include "pibinarystream.h"
#include "pitextstream.h"
#include "piiostream.h"
//#include "stream.h"
//#include "ccm_.h"
//#include "pisystemmonitor_.h"
using namespace PICoutManipulators;
int main(int argc, char * argv[]) {
PIByteArray ba = PIByteArray::fromHex("AA11FFAA22EEAA33FF");
PIIOByteArray b;
b.open(ba);
PIPacketExtractor p(&b);
p.setSplitMode(PIPacketExtractor::Header);
p.setHeader(PIByteArray::fromHex("AABB"));
// p.setFooter(PIByteArray::fromHex("AA"));
p.setPayloadSize(2);
p.setThreadedReadSlot([](const uchar * data, int size, void * d) {
piCout << size;
return true;
});
p.setHeaderCheckSlot([](const uchar * src, const uchar * rec, int size) {
if (*src == *rec) {
if (rec[1] == 0x11) return 1;
if (rec[1] == 0x22) return 3;
}
return -1;
});
// p.setPayloadCheckSlot([](const uchar * data, int size) {
// return data[1] == 0xFF;
// });
CONNECTL(&p, packetReceived, ([](const uchar * data, int size){
piCout << PIByteArray(data, size).toHex();
}));
p.startThreadedRead();
piMSleep(100);
p.stop();
return 0;
}