git-svn-id: svn://db.shs.com.ru/pip@632 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

This commit is contained in:
2018-09-27 10:11:21 +00:00
parent 9b92720b61
commit 08592aae9a
50 changed files with 70 additions and 19 deletions

View File

@@ -0,0 +1,606 @@
#include "pibasetransfer.h"
const uint PIBaseTransfer::signature = 0x54424950;
PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
header.sig = signature;
crc_enabled = true;
header.session_id = 0;
packet_header_size = sizeof(PacketHeader) + customHeader().size();
part_header_size = sizeof(Part) + sizeof(int);
is_sending = is_receiving = is_pause = false;
break_ = true;
bytes_all = bytes_cur = 0;
send_queue = 0;
send_up = 0;
timeout_ = 10.;
diag.setDisconnectTimeout(timeout_ / 10);
//CONNECTU(&diag, qualityChanged, this, diagChanged);
diag.setName("PIBaseTransfer");
diag.start(50);
packets_count = 10;
setPacketSize(4096);
randomize();
}
PIBaseTransfer::~PIBaseTransfer() {
diag.stop();
break_ = true;
}
void PIBaseTransfer::stopSend() {
if (!is_sending) return;
break_ = true;
}
void PIBaseTransfer::stopReceive() {
if (!is_receiving) return;
break_ = true;
//piCoutObj << "stopReceive()";
finish_receive(false);
}
void PIBaseTransfer::setPause(bool pause_) {
if (is_pause == pause_) return;
pause_tm.reset();
is_pause = pause_;
if (pause_) paused();
else resumed();
}
void PIBaseTransfer::received(PIByteArray data) {
packet_header_size = sizeof(PacketHeader) + customHeader().size();
if (data.size() < sizeof(PacketHeader)) {
diag.received(data.size(), false);
return;
}
PacketHeader h;
data >> h;
PacketType pt = (PacketType)h.type;
if (!h.check_sig()) {
piCoutObj << "invalid packet signature";
diag.received(data.size(), false);
return;
} else diag.received(data.size(), true);
//piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) {
case pt_Unknown: break;
case pt_Data:
mutex_header.lock();
if (h.session_id != header.session_id || !is_receiving) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
} else {
uint rcrc = h.crc;
uint ccrc;
if (crc_enabled) ccrc = crc.calculate(data.data(), data.size_s());
else ccrc = 0;
if (rcrc != ccrc) {
header.id = h.id;
piCoutObj << "invalid CRC";
sendReply(pt_ReplyInvalid);
} else {
mutex_session.lock();
processData(h.id, data);
mutex_session.unlock();
}
if (is_pause) sendReply(pt_Pause);
}
mutex_header.unlock();
break;
case pt_ReplySuccess:
case pt_ReplyInvalid:
mutex_header.lock();
if (h.session_id != header.session_id) {
mutex_header.unlock();
return;
}
if (is_pause) sendReply(pt_Pause);
mutex_header.unlock();
if (pt == pt_ReplySuccess) {
mutex_send.lock();
send_queue--;
if (send_queue < 0) send_queue = 0;
send_tm.reset();
mutex_send.unlock();
}
if (is_sending) {
mutex_session.lock();
if (h.id < replies.size()) {
replies[h.id] = pt;
pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-';
int s = pm_string.find('+'), s1 = send_queue;
while (s <= (int)h.id && s > 0) {
send_queue--;
if (s >= 0) {
pm_string[s] = '-';
bytes_cur -= packet_size;
}
send_up = 0;
if (send_queue < 0) {
send_queue = 0;
break;
}
s = pm_string.find('+');
}
if (s1-send_queue > 1 && packets_count > 2) packets_count-= piMaxi(packets_count/10,1);
if (s1 == send_queue && s1 < piMaxi(packets_count/2, 2) && packets_count < 100) send_up++;
if (send_up > 20 && send_up > packets_count*2) packets_count+= piMaxi(packets_count/10,1);
//piCoutObj << packets_count;
} else
piCoutObj << "invalid reply id";
mutex_session.unlock();
// piCoutObj << "Done Packet" << h.id;
}
if (is_receiving && h.id == 0) {
// if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true);
if (pt == pt_ReplySuccess) {
mutex_session.lock();
int cs = checkSession();
mutex_session.unlock();
//piCoutObj << "Success receive";
if (cs == 0) finish_receive(true);
}
}
break;
case pt_Break:
break_ = true;
//piCoutObj << "BREAK";
if (is_receiving) {
stopReceive();
return;
}
if (is_sending) {
stopSend();
return;
}
break;
case pt_Start:
mutex_header.lock();
if (is_pause && (is_sending || is_receiving)) {
if (header.session_id == h.session_id) {
is_pause = false;
mutex_header.unlock();
resumed();
return;
}
}
if (is_sending && header.session_id != h.session_id) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
}
if (is_receiving) {
if (header.session_id != h.session_id) {
//sendBreak(h.session_id);
//return;
piCoutObj << "restart receive";
mutex_header.unlock();
finish_receive(false, true);
} else {
header.id = 0;
sendReply(pt_ReplySuccess);
mutex_header.unlock();
return;
}
}
mutex_header.unlock();
if (data.size() == sizeof(StartRequest)) {
StartRequest sr;
data >> sr;
mutex_session.lock();
mutex_header.lock();
bytes_cur = 0;
state_string = "start request";
bytes_all = sr.size;
header.session_id = h.session_id;
header.id = 0;
packets_count = 10;
session.clear();
replies.clear();
session.resize(sr.packets);
replies.resize(sr.packets + 1);
replies.fill(pt_Unknown);
pm_string.resize(replies.size(), '-');
diag.reset();
// diag.start(100);
//piCoutObj << "receiveStarted()";
is_receiving = true;
break_ = false;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
receiveStarted();
state_string = "receiving";
replies[0] = pt_ReplySuccess;
pm_string[0] = '#';
mutex_session.unlock();
sendReply(pt_ReplySuccess);
mutex_header.unlock();
}
break;
case pt_Pause:
mutex_header.lock();
if (header.session_id == h.session_id) {
//piCout << "receive pause";
if (!is_pause && pause_tm.elapsed_s() < timeout_/10) {
//piCout << "resume";
sendReply(pt_Start);
mutex_header.unlock();
return;
}
if (!is_pause) paused();
is_pause = true;
if (is_receiving && pause_tm.elapsed_m() > 40) {
//piCout << "send pause";
sendReply(pt_Pause);
}
if (is_sending) send_tm.reset();
pause_tm.reset();
}
mutex_header.unlock();
break;
default: break;
}
}
bool PIBaseTransfer::send_process() {
mutex_session.lock();
packet_header_size = sizeof(PacketHeader) + customHeader().size();
break_ = false;
diag.reset();
// diag.start(100);
sendStarted();
is_sending = true;
int session_size = session.size();
replies.resize(session_size + 1);
replies.fill(pt_Unknown);
pm_string.resize(replies.size(), '-');
mutex_session.unlock();
PIByteArray ba;
if (!getStartRequest()) return finish_send(false);
state_string = "sending";
PITimeMeasurer stm;
mutex_send.lock();
send_queue = 0;
send_up = 0;
send_tm.reset();
mutex_send.unlock();
//int ltm = 0;
for (int i = 0; i < session_size; i++) {
mutex_send.lock();
int sq = send_queue;
mutex_send.unlock();
if (sq >= packets_count || is_pause) {
--i;
//piMSleep(1);
if (is_pause) {
piMSleep(40);
//piCout << "send pause";
mutex_header.lock();
sendReply(pt_Pause);
mutex_header.unlock();
if (pause_tm.elapsed_s() > timeout()) {
return finish_send(false);
}
}
mutex_send.lock();
if (send_tm.elapsed_s() > timeout_) {
mutex_send.unlock();
return finish_send(false);
}
if (stm.elapsed_s() > timeout_ / 2.) {
send_up = 0;
send_queue = 0;
pm_string.replaceAll("+", "-");
}
mutex_send.unlock();
continue;
}
stm.reset();
ba = build_packet(i);
diag.sended(ba.size_s());
sendRequest(ba);
pm_string[i+1] = '+';
mutex_send.lock();
send_queue++;
mutex_send.unlock();
if (break_) return finish_send(false);
}
// piCoutObj << "send done, checking";
PITimeMeasurer rtm;
int prev_chk = 0;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
piMSleep(10);
state_string = "sending more";
while (rtm.elapsed_s() < timeout_) {
//piCoutObj << "recovering...";
mutex_session.lock();
int chk = checkSession();
mutex_session.unlock();
if (chk == 0) return finish_send(true);
if (chk != prev_chk) rtm.reset();
else if (rtm.elapsed_m() < 100) {
piMSleep(1);
continue;
}
if (is_pause) {
piMSleep(40);
//piCout << "send pause";
mutex_header.lock();
sendReply(pt_Pause);
mutex_header.unlock();
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
else continue;
}
prev_chk = chk;
if (chk > 0) {
//piCoutObj << "recovery packet" << chk;
mutex_send.lock();
int sq = send_queue;
mutex_send.unlock();
if (sq >= packets_count) {
piMSleep(10);
send_queue = 0;
send_up = 0;
pm_string.replaceAll("+", "-");
continue;
}
ba = build_packet(chk - 1);
diag.sended(ba.size_s());
sendRequest(ba);
pm_string[chk] = '+';
mutex_send.lock();
send_queue++;
mutex_send.unlock();
}
// if (chk == -1) return finish_send(false);
if (break_) return finish_send(false);
//piMSleep(1);
}
return finish_send(false);
}
int PIBaseTransfer::checkSession() {
if (!(is_receiving || is_sending)) return -1;
int miss = 0;
for (int i = 1; i < replies.size_s(); i++) {
if (replies[i] != pt_ReplySuccess) miss++;
if (replies[i] == pt_ReplyInvalid) return i;
}
for (int i = 1; i < replies.size_s(); i++) {
if (replies[i] != pt_ReplySuccess) return i;
}
if (miss > 0) {
//piCoutObj << "missing" << miss << "packets";
return -miss;
} else return 0;
}
void PIBaseTransfer::buildSession(PIVector<Part> parts) {
mutex_session.lock();
mutex_header.lock();
state_string = "calculating parts ... ";
session.clear();
header.session_id = randomi();
bytes_all = 0;
Part fi;
int fi_index, fi_prts;
PIVector<Part> lfi;
int min_size = packet_header_size + part_header_size;
int cur_size = min_size;
for (int i = 0; i < parts.size_s(); i++) {
state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
fi.id = parts[i].id;
//piCout << fi.id << state_string;
bytes_all += parts[i].size;
//fi.size = fi.entry.size;
fi.start = 0;
llong rest = parts[i].size - (packet_size - cur_size);
//piCout << i << fi.size << rest << bytes_all;
if (rest <= 0) {
fi.size = parts[i].size;
lfi << fi;
cur_size += fi.size + part_header_size;
} else {
fi.size = parts[i].size - rest;
fi_index = 1;
fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size));
//piCout << fi_prts;
lfi << fi;
session << lfi;
lfi.clear();
cur_size = min_size;
llong fs = fi.size;
for (int j = 1; j < fi_prts; j++) {
fi_index++;
fi.start = fs;
fi.size = piMin<ullong>(parts[i].size - fs, packet_size - min_size);
lfi << fi;
cur_size += fi.size + part_header_size;
if (fi_index != fi_prts) {
session << lfi;
lfi.clear();
cur_size = min_size;
fs += fi.size;
}
}
}
if (packet_size - cur_size < min_size) {
session << lfi;
lfi.clear();
cur_size = min_size;
}
}
if (cur_size > min_size) session << lfi;
mutex_header.unlock();
mutex_session.unlock();
}
void PIBaseTransfer::sendBreak(int session_id) {
//piCoutObj << "sendBreak";
uint psid = header.session_id;
header.session_id = session_id;
sendReply(pt_Break);
header.session_id = psid;
}
void PIBaseTransfer::sendReply(PacketType reply) {
//piCoutObj << "sendReply" << reply;
header.type = reply;
PIByteArray ba;
ba << header;
if (is_sending || is_receiving) diag.sended(ba.size_s());
sendRequest(ba);
}
bool PIBaseTransfer::getStartRequest() {
mutex_header.lock();
header.type = pt_Start;
header.id = 0;
PIByteArray ba;
StartRequest st;
st.packets = (uint)session.size();
st.size = bytes_all;
ba << header;
mutex_header.unlock();
ba << st;
state_string = "send request";
PITimeMeasurer tm;
while (tm.elapsed_s() < timeout_) {
diag.sended(ba.size_s());
sendRequest(ba);
if (break_) return false;
// piCoutObj << replies[0];
mutex_session.lock();
if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!";
//packets_count = piClampi(10 / tm.elapsed_m(), 2, 100);
//piCoutObj << "ping " << tm.elapsed_m() << packets_count;
mutex_session.unlock();
return true;
}
mutex_session.unlock();
piMSleep(1);
}
return false;
}
void PIBaseTransfer::processData(int id, PIByteArray & data) {
// piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) {
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (replies[replies.size()-1] == pt_ReplySuccess)
if (checkSession() == 0) state_string = "receive ok";
return;
}
Part fi;
PIByteArray ba, pheader;
pheader.resize(packet_header_size - sizeof(PacketHeader));
if (!pheader.isEmpty()) {
memcpy(pheader.data(), data.data(), pheader.size());
data.remove(0, pheader.size_s());
}
while (!data.isEmpty()) {
ba.clear();
data >> fi;
//if (fi.size > 0)
data >> ba;
//fi.fsize = ba.size();
bytes_cur += fi.size;
//piCoutObj << "recv" << fi;
session[id - 1] << fi;
state_string = "receiving...";
receivePart(fi, ba, pheader);
}
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
}
PIByteArray PIBaseTransfer::build_packet(int id) {
PIByteArray ret;
PIByteArray ba;
//piCoutObj << "session id" << header.session_id;
//ret << header;
ret.append(customHeader());
mutex_session.lock();
for (int i = 0; i < session[id].size_s(); i++) {
Part fi = session[id][i];
ret << fi;
//piCout << "build" << fi;
ba = buildPacket(fi);
bytes_cur += ba.size();
if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size";
ret << ba;
}
mutex_session.unlock();
PIByteArray hdr;
mutex_header.lock();
header.id = id + 1;
header.type = pt_Data;
if (crc_enabled) header.crc = crc.calculate(ret);
else header.crc = 0;
hdr << header;
mutex_header.unlock();
ret.insert(0, hdr);
// piCoutObj << "Send Packet" << header.id << ret.size();
return ret;
}
bool PIBaseTransfer::finish_send(bool ok) {
is_sending = false;
if (ok) state_string = "send done";
else state_string = "send failed";
mutex_header.lock();
header.id = 0;
if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess);
mutex_header.unlock();
//piCoutObj << state_string << PIString::readableSize(bytes_all);
sendFinished(ok);
// diag.stop();
bytes_all = bytes_cur = 0;
return ok;
}
void PIBaseTransfer::finish_receive(bool ok, bool quet) {
is_receiving = false;
if (ok) state_string = "receive done";
else state_string = "receive failed";
if (!ok && !quet) {
mutex_header.lock();
sendBreak(header.session_id);
mutex_header.unlock();
}
//piCoutObj << state_string << PIString::readableSize(bytes_all);
receiveFinished(ok);
// diag.stop();
bytes_all = bytes_cur = 0;
}

View File

@@ -0,0 +1,156 @@
/*! \file pibasetransfer.h
* \brief Base class for reliable send and receive data in fixed packets with error correction, pause and resume
*/
/*
PIP - Platform Independent Primitives
Base class for reliable send and receive data in fixed packets with error correction, pause and resume
Copyright (C) 2018 Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIBASETRANSFER_H
#define PIBASETRANSFER_H
#include "picrc.h"
#include "pidiagnostics.h"
class PIP_EXPORT PIBaseTransfer: public PIObject
{
PIOBJECT_SUBCLASS(PIBaseTransfer, PIObject)
public:
PIBaseTransfer();
~PIBaseTransfer();
# pragma pack(push,1)
struct PIP_EXPORT PacketHeader {
uint sig;
int type; // PacketType
int session_id;
uint id;
uint crc;
bool check_sig() {return (sig == signature);}
};
struct PIP_EXPORT Part {
Part(uint id_ = 0, ullong size_ = 0, ullong start_ = 0) : id(id_), size(size_), start(start_) {}
uint id;
ullong size;
ullong start;
};
# pragma pack(pop)
void stopSend();
void stopReceive();
bool isSending() const {return is_sending;}
bool isReceiving() const {return is_receiving;}
bool isPause() const {return is_pause;}
void setPause(bool pause_);
void setPacketSize(int size) {packet_size = size;}
int packetSize() const {return packet_size;}
void setTimeout(double sec) {timeout_ = sec; diag.setDisconnectTimeout(sec);}
double timeout() const {return timeout_;}
void setCRCEnabled(bool en = true) {crc_enabled = en;}
bool isCRCEnabled() const {return crc_enabled;}
PIString stateString() const {return state_string;}
PIString packetMap() const {return pm_string;}
llong bytesAll() const {return bytes_all;}
llong bytesCur() const {return bytes_cur;}
const PIDiagnostics &diagnostic() {return diag;}
static uint packetSignature() {return signature;}
EVENT_HANDLER1(void, received, PIByteArray, data);
EVENT_HANDLER(void, stop) {stopSend(); stopReceive();}
EVENT_HANDLER(void, pause) {setPause(true);}
EVENT_HANDLER(void, resume) {setPause(false);}
EVENT(receiveStarted)
EVENT(paused)
EVENT(resumed)
EVENT1(receiveFinished, bool, ok)
EVENT(sendStarted)
EVENT1(sendFinished, bool, ok)
EVENT1(sendRequest, PIByteArray &, data)
protected:
void buildSession(PIVector<Part> parts);
virtual PIByteArray buildPacket(Part fi) = 0;
virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader) = 0;
virtual PIByteArray customHeader() {return PIByteArray();}
bool send_process();
uint packet_header_size, part_header_size;
bool break_, is_sending, is_receiving, is_pause;
PIString state_string;
llong bytes_all, bytes_cur;
private:
enum PIP_EXPORT PacketType {pt_Unknown, pt_Data, pt_ReplySuccess, pt_ReplyInvalid, pt_Break, pt_Start, pt_Pause};
# pragma pack(push,1)
struct PIP_EXPORT StartRequest {
uint packets;
ullong size;
};
# pragma pack(pop)
friend PIByteArray & operator >>(PIByteArray & s, PIBaseTransfer::StartRequest & v);
friend PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::StartRequest & v);
void processData(int id, PIByteArray &data);
PIByteArray build_packet(int id);
int checkSession();
void sendBreak(int session_id);
void sendReply(PacketType reply);
bool getStartRequest();
bool finish_send(bool ok);
void finish_receive(bool ok, bool quet = false);
static const uint signature;
int packet_size, packets_count;
double timeout_;
PIVector<PIVector<Part> > session;
PIVector<PacketType> replies;
PITimeMeasurer send_tm, pause_tm;
PacketHeader header;
CRC_16 crc;
int send_queue;
int send_up;
PIDiagnostics diag;
PIMutex mutex_session;
PIMutex mutex_send;
PIMutex mutex_header;
bool crc_enabled;
PIString pm_string;
};
inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::PacketHeader & v) {s << v.sig << v.type << v.session_id << v.id << v.crc; return s;}
inline PIByteArray & operator >>(PIByteArray & s, PIBaseTransfer::PacketHeader & v) {s >> v.sig >> v.type >> v.session_id >> v.id >> v.crc; return s;}
inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::Part & v) {s << v.id << v.size << v.start; return s;}
inline PIByteArray & operator >>(PIByteArray & s, PIBaseTransfer::Part & v) {s >> v.id >> v.size >> v.start; return s;}
inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::StartRequest & v) {s << v.packets << v.size; return s;}
inline PIByteArray & operator >>(PIByteArray & s, PIBaseTransfer::StartRequest & v) {s >> v.packets >> v.size; return s;}
inline PICout operator <<(PICout s, const PIBaseTransfer::Part & v) {s.setControl(0, true); s << "Part(\"" << v.id << "\", " << PIString::readableSize(v.start) << " b | " << PIString::readableSize(v.size) << " b)"; s.restoreControl(); return s;}
#endif // PIBASETRANSFER_H

View File

@@ -0,0 +1,1285 @@
/*
PIP - Platform Independent Primitives
Complex I/O point
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "piconnection.h"
#include "piconfig.h"
/** \class PIConnection
* \brief Complex Input/Output point
*
* \section PIConnection_synopsis Synopsis
* %PIConnection provides abstract layer over physical devices,
* filtering and connecting data streams. Each %PIConnection
* works through Device Pool, so several %PIConnections can
* read from single physical device. General scheme:
* \image html piconnection.png
*
* \section PIConnection_pool Device pool concept
* Device pool is static object, single for each application, which
* contains unique devices. Each %PIConnection works with real devices
* through Device pool. Each device has assosiated thread for read
* and it can be started or stopped with %PIConnection functions
* \a startThreadedRead() and \a stopThreadedRead().
*
* \section PIConnection_filters Filters
* %PIConnection filter is a PIPacketExtractor and assosiated
* array of devices or other filters. When read thread is successfully read
* from device this data can be passed to one or more filters. Each filter
* has name and filter names should be unique. You can use this name for
* access to PIPacketExtractor* with function \a filter(), or get array of
* assosiated devices and filters with function \a filterBoundedDevices().
* One filter can receive data from several sources, and can be bounded to
* several filters.
* \image html piconnection_filters.png
*
* \section PIConnection_diag Diagnostics
* %PIConnection create PIDiagnostics for each device or filter. You can
* access to these objects with functions \a diagnostic().
*
* \section PIConnection_sender Senders
* %PIConnection can send data to devices with named timers ("senders").
* You can create sender or add device to sender with function \a addSender().
* Each sender has internal timer and every tick execute virtual function
* \a senderData(). Returns value of this function sended to bounded devices.
* You can assign fixed send data to sender with function \a setSenderFixedData().
* In this case sender will NOT execute \a senderData(), but send assigned data.
* \image html piconnection_senders.png
*
* \section PIConnection_config Configuration
* You can create %PIConnection from config file section or configure
* it later with function \a configureFromConfig(). Devices describes
* with its full pathes, for details see \ref PIIODevice_sec7. Example:
* \image html piconnection_conf.png
* Also %PIConnection can create PIString with its configuration with
* function \a makeConfig(). This string can be directly inserted into the
* config file.
*
*/
PIVector<PIConnection * > PIConnection::_connections;
PIConnection::PIConnection(const PIString & name): PIObject(name) {
_connections << this;
}
PIConnection::PIConnection(const PIString & config, const PIString & name_): PIObject(name_) {
_connections << this;
configureFromConfig(config, name_);
}
PIConnection::PIConnection(PIString * string, const PIString & name_): PIObject(name_) {
_connections << this;
configureFromString(string, name_);
}
PIConnection::~PIConnection() {
__device_pool__->unboundConnection(this);
removeAllFilters();
_connections.removeAll(this);
}
bool PIConnection::configureFromConfig(const PIString & conf_path, const PIString & name_) {
PIConfig conf(conf_path, PIIODevice::ReadOnly);
return configure(conf, name_);
}
bool PIConnection::configureFromString(PIString * string, const PIString & name_) {
PIConfig conf(string, PIIODevice::ReadOnly);
return configure(conf, name_);
}
bool PIConnection::configure(PIConfig & conf, const PIString & name_) {
if (!conf.isOpened()) return false;
__device_pool__->unboundConnection(this);
removeAllSenders();
removeAllChannels();
removeAllFilters();
removeAllDevices();
setName(name_);
if (name_.isEmpty()) piCoutObj << "Warning, can't configure connection with empty name";
PIConfig::Entry ce(conf.getValue(name_));
PIConfig::Branch db(ce.getValue("device").children()), fb(ce.getValue("filter").children()),
cb(ce.getValue("channel").children()), sb(ce.getValue("sender").children());
PIStringList dev_list(ce.getValue("device").value());
PIStringList name_list(ce.getValue("device").name());
PIStringList flt_list(ce.getValue("filter").value());
piForeachC (PIConfig::Entry * e, db) {
dev_list << e->value();
name_list << e->name();
}
piForeachC (PIConfig::Entry * e, fb)
flt_list << e->name();
PISet<PIString> chk_set = (PISet<PIString>(name_list) & PISet<PIString>(flt_list));
//piCout << name_list << flt_list << chk_set;
chk_set.remove("");
if (!chk_set.isEmpty()) {
piCoutObj << "Error," << chk_set.toVector() << "names assigned to both devices and filters!";
return false;
}
PIMap<PIString, PIString> dev_aliases;
for (int i = 0; i < dev_list.size_s(); ++i) {
PIString fn(dev_list[i]);
if (fn.isEmpty()) continue;
PIString & n(name_list[i]);
PIIODevice::DeviceMode dm = PIIODevice::ReadWrite;
PIIODevice::splitFullPath(fn, &fn, &dm);
//piCout << fn;
//piCoutObj << "add" << fn << n;
PIIODevice * dev = addDevice(fn, dm);
if (!dev) continue;
dev_aliases[n] = fn;
//device_names[n] = dev;
setDeviceName(dev, n);
dev->setName(name_ + ".device." + dev_list[i]);
PIConfig::Entry de = ce.getValue("device." + n);
dev->setThreadedReadBufferSize(de.getValue("bufferSize", dev->threadedReadBufferSize()));
PIDiagnostics * diag = diags_.value(dev, 0);
if (diag != 0)
diag->setDisconnectTimeout(de.getValue("disconnectTimeout", diag->disconnectTimeout()));
}
int added(0), padded(-1), tries(0);
bool pdebug = debug();
setDebug(false);
PIStringList filter_fails;
while (added != padded && tries < 100) {
padded = added;
added = 0;
++tries;
piForeachC (PIConfig::Entry * e, fb) {
PIPacketExtractor::SplitMode sm = PIPacketExtractor::None;
PIString sms(e->getValue("splitMode").value());
int smi = sms.toInt();
if (smi >= 1 && smi <= 5) sm = (PIPacketExtractor::SplitMode)smi;
else {
sms = sms.trim().toLowerCase();
if (sms.find("header") >= 0 && sms.find("footer") >= 0)
sm = PIPacketExtractor::HeaderAndFooter;
else {
if (sms.find("header") >= 0)
sm = PIPacketExtractor::Header;
else {
if (sms.find("footer") >= 0)
sm = PIPacketExtractor::Footer;
else {
if (sms.find("time") >= 0)
sm = PIPacketExtractor::Timeout;
else {
if (sms.find("size") >= 0)
sm = PIPacketExtractor::Size;
}
}
}
}
}
PIStringList devs(e->value());
PIConfig::Branch db(e->getValue("device").children());
devs << e->getValue("device", "").value();
piForeachC (PIConfig::Entry * e2, db)
devs << e2->value();
devs.removeStrings("");
if (devs.isEmpty()) continue;
PIString dname = dev_aliases.value(devs.front(), devs.front());
PIPacketExtractor * pe = addFilter(e->name(), dname, sm);
if (pe == 0) {
if (!filter_fails.contains(dname))
filter_fails << dname;
continue;
} else {
filter_fails.removeAll(dname);
}
++added;
for (int i = 1; i < devs.size_s(); ++i) {
dname = dev_aliases.value(devs[i], devs[i]);
if (addFilter(e->name(), dname, sm) != 0) {
filter_fails.removeAll(dname);
++added;
} else {
if (!filter_fails.contains(dname))
filter_fails << dname;
}
}
PIDiagnostics * diag = diags_.value(pe, 0);
if (diag != 0)
diag->setDisconnectTimeout(e->getValue("disconnectTimeout", diag->disconnectTimeout()));
pe->setBufferSize(e->getValue("bufferSize", pe->bufferSize()));
pe->setPayloadSize(e->getValue("payloadSize", pe->payloadSize()));
pe->setPacketSize(e->getValue("packetSize", pe->packetSize()));
pe->setTimeout(e->getValue("timeout", pe->timeout()));
pe->setHeader(PIByteArray::fromUserInput(e->getValue("header", "").value()));
pe->setFooter(PIByteArray::fromUserInput(e->getValue("footer", "").value()));
}
}
setDebug(pdebug);
piForeachC (PIString & f, filter_fails)
piCoutObj << "\"addFilter\" error: no such device \"" << f << "\"!";
piForeachC (PIConfig::Entry * e, cb) {
PIString f(e->getValue("from").value()), t(e->getValue("to").value());
addChannel(dev_aliases.value(f, f), dev_aliases.value(t, t));
}
piForeachC (PIConfig::Entry * e, sb) {
PIStringList devs(e->value());
PIConfig::Branch db(e->getValue("device").children());
devs << e->getValue("device", "").value();
piForeachC (PIConfig::Entry * e2, db)
devs << e2->value();
devs.removeStrings("");
if (devs.isEmpty()) continue;
float freq = e->getValue("frequency");
piForeachC (PIString & d, devs)
addSender(e->name(), dev_aliases.value(d, d), freq);
PIByteArray fd(PIByteArray::fromUserInput(e->getValue("fixedData").value()));
setSenderFixedData(e->name(), fd);
}
return true;
}
PIString PIConnection::makeConfig() const {
PIString ret;
ret << "[" << name() << "]\n";
PIVector<PIIODevice * > devs(boundedDevices());
int dn(-1);
piForeachC (PIIODevice * d, devs) {
PIStringList dnl(deviceNames(d));
if (dnl.isEmpty()) dnl << PIString::fromNumber(++dn);
piForeachC (PIString & dname, dnl) {
ret << "device." << dname << " = " << d->constructFullPath() << " #s\n";
ret << "device." << dname << ".bufferSize = " << d->threadedReadBufferSize() << " #n\n";
PIDiagnostics * diag = diags_.value(const_cast<PIIODevice * >(d), 0);
if (diag != 0)
ret << "device." << dname << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n";
}
}
piForeachC (PEPair & f, extractors) {
if (f.second == 0) continue;
if (f.second->extractor == 0) continue;
PIString prefix = "filter." + f.first;
for (int i = 0; i < f.second->devices.size_s(); ++i) {
PIString dname = device_names.key(f.second->devices[i]);
if (dname.isEmpty()) dname = devPath(f.second->devices[i]);
ret << prefix << ".device." << i << " = " << dname << " #s\n";
}
PIDiagnostics * diag = diags_.value(f.second->extractor, 0);
ret << prefix << ".bufferSize = " << f.second->extractor->bufferSize() << " #n\n";
if (diag != 0)
ret << prefix << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n";
ret << prefix << ".splitMode = ";
switch (f.second->extractor->splitMode()) {
case PIPacketExtractor::None: ret << "none"; break;
case PIPacketExtractor::Header: ret << "header"; break;
case PIPacketExtractor::Footer: ret << "footer"; break;
case PIPacketExtractor::HeaderAndFooter: ret << "header & footer"; break;
case PIPacketExtractor::Size: ret << "size"; break;
case PIPacketExtractor::Timeout: ret << "timeout"; break;
}
ret << " #s\n";
ret << prefix << ".payloadSize = " << f.second->extractor->payloadSize() << " #n\n";
ret << prefix << ".packetSize = " << f.second->extractor->packetSize() << " #n\n";
ret << prefix << ".timeout = " << f.second->extractor->timeout() << " #f\n";
ret << prefix << ".header = " << f.second->extractor->header().toString() << " #s\n";
ret << prefix << ".footer = " << f.second->extractor->footer().toString() << " #s\n";
}
dn = 0;
piForeachC (CPair & c, channels_) {
piForeachC (PIIODevice * d, c.second) {
PIString prefix = "channel." + PIString::fromNumber(dn); ++dn;
PIString dname = device_names.key(c.first);
if (dname.isEmpty()) dname = devPath(c.first);
ret << prefix << ".from = " << dname << " #s\n";
dname = device_names.key(const_cast<PIIODevice *>(d));
if (dname.isEmpty()) dname = devPath(d);
ret << prefix << ".to = " << dname << " #s\n";
}
}
piForeachC (SPair & s, senders) {
if (s.second == 0) continue;
PIString prefix = "sender." + s.second->name();
for (int i = 0; i < s.second->devices.size_s(); ++i) {
PIString dname = device_names.key(s.second->devices[i]);
if (dname.isEmpty()) dname = devPath(s.second->devices[i]);
ret << prefix << ".device." << i << " = " << dname << " #s\n";
}
double int_ = s.second->int_;
if (int_ > 0.)
ret << prefix << ".frequency = " << (1000. / int_) << " #f\n";
if (!s.second->sdata.isEmpty())
ret << prefix << ".fixedData = " << s.second->sdata.toString() << " #s\n";
}
ret << "[]\n";
return ret;
}
PIIODevice * PIConnection::addDevice(const PIString & full_path, PIIODevice::DeviceMode mode, bool start) {
PIString fp(PIIODevice::normalizeFullPath(full_path));
PIIODevice * dev = __device_pool__->addDevice(this, fp, mode, start);
if (dev) {
dev->setName(name() + ".device." + fp);
device_modes[dev] = mode;
__device_pool__->lock();
if (diags_.value(dev, 0) == 0) {
PIDiagnostics * d = new PIDiagnostics(false);
d->setInterval(10.);
diags_[dev] = d;
CONNECTU(d, qualityChanged, this, diagQualityChanged);
__device_pool__->init();
}
__device_pool__->unlock();
}
return dev;
}
void PIConnection::setDeviceName(PIIODevice * dev, const PIString & name) {
if (!dev) return;
device_names[name] = dev;
//dev->setProperty();
}
PIStringList PIConnection::deviceNames(const PIIODevice * dev) const {
PIStringList ret;
piForeachC (DNPair & s, device_names)
if (s.second == dev)
ret << s.first;
return ret;
}
bool PIConnection::removeDevice(const PIString & full_path) {
PIString fp(PIIODevice::normalizeFullPath(full_path));
PIIODevice * dev = __device_pool__->device(fp);
if (dev == 0) return false;
PIStringList dntd(deviceNames(dev));
piForeachC (PIString & n, dntd)
device_names.removeOne(n);
piForeachC (SPair & s, senders) {
if (s.second == 0) continue;
s.second->lock();
s.second->devices.removeAll(dev);
s.second->unlock();
}
device_modes.remove(dev);
piForeachC (PEPair & i, extractors) {
if (i.second == 0) continue;
i.second->devices.removeAll(dev);
}
bounded_extractors.remove(dev);
channels_.remove(dev);
for (PIMap<PIIODevice * , PIVector<PIIODevice * > >::iterator it = channels_.begin(); it != channels_.end(); ++it)
it.value().removeAll(dev);
__device_pool__->lock();
if (diags_.value(dev, 0) != 0)
delete diags_.value(dev);
diags_.remove(dev);
__device_pool__->unlock();
return __device_pool__->removeDevice(this, fp);
}
void PIConnection::removeAllDevices() {
device_names.clear();
PIVector<PIIODevice * > bdevs(__device_pool__->boundedDevices(this));
__device_pool__->lock();
piForeach (PIIODevice * d, bdevs) {
piForeachC (SPair & s, senders) {
if (s.second == 0) continue;
s.second->lock();
s.second->devices.removeAll(d);
s.second->unlock();
}
channels_.remove(d);
for (PIMap<PIIODevice * , PIVector<PIIODevice * > >::iterator it = channels_.begin(); it != channels_.end(); ++it)
it.value().removeAll(d);
if (diags_.value(d, 0) != 0)
delete diags_.value(d);
diags_.remove(d);
}
__device_pool__->unboundConnection(this);
__device_pool__->unlock();
device_modes.clear();
bounded_extractors.clear();
piForeachC (PEPair & i, extractors) {
if (i.second == 0) continue;
i.second->devices.clear();
}
}
PIIODevice * PIConnection::deviceByFullPath(const PIString & full_path) const {
PIString fp(PIIODevice::normalizeFullPath(full_path));
DevicePool::DeviceData * dd = __device_pool__->devices.value(fp);
if (dd == 0) return 0;
if (dd->dev == 0) return 0;
if (!dd->listeners.contains(const_cast<PIConnection * >(this))) return 0;
return dd->dev;
}
PIIODevice * PIConnection::deviceByName(const PIString & name) const {
return device_names.value(name, 0);
}
PIVector<PIIODevice * > PIConnection::boundedDevices() const {
return __device_pool__->boundedDevices(this);
}
PIPacketExtractor * PIConnection::addFilter(const PIString & name_, const PIString & full_path, PIPacketExtractor::SplitMode mode) {
PIString fname_ = name_.trimmed();
Extractor * e = extractors.value(fname_);
if (full_path.isEmpty()) return (e == 0 ? 0 : e->extractor);
PIIODevice * dev = devByString(full_path);
PIPacketExtractor * pe(0);
if (extractors.value(full_path) != 0) pe = extractors.value(full_path)->extractor;
if (pe != 0) dev = pe;
if (dev == 0) {
piCoutObj << "\"addFilter\" error: no such device or filter \"" << full_path << "\"!";
return 0;
}
if (e == 0) {
e = new Extractor();
extractors[fname_] = e;
}
if (e->extractor == 0) {
e->extractor = new PIPacketExtractor(0, mode);
e->extractor->setName(fname_);
e->extractor->setThreadedReadData(new PIPair<PIConnection * , PIString>(this, fname_));
e->extractor->setHeaderCheckSlot(filterValidateHeaderS);
e->extractor->setFooterCheckSlot(filterValidateFooterS);
e->extractor->setPayloadCheckSlot(filterValidatePayloadS);
__device_pool__->lock();
if (diags_.value(e->extractor, 0) == 0) {
PIDiagnostics * d = new PIDiagnostics(false);
d->setInterval(10.);
diags_[e->extractor] = d;
CONNECTU(d, qualityChanged, this, diagQualityChanged);
}
__device_pool__->unlock();
CONNECT2(void, uchar * , int, e->extractor, packetReceived, this, packetExtractorReceived)
}
if (!e->devices.contains(dev)) {
bounded_extractors[dev] << e->extractor;
//if (PIString(dev->className()) == "PIPacketExtractor") dev->setThreadSafe(false);
e->devices << dev;
}
return e->extractor;
}
PIPacketExtractor * PIConnection::addFilter(PIPacketExtractor * filter, const PIString & full_path) {
Extractor * e = 0;
if (full_path.isEmpty()) return (e == 0 ? 0 : e->extractor);
PIIODevice * dev = devByString(full_path);
PIPacketExtractor * pe(0);
if (extractors.value(full_path) != 0) pe = extractors.value(full_path)->extractor;
if (pe != 0) dev = pe;
if (dev == 0) {
piCoutObj << "\"addFilter\" error: no such device or filter \"" << full_path << "\"!";
return 0;
}
if (e == 0) {
e = new Extractor();
extractors[filter->name()] = e;
}
if (e->extractor == 0) {
e->extractor = filter;
e->extractor->setThreadedReadData(new PIPair<PIConnection * , PIString>(this, filter->name()));
__device_pool__->lock();
if (diags_.value(e->extractor, 0) == 0) {
PIDiagnostics * d = new PIDiagnostics(false);
d->setInterval(10.);
diags_[e->extractor] = d;
CONNECTU(d, qualityChanged, this, diagQualityChanged);
}
__device_pool__->unlock();
CONNECT2(void, uchar * , int, e->extractor, packetReceived, this, packetExtractorReceived)
}
if (!e->devices.contains(dev)) {
bounded_extractors[dev] << e->extractor;
//if (PIString(dev->className()) == "PIPacketExtractor") dev->setThreadSafe(false);
e->devices << dev;
}
return e->extractor;
}
bool PIConnection::removeFilter(const PIString & name_, const PIString & full_path) {
return removeFilter(name_, devByString(full_path));
}
bool PIConnection::removeFilter(const PIString & name_, const PIIODevice * dev) {
if (dev == 0) return false;
Extractor * p = extractors.value(name_.trimmed());
if (p == 0) return false;
bool ret = false;
for (int i = 0; i < p->devices.size_s(); ++i) {
if (p->devices[i] == dev) {
bounded_extractors[p->devices[i]].removeAll(p->extractor);
p->devices.remove(i);
--i;
ret = true;
}
}
if (p->devices.isEmpty()) {
unboundExtractor(p->extractor);
delete p;
}
return ret;
}
bool PIConnection::removeFilter(const PIString & name_) {
Extractor * p = extractors.value(name_.trimmed());
if (p == 0) return false;
unboundExtractor(p->extractor);
delete p;
return true;
}
void PIConnection::removeAllFilters() {
__device_pool__->lock();
piForeachC (PEPair & i, extractors) {
if (i.second == 0) continue;
channels_.remove(i.second->extractor);
for (PIMap<PIIODevice * , PIVector<PIIODevice * > >::iterator it = channels_.begin(); it != channels_.end(); ++it)
it.value().removeAll(i.second->extractor);
if (diags_.value(i.second->extractor, 0) != 0)
delete diags_.value(i.second->extractor);
diags_.remove(i.second->extractor);
delete i.second;
}
extractors.clear();
bounded_extractors.clear();
__device_pool__->unlock();
}
PIVector<PIPacketExtractor * > PIConnection::filters() const {
PIVector<PIPacketExtractor * > ret;
piForeachC (PEPair & i, extractors)
if (i.second != 0)
if (i.second->extractor != 0) ret << i.second->extractor;
return ret;
}
PIStringList PIConnection::filterNames() const {
PIStringList ret;
piForeachC (PEPair & i, extractors)
if (i.second != 0)
if (i.second->extractor != 0) ret << i.first;
return ret;
}
PIPacketExtractor * PIConnection::filter(const PIString & name_) const {
PIString fname_ = name_.trimmed();
piForeachC (PEPair & i, extractors)
if (i.second != 0)
if (i.second->extractor != 0 && i.first == fname_)
return i.second->extractor;
return 0;
}
PIVector<PIIODevice * > PIConnection::filterBoundedDevices(const PIString & name_) const {
PIVector<PIIODevice * > ret;
Extractor * p = extractors.value(name_.trimmed());
if (p == 0) return ret;
return p->devices;
}
bool PIConnection::addChannel(const PIString & name0, const PIString & name1) {
//piCout << "addChannel" << name0 << name1;
if (name0.isEmpty() || name1.isEmpty()) return false;
PIIODevice * dev0 = devByString(name0), * dev1 = devByString(name1);
PIPacketExtractor * pe0(0), * pe1(0);
if (extractors.value(name0) != 0) pe0 = extractors.value(name0)->extractor;
if (extractors.value(name1) != 0) pe1 = extractors.value(name1)->extractor;
if (pe0 != 0) dev0 = pe0;
if (pe1 != 0) dev1 = pe1;
if (dev0 == 0 || dev1 == 0) {
if (dev0 == 0) piCoutObj << "\"addChannel\" error: no such device \"" << name0 << "\"!";
if (dev1 == 0) piCoutObj << "\"addChannel\" error: no such device \"" << name1 << "\"!";
return false;
}
if (!channels_[dev0].contains(dev1))
channels_[dev0] << dev1;
return true;
}
bool PIConnection::removeChannel(const PIString & name0, const PIString & name1) {
PIIODevice * dev0 = devByString(name0), * dev1 = devByString(name1);
PIPacketExtractor * pe0(0), * pe1(0);
if (extractors.value(name0) != 0) pe0 = extractors.value(name0)->extractor;
if (extractors.value(name1) != 0) pe1 = extractors.value(name1)->extractor;
if (pe0 != 0) dev0 = pe0;
if (pe1 != 0) dev1 = pe1;
if (dev0 == 0 || dev1 == 0) return false;
channels_[dev0].removeAll(dev1);
return true;
}
bool PIConnection::removeChannel(const PIString & name0) {
PIIODevice * dev0 = devByString(name0);
PIPacketExtractor * pe0(0);
if (extractors.value(name0) != 0) pe0 = extractors.value(name0)->extractor;
if (pe0 != 0) dev0 = pe0;
if (dev0 == 0) return false;
channels_.remove(dev0);
for (PIMap<PIIODevice * , PIVector<PIIODevice * > >::iterator it = channels_.begin(); it != channels_.end(); ++it)
it.value().removeAll(dev0);
return true;
}
void PIConnection::removeAllChannels() {
channels_.clear();
}
PIString PIConnection::devPath(const PIIODevice * d) const {
if (d == 0) return PIString();
if (strcmp(d->className(), "PIPacketExtractor") == 0) return d->name();
return d->constructFullPath();
}
PIString PIConnection::devFPath(const PIIODevice * d) const {
if (d == 0) return PIString();
if (d->isPropertyExists("__fullPath__")) return d->property("__fullPath__").toString();
return d->name();
}
PIIODevice * PIConnection::devByString(const PIString & s) const {
if (s.isEmpty()) return 0;
PIIODevice * ret = deviceByName(s);
if (!ret) ret = deviceByFullPath(s);
return ret;
}
PIVector<PIPair<PIString, PIString > > PIConnection::channels() const {
PIVector<PIPair<PIString, PIString > > ret;
piForeachC (CPair & i, channels_) {
PIString fp0(devFPath(i.first));
piForeachC (PIIODevice * d, i.second)
ret << PIPair<PIString, PIString>(fp0, devFPath(d));
}
return ret;
}
void PIConnection::addSender(const PIString & name_, const PIString & full_path_name, float frequency, bool start_) {
PIString fname_ = name_.trimmed();
if (full_path_name.isEmpty() || frequency <= 0.) return;
Sender * s = senders.value(fname_);
if (s == 0) {
s = new Sender(this);
s->setName(fname_);
s->int_ = 1000. / frequency;
senders[fname_] = s;
}
PIIODevice * dev = devByString(full_path_name);
if (dev == 0) {
piCoutObj << "\"addSender\" error: no such device \"" << full_path_name << "\"!";
return;
}
if (!s->isRunning() && start_) {
//piCoutObj << name_ << "start" << 1000. / frequency;
if (!__device_pool__->fake) s->start(s->int_);
}
s->lock();
if (!s->devices.contains(dev))
s->devices << dev;
s->unlock();
}
bool PIConnection::removeSender(const PIString & name, const PIString & full_path_name) {
Sender * s = senders.value(name, 0);
PIIODevice * d = devByString(full_path_name);
if (s == 0 || d == 0) return false;
s->lock();
bool ret = s->devices.contains(d);
if (ret)
s->devices.removeAll(d);
s->unlock();
return ret;
}
bool PIConnection::removeSender(const PIString & name) {
Sender * s = senders.value(name, 0);
if (s == 0) return false;
delete s;
senders.remove(name);
return true;
}
bool PIConnection::setSenderFixedData(const PIString & name, const PIByteArray & data) {
Sender * s = senders.value(name, 0);
if (s == 0) return false;
s->lock();
s->sdata = data;
s->unlock();
return true;
}
bool PIConnection::clearSenderFixedData(const PIString & name) {
Sender * s = senders.value(name, 0);
if (s == 0) return false;
s->lock();
s->sdata.clear();
s->unlock();
return true;
}
PIByteArray PIConnection::senderFixedData(const PIString & name) const {
Sender * s = senders.value(name, 0);
if (s == 0) return PIByteArray();
return s->sdata;
}
float PIConnection::senderFrequency(const PIString & name) const {
Sender * s = senders.value(name, 0);
if (s == 0) return -1.f;
double i = s->interval();
if (i == 0.) return 0.f;
return 1000. / s->interval();
}
void PIConnection::removeAllSenders() {
piForeachC (SPair & s, senders)
if (s.second != 0)
delete s.second;
senders.clear();
}
void PIConnection::startThreadedRead(const PIString & full_path_name) {
DevicePool::DeviceData * dd = __device_pool__->deviceData(devByString(full_path_name));
if (dd == 0) return;
if (dd->dev == 0) return;
if (dd->started || dd->dev->mode() == PIIODevice::WriteOnly) return;
if (!__device_pool__->fake) dd->rthread->start();
dd->started = true;
}
void PIConnection::startAllThreadedReads() {
piForeachC (DevicePool::DDPair & d, __device_pool__->devices)
startThreadedRead(d.first);
}
void PIConnection::startSender(const PIString & name) {
Sender * s = senders.value(name, 0);
if (s == 0) return;
if (!s->isRunning() && !__device_pool__->fake)
s->start(s->int_);
}
void PIConnection::startAllSenders() {
piForeachC (SPair & s, senders) {
if (s.second == 0) continue;
if (!s.second->isRunning() && !__device_pool__->fake)
s.second->start(s.second->int_);
}
}
void PIConnection::stopThreadedRead(const PIString & full_path_name) {
DevicePool::DeviceData * dd = __device_pool__->deviceData(devByString(full_path_name));
if (dd == 0) return;
if (dd->dev == 0) return;
if (!dd->started || dd->dev->mode() == PIIODevice::WriteOnly) return;
dd->rthread->stop();
dd->started = false;
}
void PIConnection::stopAllThreadedReads() {
piForeachC (DevicePool::DDPair & d, __device_pool__->devices)
stopThreadedRead(d.first);
}
void PIConnection::stopSender(const PIString & name) {
Sender * s = senders.value(name, 0);
if (s == 0) return;
if (s->isRunning()) s->stop();
}
void PIConnection::stopAllSenders() {
piForeachC (SPair & s, senders) {
if (s.second == 0) continue;
if (s.second->isRunning())
s.second->stop();
}
}
PIDiagnostics * PIConnection::diagnostic(const PIString & full_path_name) const {
PIIODevice * dev = devByString(full_path_name);
PIPacketExtractor * pe(0);
if (extractors.value(full_path_name) != 0) pe = extractors.value(full_path_name)->extractor;
if (pe != 0) dev = pe;
if (dev == 0) return 0;
return diags_.value(dev, 0);
}
int PIConnection::writeByFullPath(const PIString & full_path, const PIByteArray & data) {
PIString fp = PIIODevice::normalizeFullPath(full_path);
PIIODevice * dev = __device_pool__->device(fp);
//piCout << "SEND" << full_path << fp;
if (!dev) {
piCoutObj << "No such full path \"" << full_path << "\"!";
return -1;
}
return write(dev, data);
}
int PIConnection::writeByName(const PIString & name_, const PIByteArray & data) {
PIIODevice * dev = deviceByName(name_);
if (!dev) {
piCoutObj << "No such device \"" << name_ << "\"!";
return -1;
}
return write(dev, data);
}
int PIConnection::write(PIIODevice * dev, const PIByteArray & data) {
if (dev == 0) {
piCoutObj << "Null Device!";
return -1;
}
if (!dev->isOpened()) return -1;
if (!dev->canWrite()) {
piCoutObj << "Device \"" << dev->constructFullPath() << "\" can`t write!";
return -1;
}
int ret = dev->write(data);
PIDiagnostics * diag = diags_.value(dev);
if (diag != 0 && ret > 0) diag->sended(ret);
return ret;
}
PIVector< PIConnection * > PIConnection::allConnections() {
return _connections;
}
PIVector< PIIODevice * > PIConnection::allDevices() {
return __device_pool__->boundedDevices();
}
bool PIConnection::setFakeMode(bool yes) {
bool ret = isFakeMode();
__device_pool__->fake = yes;
return ret;
}
bool PIConnection::isFakeMode() {
return __device_pool__->fake;
}
PIConnection::DevicePool::DevicePool(): PIThread(false, 10) {
setName("PIConnection::DevicePool");
needLockRun(true);
fake = false;
}
void PIConnection::DevicePool::init() {
if (!isRunning())
start(10);
}
PIIODevice * PIConnection::DevicePool::addDevice(PIConnection * parent, const PIString & fp, PIIODevice::DeviceMode mode, bool start) {
DeviceData * dd = devices[fp];
int pmode(0);
bool need_start = false;
if (dd == 0) {
dd = new DeviceData();
devices[fp] = dd;
}
if (dd->dev == 0) {
//piCout << "new device" << fp;
dd->dev = PIIODevice::createFromFullPath(fp);
if (dd->dev == 0) {
piCoutObj << "Error: can`t create device \"" << fp << "\"!"; //:" << errorString();
return 0;
}
dd->dev->setProperty("__fullPath__", fp);
} else
pmode = dd->dev->mode();
if (!dd->listeners.contains(parent))
dd->listeners << parent;
if (pmode == mode || pmode == PIIODevice::ReadWrite)
return dd->dev;
if ((mode & PIIODevice::ReadOnly) > 0) {
if (dd->rthread != 0) {
delete dd->rthread;
dd->rthread = 0;
dd->started = false;
}
dd->rthread = new PIThread(dd, __DevicePool_threadReadDP);
dd->rthread->setName("__S__connection_" + fp + "_read_thread");
need_start = true;
pmode |= PIIODevice::ReadOnly;
}
if ((mode & PIIODevice::WriteOnly) > 0)
pmode |= PIIODevice::WriteOnly;
if (!fake) {
dd->dev->close();
dd->dev->open((PIIODevice::DeviceMode)pmode);
} else
dd->dev->setMode((PIIODevice::DeviceMode)pmode);
if (need_start && start) {
if (!fake) dd->rthread->start();
dd->started = true;
}
return dd->dev;
}
bool PIConnection::DevicePool::removeDevice(PIConnection * parent, const PIString & fp) {
DeviceData * dd = devices.value(fp);
if (dd == 0)
return false;
if (dd->dev == 0)
return false;
bool ok = dd->listeners.contains(parent);
dd->listeners.removeAll(parent);
if (dd->listeners.isEmpty()) {
delete dd;
devices.remove(fp);
}
return ok;
}
void PIConnection::DevicePool::unboundConnection(PIConnection * parent) {
PIStringList rem;
piForeachC (DDPair & i, devices) {
if (i.second == 0) {
rem << i.first;
continue;
}
i.second->listeners.removeAll(parent);
if (i.second->listeners.isEmpty())
rem << i.first;
}
piForeachC (PIString & i, rem) {
DeviceData * dd = devices.value(i);
if (dd == 0)
continue;
delete dd;
devices.remove(i);
}
}
PIIODevice * PIConnection::DevicePool::device(const PIString & fp) const {
DeviceData * dd = devices.value(fp);
if (dd == 0) return 0;
return dd->dev;
}
PIConnection::DevicePool::DeviceData * PIConnection::DevicePool::deviceData(PIIODevice * d) const {
if (!d) return 0;
piForeachC (DDPair & i, devices) {
if (i.second->dev == d)
return i.second;
}
return 0;
}
PIVector<PIConnection * > PIConnection::DevicePool::boundedConnections() const {
PIVector<PIConnection * > ret;
piForeachC (DDPair & i, devices) {
if (i.second == 0)
continue;
ret << i.second->listeners;
}
for (int i = 0; i < ret.size_s(); ++i)
for (int j = i + 1; j < ret.size_s(); ++j)
if (ret[i] == ret[j]) {
ret.remove(j);
--j;
}
return ret;
}
PIVector< PIIODevice * > PIConnection::DevicePool::boundedDevices() const {
PIVector<PIIODevice * > ret;
piForeachC (DDPair & i, devices) {
if (i.second == 0) continue;
if (i.second->dev == 0) continue;
ret << i.second->dev;
}
return ret;
}
PIVector<PIIODevice * > PIConnection::DevicePool::boundedDevices(const PIConnection * parent) const {
PIVector<PIIODevice * > ret;
piForeachC (DDPair & i, devices) {
if (i.second == 0) continue;
if (i.second->dev == 0) continue;
if (i.second->listeners.contains(const_cast<PIConnection*>(parent)))
ret << i.second->dev;
}
return ret;
}
PIConnection::DevicePool::DeviceData::~DeviceData() {
if (rthread != 0) {
rthread->stop();
if (!rthread->waitForFinish(1000))
rthread->terminate();
delete rthread;
rthread = 0;
}
if (dev != 0) {
dev->close();
delete dev;
dev = 0;
}
}
void PIConnection::DevicePool::run() {
PIVector<PIConnection * > conns(PIConnection::allConnections());
piForeach (PIConnection * c, conns) {
piForeachC (PIConnection::DPair & d, c->diags_) {
if (d.second == 0) continue;
d.second->tick(0, 1);
}
}
}
void __DevicePool_threadReadDP(void * ddp) {
PIConnection::DevicePool::DeviceData * dd((PIConnection::DevicePool::DeviceData * )ddp);
if (dd->dev == 0) {piMSleep(100); return;}
if (dd->dev->isClosed())
if (!dd->dev->open()) {piMSleep(dd->dev->reopenTimeout()); return;}
PIByteArray ba;
ba = dd->dev->read(dd->dev->threadedReadBufferSize());
// dd->dev->threadedRead(ba.data(), ba.size());
if (ba.isEmpty()) {piMSleep(10); return;}
dd->dev->threadedRead(ba.data(), ba.size_s());
//piCout << "Readed from" << dd->dev->path() << Hex << ba;
__device_pool__->deviceReaded(dd, ba);
}
void PIConnection::DevicePool::deviceReaded(PIConnection::DevicePool::DeviceData * dd, const PIByteArray & data) {
PIString from = dd->dev->property("__fullPath__").toString();
piForeach (PIConnection * ld, dd->listeners)
ld->rawReceived(dd->dev, from, data);
}
bool PIConnection::filterValidateHeaderS(void * c, uchar * src, uchar * rec, int size) {
PIPair<PIConnection * , PIString> * p((PIPair<PIConnection * , PIString> * )c);
return p->first->filterValidateHeader(p->second, src, rec, size);
}
bool PIConnection::filterValidateFooterS(void * c, uchar * src, uchar * rec, int size) {
PIPair<PIConnection * , PIString> * p((PIPair<PIConnection * , PIString> * )c);
return p->first->filterValidateFooter(p->second, src, rec, size);
}
bool PIConnection::filterValidatePayloadS(void * c, uchar * rec, int size) {
PIPair<PIConnection * , PIString> * p((PIPair<PIConnection * , PIString> * )c);
return p->first->filterValidatePayload(p->second, rec, size);
}
void PIConnection::rawReceived(PIIODevice * dev, const PIString & from, const PIByteArray & data) {
dataReceived(from, data);
dataReceivedEvent(from, data);
PIVector<PIPacketExtractor * > be(bounded_extractors.value(dev));
//piCout << be;
piForeach (PIPacketExtractor * i, be)
i->threadedRead(const_cast<uchar * >(data.data()), data.size_s());
PIVector<PIIODevice * > chd(channels_.value(dev));
piForeach (PIIODevice * d, chd) {
int ret = d->write(data);
PIDiagnostics * diag = diags_.value(d);
if (diag != 0 && ret > 0) diag->sended(ret);
}
PIDiagnostics * diag = diags_.value(dev);
if (diag != 0) diag->received(data.size_s());
}
bool PIConnection::filterValidateHeader(const PIString & filter_name, uchar * src, uchar * rec, int size) {
for (int i = 0; i < size; ++i)
if (src[i] != rec[i])
return false;
return true;
}
bool PIConnection::filterValidateFooter(const PIString & filter_name, uchar * src, uchar * rec, int size) {
for (int i = 0; i < size; ++i)
if (src[i] != rec[i])
return false;
return true;
}
bool PIConnection::filterValidatePayload(const PIString & filter_name, uchar * rec, int size) {
return true;
}
PIByteArray PIConnection::senderData(const PIString & sender_name) {
return PIByteArray();
}
PIConnection::Extractor::~Extractor() {
if (extractor != 0) {
if (extractor->threadedReadData() != 0)
delete (PIPair<PIConnection * , PIString> * )(extractor->threadedReadData());
delete extractor;
extractor = 0;
}
}
void PIConnection::Sender::tick(void * , int) {
if (parent == 0) return;
PIByteArray data;
if (!sdata.isEmpty()) data = sdata;
else data = parent->senderData(name());
if (data.isEmpty()) return;
//piCoutObj << "write"<<data.size()<<"bytes to"<<devices.size()<<"devices";
piForeach (PIIODevice * d, devices) {
int ret = d->write(data);
PIDiagnostics * diag = parent->diags_.value(d);
if (diag != 0 && ret > 0) diag->sended(ret);
}
}
void PIConnection::unboundExtractor(PIPacketExtractor * pe) {
if (pe == 0) return;
channels_.remove(pe);
for (PIMap<PIIODevice * , PIVector<PIIODevice * > >::iterator it = channels_.begin(); it != channels_.end(); ++it)
it.value().removeAll(pe);
bounded_extractors.remove(pe);
PIVector<PIIODevice * > k = bounded_extractors.keys();
piForeach (PIIODevice * i, k) {
PIVector<PIPacketExtractor * > & be(bounded_extractors[i]);
be.removeAll(pe);
if (be.isEmpty())
bounded_extractors.remove(i);
}
__device_pool__->lock();
if (diags_.value(pe, 0) != 0)
delete diags_.value(pe);
diags_.remove(pe);
extractors.remove(pe->name());
__device_pool__->unlock();
}
void PIConnection::packetExtractorReceived(uchar * data, int size) {
PIString from(emitter() == 0 ? "" : emitter()->name());
PIIODevice * cd = (PIIODevice * )emitter();
// piCout << "packetExtractorReceived" << from << cd;
if (cd != 0) {
PIVector<PIPacketExtractor * > be(bounded_extractors.value(cd));
//piCout << be << (void*)data << size;
piForeach (PIPacketExtractor * i, be)
i->threadedRead(data, size);
PIVector<PIIODevice * > chd(channels_.value(cd));
piForeach (PIIODevice * d, chd) {
int ret = d->write(data, size);
PIDiagnostics * diag = diags_.value(d);
if (diag != 0) diag->sended(ret);
}
PIDiagnostics * diag = diags_.value(cd);
if (diag != 0) diag->received(size);
}
packetReceived(from, PIByteArray(data, size));
packetReceivedEvent(from, PIByteArray(data, size));
}
void PIConnection::diagQualityChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
qualityChanged(diags_.key((PIDiagnostics*)emitter()), new_quality, old_quality);
}
PIConnection::DevicePool * __device_pool__;
bool __DevicePoolContainer__::inited_(false);
__DevicePoolContainer__::__DevicePoolContainer__() {
if (inited_) return;
inited_ = true;
__device_pool__ = new PIConnection::DevicePool();
}

View File

@@ -0,0 +1,426 @@
/*! \file piconnection.h
* \brief Complex I/O point
*/
/*
PIP - Platform Independent Primitives
Complex I/O point
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PICONNECTION_H
#define PICONNECTION_H
#include "pipacketextractor.h"
#include "pidiagnostics.h"
class PIConfig;
class PIP_EXPORT PIConnection: public PIObject
{
PIOBJECT_SUBCLASS(PIConnection, PIObject)
public:
//! Constructs connection with name "name", or with default name = "connection"
PIConnection(const PIString & name = PIStringAscii("connection"));
//! Constructs connection and configure it from config file "config" from section "name"
PIConnection(const PIString & config, const PIString & name);
//! Constructs connection and configure it from config content "string" from section "name"
PIConnection(PIString * string, const PIString & name);
~PIConnection();
/*! \brief Configure connection from config file "config" from section "name". Returns if configuration was successful
* \details \b Warning: all devices, filters and channels removed before configure! */
bool configureFromConfig(const PIString & config, const PIString & name = PIStringAscii("connection"));
/*! \brief Configure connection from config content "string" from section "name". Returns if configuration was successful
* \details \b Warning: all devices, filters and channels removed before configure! */
bool configureFromString(PIString * string, const PIString & name = PIStringAscii("connection"));
//! Returns config file section of current connection configuration
PIString makeConfig() const;
/*! \brief Add device with full path "full_path", open mode "mode" to Device pool and connection
* \details Returns pointer to device or null if device can not be created. If "start" is true,
* read thread is started immediately. Else, you can start read thread with functions \a startThreadedRead()
* or \a startAllThreadedReads(). By default, read thread doesn`t start */
PIIODevice * addDevice(const PIString & full_path, PIIODevice::DeviceMode mode = PIIODevice::ReadWrite, bool start = false);
void setDeviceName(PIIODevice * dev, const PIString & name);
PIStringList deviceNames(const PIIODevice * dev) const;
/*! \brief Remove device with full path "full_path" from connection
* \details Returns if device was removed. If there is no connection bounded to this device,
* it will be removed from Device pool */
bool removeDevice(const PIString & full_path);
/*! \brief Remove all device from connection
* \details If there is no connection bounded to there devices, they removed from Device pool */
void removeAllDevices();
//! Returns device with full path "full_path" or null if there is no such device
PIIODevice * deviceByFullPath(const PIString & full_path) const;
//! Returns device with name "name" or null if there is no such device
PIIODevice * deviceByName(const PIString & name) const;
//! Returns all devices bounded to this connection
PIVector<PIIODevice * > boundedDevices() const;
/*! \brief Add filter with name "name" to device with full path "full_path_name" or filter "full_path_name"
* \details If there is no filter with name "name", connection create new with split mode "mode" and bound
* to it device "full_path_name" or filter "full_path_name". If filter with name "name" already exists,
* device "full_path_name" or filter "full_path_name" add to this filter.
* This function returns PIPacketExtractor * assosiated with this filter
* \n \b Attention! "mode" is altual olny if new filter was created! */
PIPacketExtractor * addFilter(const PIString & name, const PIString & full_path_name, PIPacketExtractor::SplitMode mode = PIPacketExtractor::None);
//! Add filter with name "name" to device "dev"
PIPacketExtractor * addFilter(const PIString & name, const PIIODevice * dev, PIPacketExtractor::SplitMode mode = PIPacketExtractor::None) {return addFilter(name, devFPath(dev), mode);}
//! Add filter with "filter" to device "dev"
PIPacketExtractor * addFilter(PIPacketExtractor * filter, const PIString & full_path_name);
//! Add filter with "filter" to device "dev"
PIPacketExtractor * addFilter(PIPacketExtractor * filter, const PIIODevice * dev) {return addFilter(filter, devFPath(dev));}
/*! \brief Remove from filter with name "name" device with full path "full_path_name" or filter "full_path_name"
* \details If there is no devices bounded to this filter, it will be removed. Returns if device was removed */
bool removeFilter(const PIString & name, const PIString & full_path_name);
//! Remove from filter with name "name" device or filter "dev"
bool removeFilter(const PIString & name, const PIIODevice * dev);
//! Remove filter with name "name". Returns if filter was removed
bool removeFilter(const PIString & name);
//! Remove all filters from connection
void removeAllFilters();
//! Returns all filters of connection
PIVector<PIPacketExtractor * > filters() const;
//! Returns all filter names of connection
PIStringList filterNames() const;
//! Returns PIPacketExtractor * assosiated with filter "name" or null if there is no such filter
PIPacketExtractor * filter(const PIString & name) const;
//! Returns all devices bounded to filter "name"
PIVector<PIIODevice * > filterBoundedDevices(const PIString & name) const;
/*! \brief Add to connection channel from "name_from" to "name_to"
* \details "name_from" and "name_to" can be full pathes of devices or device names or filter names.
* Returns \b false if there if no such device or filter, else create channel and returns \b true */
bool addChannel(const PIString & name_from, const PIString & name_to);
//! Add to connection channel from "name_from" to "dev_to"
bool addChannel(const PIString & name_from, const PIIODevice * dev_to) {return addChannel(name_from, devFPath(dev_to));}
//! Add to connection channel from "dev_from" to "name_to"
bool addChannel(const PIIODevice * dev_from, const PIString & name_to) {return addChannel(devFPath(dev_from), name_to);}
//! Add to connection channel from "dev_from" to "dev_to"
bool addChannel(const PIIODevice * dev_from, const PIIODevice * dev_to) {return addChannel(devFPath(dev_from), devFPath(dev_to));}
/*! \brief Remove from connection channel from "name_from" to "name_to"
* \details "name_from" and "name_to" can be full pathes of devices or filter names.
* Returns \b false if there if no such device or filter, else remove channel and returns \b true */
bool removeChannel(const PIString & name_from, const PIString & name_to);
//! Remove from connection channel from "name_from" to "dev_to"
bool removeChannel(const PIString & name_from, const PIIODevice * dev_to) {return removeChannel(name_from, devFPath(dev_to));}
//! Remove from connection channel from "dev_from" to "name_to"
bool removeChannel(const PIIODevice * dev_from, const PIString & name_to) {return removeChannel(devFPath(dev_from), name_to);}
//! Remove from connection channel from "dev_from" to "dev_to"
bool removeChannel(const PIIODevice * dev_from, const PIIODevice * dev_to) {return removeChannel(devFPath(dev_from), devFPath(dev_to));}
/*! \brief Remove from connection all channels from "name_from"
* \details "name_from" can be full path of device or filter name.
* Returns \b false if there if no such device or filter, else remove channels and returns \b true */
bool removeChannel(const PIString & name_from);
//! Remove from connection all channels from "dev_from"
bool removeChannel(const PIIODevice * dev_from) {return removeChannel(devFPath(dev_from));}
//! Remove from connection all channels
void removeAllChannels();
//! Returns all channels of this connection as full pathes or filter names pair array (from, to)
PIVector<PIPair<PIString, PIString> > channels() const;
/*! \brief Add to connection sender with name "name" device with full path "full_path"
* \details If there is no sender with name "name", connection create new, bound
* to it device "full_path_name" and start sender timer with frequency "frequency".
* If sender with name "name" already exists, device "full_path_name" add to this sender
* If "start" is true, sender is started immediately. Else, you can start sender with
* functions \a startSender()
* \n \b Attention! "frequency" is actual olny if new sender was created! */
void addSender(const PIString & name, const PIString & full_path_name, float frequency, bool start = false);
//! Add to connection sender with name "name" device "dev"
void addSender(const PIString & name, const PIIODevice * dev, float frequency, bool start = false) {addSender(name, devFPath(dev), frequency, start);}
/*! \brief Remove from sender with name "name" device with full path "full_path_name"
* \details If there is no devices bounded to this sender, it will be removed. Returns if sender was removed */
bool removeSender(const PIString & name, const PIString & full_path_name);
//! Remove from sender with name "name" device "dev"
bool removeSender(const PIString & name, const PIIODevice * dev) {return removeSender(name, devFPath(dev));}
//! Remove sender with name "name", returns if sender was removed
bool removeSender(const PIString & name);
//! Set sender "name" fixed send data "data", returns if sender exists
bool setSenderFixedData(const PIString & name, const PIByteArray & data);
//! Remove sender "name" fixed send data, returns if sender exists
bool clearSenderFixedData(const PIString & name);
//! Returns sender "name" fixed send data
PIByteArray senderFixedData(const PIString & name) const;
//! Returns sender "name" timer frequency, -1 if there is no such sender, or 0 if sender is not started yet
float senderFrequency(const PIString & name) const;
//! Remove from connection all senders
void removeAllSenders();
//! Start read thread of device with full path "full_path"
void startThreadedRead(const PIString & full_path_name);
//! Start read thread of device "dev"
void startThreadedRead(const PIIODevice * dev) {startThreadedRead(devFPath(dev));}
//! Start read threads of all Device pool device
void startAllThreadedReads();
//! Start sender "name" timer
void startSender(const PIString & name);
//! Start all senders timers
void startAllSenders();
//! Start all read threads and senders
void start() {startAllThreadedReads(); startAllSenders();}
//! Stop read thread of device with full path "full_path"
void stopThreadedRead(const PIString & full_path_name);
//! Stop read thread of device "dev"
void stopThreadedRead(const PIIODevice * dev) {stopThreadedRead(devFPath(dev));}
//! Stop read threads of all Device pool device
void stopAllThreadedReads();
//! Stop sender "name" timer
void stopSender(const PIString & name);
//! Stop all senders timers
void stopAllSenders();
//! Stop all read threads and senders
void stop() {stopAllThreadedReads(); stopAllSenders();}
//! Stop connection and remove all devices
void destroy() {stop(); removeAllDevices();}
//! Returns if there are no devices in this connection
bool isEmpty() const {return device_modes.isEmpty();}
//! Returns PIDiagnostics * assosiated with device with full path "full_path_name", name "full_path_name" or filter "full_path_name"
PIDiagnostics * diagnostic(const PIString & full_path_name) const;
//! Returns PIDiagnostics * assosiated with device or filter "dev"
PIDiagnostics * diagnostic(const PIIODevice * dev) const {return diags_.value(const_cast<PIIODevice * >(dev), 0);}
//! Write data "data" to device with full path "full_path" and returns result of \a write() function of device
int writeByFullPath(const PIString & full_path, const PIByteArray & data);
//! Write data "data" to device with name "name" and returns result of \a write() function of device
int writeByName(const PIString & name, const PIByteArray & data);
//! Write data "data" to device "dev" and returns result of \a write() function of device
int write(PIIODevice * dev, const PIByteArray & data);
//! Returns all connections in application
static PIVector<PIConnection * > allConnections();
//! Returns all devices in Device pool
static PIVector<PIIODevice * > allDevices();
//! Set Device pool fake mode to \"yes\" and returns previous mode
static bool setFakeMode(bool yes);
//! Returns if Device pool works in fake mode
static bool isFakeMode();
class PIP_EXPORT DevicePool: public PIThread {
PIOBJECT_SUBCLASS(DevicePool, PIThread)
friend void __DevicePool_threadReadDP(void * ddp);
friend class PIConnection;
protected:
struct DeviceData;
public:
DevicePool();
void init();
PIIODevice * addDevice(PIConnection * parent, const PIString & fp, PIIODevice::DeviceMode mode = PIIODevice::ReadWrite, bool start = true);
bool removeDevice(PIConnection * parent, const PIString & fp);
void unboundConnection(PIConnection * parent);
PIIODevice * device(const PIString & fp) const;
DeviceData * deviceData(PIIODevice * d) const;
PIVector<PIConnection * > boundedConnections() const;
PIVector<PIIODevice * > boundedDevices() const;
PIVector<PIIODevice * > boundedDevices(const PIConnection * parent) const;
protected:
struct PIP_EXPORT DeviceData {
DeviceData(): dev(0), rthread(0), started(false) {}
~DeviceData();
PIIODevice * dev;
PIThread * rthread;
bool started;
PIVector<PIConnection * > listeners;
};
void run();
void deviceReaded(DeviceData * dd, const PIByteArray & data);
typedef PIMap<PIString, DeviceData * >::value_type DDPair;
PIMap<PIString, DeviceData * > devices;
bool fake;
};
EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data)
EVENT2(packetReceivedEvent, const PIString &, from, const PIByteArray &, data)
EVENT3(qualityChanged, const PIIODevice * , dev, PIDiagnostics::Quality, new_quality, PIDiagnostics::Quality, old_quality)
//! \events
//! \{
//! \fn void dataReceivedEvent(const PIString & from, const PIByteArray & data)
//! \brief Raise on data received from device with full path "from"
//! \fn void packetReceivedEvent(const PIString & from, const PIByteArray & data)
//! \brief Raise on packet received from filter with name "from"
//! \fn void qualityChanged(const PIIODevice * device, PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality)
//! \brief Raise on diagnostic quality of device "device" changed from "old_quality" to "new_quality"
//! \}
protected:
//! Executes on data received from device with full path "from"
virtual void dataReceived(const PIString & from, const PIByteArray & data) {}
//! Executes on packet received from filter with name "from"
virtual void packetReceived(const PIString & from, const PIByteArray & data) {}
//! Validate header "rec" with source header "src" and size "size", executes from filter "filter_name"
virtual bool filterValidateHeader(const PIString & filter_name, uchar * src, uchar * rec, int size);
//! Validate footer "rec" with source footer "src" and size "size", executes from filter "filter_name"
virtual bool filterValidateFooter(const PIString & filter_name, uchar * src, uchar * rec, int size);
//! Validate payload "rec" with size "size", executes from filter "filter_name"
virtual bool filterValidatePayload(const PIString & filter_name, uchar * rec, int size);
//! You should returns data for sender "sender_name"
virtual PIByteArray senderData(const PIString & sender_name);
private:
static bool filterValidateHeaderS(void * c, uchar * src, uchar * rec, int size);
static bool filterValidateFooterS(void * c, uchar * src, uchar * rec, int size);
static bool filterValidatePayloadS(void * c, uchar * rec, int size);
bool configure(PIConfig & conf, const PIString & name_);
void rawReceived(PIIODevice * dev, const PIString & from, const PIByteArray & data);
void unboundExtractor(PIPacketExtractor * pe);
EVENT_HANDLER2(void, packetExtractorReceived, uchar * , data, int, size);
EVENT_HANDLER2(void, diagQualityChanged, PIDiagnostics::Quality, new_quality, PIDiagnostics::Quality, old_quality);
PIString devPath(const PIIODevice * d) const;
PIString devFPath(const PIIODevice * d) const;
PIIODevice * devByString(const PIString & s) const;
struct PIP_EXPORT Extractor {
Extractor(): extractor(0) {}
~Extractor();
PIPacketExtractor * extractor;
PIVector<PIIODevice * > devices;
};
class PIP_EXPORT Sender: public PITimer {
PIOBJECT_SUBCLASS(Sender, PIObject)
public:
Sender(PIConnection * parent_ = 0): parent(parent_), int_(0.f) {needLockRun(true);}
~Sender() {stop();}
PIConnection * parent;
PIVector<PIIODevice * > devices;
PIByteArray sdata;
float int_;
void tick(void * , int);
};
typedef PIMap<PIString, Extractor * >::value_type PEPair;
typedef PIMap<PIString, Sender * >::value_type SPair;
typedef PIMap<PIString, PIIODevice * >::value_type DNPair;
typedef PIMap<PIIODevice * , PIVector<PIPacketExtractor * > >::value_type BEPair;
typedef PIMap<PIIODevice * , PIVector<PIIODevice * > >::value_type CPair;
typedef PIMap<PIIODevice * , PIDiagnostics * >::value_type DPair;
PIMap<PIString, Extractor * > extractors;
PIMap<PIString, Sender * > senders;
PIMap<PIString, PIIODevice * > device_names;
PIMap<PIIODevice * , PIIODevice::DeviceMode> device_modes;
PIMap<PIIODevice * , PIVector<PIPacketExtractor * > > bounded_extractors;
PIMap<PIIODevice * , PIVector<PIIODevice * > > channels_;
PIMap<PIIODevice * , PIDiagnostics * > diags_;
static PIVector<PIConnection * > _connections;
};
void __DevicePool_threadReadDP(void * ddp);
extern PIConnection::DevicePool * __device_pool__;
class PIP_EXPORT __DevicePoolContainer__ {
public:
__DevicePoolContainer__();
static bool inited_;
};
static __DevicePoolContainer__ __device_pool_container__;
#endif // PICONNECTION_H

View File

@@ -0,0 +1,24 @@
#include "pidatatransfer.h"
PIByteArray PIDataTransfer::buildPacket(Part fi) {
PIByteArray ba;
ba.resize(fi.size);
memcpy(ba.data(), data_.data(fi.start), fi.size);
return ba;
}
void PIDataTransfer::receivePart(Part fi, PIByteArray ba, PIByteArray pheader) {
if (data_.size() < fi.start + fi.size) data_.resize(fi.start + fi.size);
memcpy(data_.data(fi.start), ba.data(), ba.size_s());
}
bool PIDataTransfer::send(const PIByteArray& ba) {
data_ = ba;
buildSession(PIVector<Part>() << Part(0, data_.size()));
return send_process();
}

View File

@@ -0,0 +1,46 @@
/*! \file pidatatransfer.h
* \brief Class for send and receive PIByteArray via \a PIBaseTransfer
*/
/*
PIP - Platform Independent Primitives
Class for send and receive PIByteArray via PIBaseTransfer
Copyright (C) 2018 Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIDATATRANSFER_H
#define PIDATATRANSFER_H
#include "pibasetransfer.h"
class PIP_EXPORT PIDataTransfer: public PIBaseTransfer
{
PIOBJECT_SUBCLASS(PIDataTransfer, PIBaseTransfer)
public:
PIDataTransfer() {;}
~PIDataTransfer() {;}
bool send(const PIByteArray &ba);
const PIByteArray & data() {return data_;}
private:
virtual PIByteArray buildPacket(Part p);
virtual void receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader);
PIByteArray data_;
};
#endif // PIDATATRANSFER_H

View File

@@ -0,0 +1,245 @@
/*
PIP - Platform Independent Primitives
Speed and quality in/out diagnostics
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pidiagnostics.h"
/** \class PIDiagnostics
* \brief Connection quality diagnostics
* \details
* \section PIDiagnostics_sec0 Synopsis
* This class provide abstract connection quality diagnostics and
* counting. You should create instance of %PIDiagnostics and on
* packet receive call function \a received(), on packet send call
* function \a sended(). %PIDiagnostics calculates correct, wrong
* and sended counters, packets per second, bytes per seconds,
* immediate and integral receive frequencies and receive/send speeds
* in human readable representation. There statistics are calculates
* one time per period, by default 1 second. To calculate them you
* should start %PIDiagnostics with function \a start() or pass \b true
* to constructor.
* */
PIDiagnostics::State::State() {
immediate_freq = integral_freq = 0.f;
received_packets_per_sec = 0ull;
received_packets = 0ull;
received_packets_wrong = 0ull;
received_bytes_per_sec = 0ull;
received_bytes = 0ull;
received_bytes_wrong = 0ull;
sended_packets_per_sec = 0ull;
sended_packets = 0ull;
sended_bytes_per_sec = 0ull;
sended_bytes = 0ull;
receive_speed = send_speed = PIString::readableSize(0) + "/s";
quality = PIDiagnostics::Unknown;
}
PIDiagnostics::PIDiagnostics(bool start_): PITimer(PITimer::Pool) {
disconn_ = 0.;
setInterval(100);
reset();
setDisconnectTimeout(3.);
changeDisconnectTimeout(3.);
if (start_) start(100);
}
PIDiagnostics::State PIDiagnostics::state() const {
constLock();
State ret = cur_state;
constUnlock();
return ret;
}
PIString PIDiagnostics::receiveSpeed() const {
constLock();
PIString ret = cur_state.receive_speed;
constUnlock();
return ret;
}
PIString PIDiagnostics::sendSpeed() const {
constLock();
PIString ret = cur_state.send_speed;
constUnlock();
return ret;
}
void PIDiagnostics::reset() {
lock();
cur_state = State();
if (disconn_ != 0.) {
int hist_size = history_rec.size();
history_rec.clear();
history_send.clear();
history_rec.resize(hist_size);
history_send.resize(hist_size);
}
unlock();
}
void PIDiagnostics::received(int size, bool correct) {
lock();
Entry & e(history_rec.front());
if (correct) {
e.cnt_ok++;
e.bytes_ok += size;
cur_state.received_packets++;
cur_state.received_bytes += size;
} else {
e.cnt_fail++;
e.bytes_fail += size;
cur_state.received_packets_wrong++;
cur_state.received_bytes_wrong += size;
}
e.empty = false;
unlock();
}
void PIDiagnostics::sended(int size) {
lock();
Entry & e(history_send.front());
e.cnt_ok++;
e.bytes_ok += size;
cur_state.sended_packets++;
cur_state.sended_bytes += size;
e.empty = false;
unlock();
}
void PIDiagnostics::tick(void * , int ) {
lock();
int tcnt_recv = 0;
int tcnt_send = 0;
Entry send = calcHistory(history_send, tcnt_send);
Entry recv = calcHistory(history_rec, tcnt_recv);
float itr = disconn_ * (float(tcnt_recv) / history_rec.size());
float its = disconn_ * (float(tcnt_send) / history_send.size());
float hz = interval() / 1000.f;
if (tcnt_recv == 0) {
cur_state.integral_freq = cur_state.immediate_freq = 0;
cur_state.received_packets_per_sec = cur_state.received_bytes_per_sec = 0;
} else {
cur_state.integral_freq = recv.cnt_ok / itr;
cur_state.received_packets_per_sec = ullong(float(recv.cnt_ok) / itr);
cur_state.received_bytes_per_sec = ullong(double(recv.bytes_ok) / itr);
cur_state.immediate_freq = double(history_rec.front().cnt_ok) / hz;
}
if (tcnt_send == 0) {
cur_state.sended_packets_per_sec = cur_state.sended_bytes_per_sec = 0;
} else {
cur_state.sended_packets_per_sec = ullong(float(send.cnt_ok) / its);
cur_state.sended_bytes_per_sec = ullong(double(send.bytes_ok) / its);
}
// piCoutObj << "tick" << recv.cnt_ok << send.cnt_ok;
// speedRecv = PIString::readableSize(ullong(double(history_rec.front().bytes_ok) / hz)) + "/s";
// speedSend = PIString::readableSize(ullong(double(history_send.front().bytes_ok) / hz)) + "/s";
cur_state.receive_speed = PIString::readableSize(cur_state.received_bytes_per_sec) + "/s";
cur_state.send_speed = PIString::readableSize(cur_state.sended_bytes_per_sec) + "/s";
int arc = recv.cnt_ok + recv.cnt_fail;
float good_percents = 0.f;
if (arc > 0) good_percents = (float)recv.cnt_ok / arc * 100.f;
PIDiagnostics::Quality diag;
if (tcnt_recv == 0) {
diag = PIDiagnostics::Unknown;
} else {
if (good_percents == 0.f) diag = PIDiagnostics::Failure;
else if (good_percents <= 20.f) diag = PIDiagnostics::Bad;
else if (good_percents > 20.f && good_percents <= 80.f) diag = PIDiagnostics::Average;
else diag = PIDiagnostics::Good;
}
if ((tcnt_send + tcnt_recv) != 0) {
// piCoutObj << tcnt_recv << tcnt_send;
history_rec.dequeue();
history_send.dequeue();
Entry e;
e.empty = false;
history_rec.enqueue(e);
history_send.enqueue(e);
}
if (diag != cur_state.quality) {
qualityChanged(diag, cur_state.quality);
cur_state.quality = diag;
}
unlock();
}
PIDiagnostics::Entry PIDiagnostics::calcHistory(PIQueue<Entry> & hist, int & cnt) {
Entry e;
cnt = 0;
for (int i = 0; i < hist.size_s(); ++i) {
e.bytes_ok += hist[i].bytes_ok;
e.bytes_fail += hist[i].bytes_fail;
e.cnt_ok += hist[i].cnt_ok;
e.cnt_fail += hist[i].cnt_fail;
if (!hist[i].empty) cnt++;
}
e.empty = false;
// piCoutObj << hist.size() << cnt;
return e;
}
void PIDiagnostics::propertyChanged(const PIString &) {
float disct = property("disconnectTimeout").toFloat();
changeDisconnectTimeout(disct);
}
void PIDiagnostics::changeDisconnectTimeout(float disct) {
lock();
disconn_ = piMaxf(disct, interval() / 1000.f);
if (interval() > 0) {
int hist_size = piClampi(int(disconn_ * 1000.f / float(interval())), 1, 65536);
//piCoutObj << hist_size << interval();
history_rec.resize(hist_size);
history_send.resize(hist_size);
} else {
history_rec.resize(1);
history_send.resize(1);
}
//piCoutObj << hist_size << disconn_ << interval();
unlock();
}
void PIDiagnostics::constLock() const {
const_cast<PIDiagnostics*>(this)->lock();
}
void PIDiagnostics::constUnlock() const {
const_cast<PIDiagnostics*>(this)->unlock();
}

View File

@@ -0,0 +1,242 @@
/*! \file pidiagnostics.h
* \brief Connection quality diagnostics
*/
/*
PIP - Platform Independent Primitives
Speed and quality in/out diagnostics
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIDIAGNOSTICS_H
#define PIDIAGNOSTICS_H
#include "pitimer.h"
#include "piqueue.h"
class PIP_EXPORT PIDiagnostics: public PITimer
{
PIOBJECT_SUBCLASS(PIDiagnostics, PITimer)
friend class PIConnection;
public:
//! Constructs an empty diagnostics and if "start_" start it
PIDiagnostics(bool start_ = true);
virtual ~PIDiagnostics() {;}
//! Connection quality
enum Quality {
Unknown /** Unknown, no one packet received yet */ = 1,
Failure /** No connection, no one correct packet received for last period */ = 2,
Bad /** Bad connection, correct packets received <= 20% */ = 3,
Average /** Average connection, correct packets received > 20% and <= 80% */ = 4,
Good /** Good connection, correct packets received > 80% */ = 5
};
//! Information about current diagnostics state
struct State {
State();
float immediate_freq;
float integral_freq;
ullong received_packets_per_sec;
ullong received_packets;
ullong received_packets_wrong;
ullong received_bytes_per_sec;
ullong received_bytes;
ullong received_bytes_wrong;
ullong sended_packets_per_sec;
ullong sended_packets;
ullong sended_bytes_per_sec;
ullong sended_bytes;
PIString receive_speed;
PIString send_speed;
PIDiagnostics::Quality quality;
};
//! Returns current state
PIDiagnostics::State state() const;
//! Returns period of full disconnect in seconds and period of averaging frequency
float disconnectTimeout() const {return disconn_;}
//! Returns period of full disconnect in seconds and period of averaging frequency
void setDisconnectTimeout(float s) {setProperty("disconnectTimeout", s);}
//! Returns immediate receive frequency, packets/s
float immediateFrequency() const {return cur_state.immediate_freq;}
//! Returns integral receive frequency for \a disconnectTimeout() seconds, packets/s
float integralFrequency() const {return cur_state.integral_freq;}
//! Returns correct received packets per second
ullong receiveCountPerSec() const {return cur_state.received_packets_per_sec;}
//! Returns sended packets per second
ullong sendCountPerSec() const {return cur_state.sended_packets_per_sec;}
//! Returns correct received bytes per second
ullong receiveBytesPerSec() const {return cur_state.received_bytes_per_sec;}
//! Returns sended bytes per second
ullong sendBytesPerSec() const {return cur_state.sended_bytes_per_sec;}
//! Returns overall correct received bytes
ullong receiveBytes() const {return cur_state.received_bytes;}
//! Returns overall wrong received bytes
ullong wrongBytes() const {return cur_state.received_bytes_wrong;}
//! Returns overall sended bytes
ullong sendBytes() const {return cur_state.sended_bytes;}
//! Returns overall correct received packets count
ullong receiveCount() const {return cur_state.received_packets;}
//! Returns overall wrong received packets count
ullong wrongCount() const {return cur_state.received_packets_wrong;}
//! Returns overall sended packets count
ullong sendCount() const {return cur_state.sended_packets;}
//! Returns connection quality
PIDiagnostics::Quality quality() const {return cur_state.quality;}
//! Returns receive speed in format "n {B|kB|MB|GB|TB}/s"
PIString receiveSpeed() const;
//! Returns send speed in format "n {B|kB|MB|GB|TB}/s"
PIString sendSpeed() const;
//! Returns immediate receive frequency pointer, packets/s. Useful for output to PIConsole
const float * immediateFrequency_ptr() const {return &cur_state.immediate_freq;}
//! Returns integral receive frequency pointer for period, packets/s. Useful for output to PIConsole
const float * integralFrequency_ptr() const {return &cur_state.integral_freq;}
//! Returns correct received packets per second pointer. Useful for output to PIConsole
const ullong * receiveCountPerSec_ptr() const {return &cur_state.received_packets_per_sec;}
//! Returns sended packets per second pointer. Useful for output to PIConsole
const ullong * sendCountPerSec_ptr() const {return &cur_state.sended_packets_per_sec;}
//! Returns correct received bytes per second pointer. Useful for output to PIConsole
const ullong * receiveBytesPerSec_ptr() const {return &cur_state.received_bytes_per_sec;}
//! Returns sended bytes per second pointer. Useful for output to PIConsole
const ullong * sendBytesPerSec_ptr() const {return &cur_state.sended_bytes_per_sec;}
//! Returns overall correct received bytes pointer. Useful for output to PIConsole
const ullong * receiveBytes_ptr() const {return &cur_state.received_bytes;}
//! Returns overall wrong received bytes pointer. Useful for output to PIConsole
const ullong * wrongBytes_ptr() const {return &cur_state.received_bytes_wrong;}
//! Returns overall sended bytes pointer. Useful for output to PIConsole
const ullong * sendBytes_ptr() const {return &cur_state.sended_bytes;}
//! Returns overall correct received packets count pointer. Useful for output to PIConsole
const ullong * receiveCount_ptr() const {return &cur_state.received_packets;}
//! Returns overall wrong received packets count pointer. Useful for output to PIConsole
const ullong * wrongCount_ptr() const {return &cur_state.received_packets_wrong;}
//! Returns overall sended packets count pointer. Useful for output to PIConsole
const ullong * sendCount_ptr() const {return &cur_state.sended_packets;}
//! Returns connection quality pointer. Useful for output to PIConsole
const int * quality_ptr() const {return (int * )&cur_state.quality;}
//! Returns receive speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole
const PIString * receiveSpeed_ptr() const {return &cur_state.receive_speed;}
//! Returns send speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole
const PIString * sendSpeed_ptr() const {return &cur_state.send_speed;}
EVENT_HANDLER0(void, start) {start(100.); changeDisconnectTimeout(disconn_);}
EVENT_HANDLER1(void, start, double, msecs) {if (msecs > 0.) {PITimer::start(msecs); changeDisconnectTimeout(disconn_);}}
EVENT_HANDLER0(void, stop) {PITimer::stop();}
EVENT_HANDLER0(void, reset);
EVENT_HANDLER1(void, received, int, size) {received(size, true);}
EVENT_HANDLER2(void, received, int, size, bool, correct);
EVENT_HANDLER1(void, sended, int, size);
EVENT2(qualityChanged, PIDiagnostics::Quality, new_quality, PIDiagnostics::Quality, old_quality)
//! \handlers
//! \{
//! \fn void start(double msecs = 1000.)
//! \brief Start diagnostics evaluations with period "msecs" milliseconds
//! \fn void reset()
//! \brief Reset diagnostics counters
//! \fn void received(int size, bool correct = true)
//! \brief Notify diagnostics about "correct" corected received packet
//! \fn void sended(int size)
//! \brief Notify diagnostics about sended packet
//! \}
//! \events
//! \{
//! \fn void qualityChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality)
//! \brief Raise on change receive quality from "old_quality" to "new_quality"
//! \}
private:
struct Entry {
Entry() {bytes_ok = bytes_fail = 0; cnt_ok = cnt_fail = 0; empty = true;}
ullong bytes_ok;
ullong bytes_fail;
uint cnt_ok;
uint cnt_fail;
bool empty;
};
friend bool operator ==(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s);
friend bool operator !=(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s);
friend bool operator <(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s);
void tick(void *, int);
Entry calcHistory(PIQueue<Entry> & hist, int & cnt);
void propertyChanged(const PIString &);
void changeDisconnectTimeout(float disct);
void constLock() const;
void constUnlock() const;
PIQueue<Entry> history_rec, history_send;
float disconn_;
State cur_state;
};
inline bool operator ==(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s) {
return f.bytes_ok == s.bytes_ok &&
f.bytes_fail == s.bytes_fail &&
f.cnt_ok == s.cnt_ok &&
f.cnt_fail == s.cnt_fail &&
f.empty == s.empty;
}
inline bool operator !=(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s) {return !(f == s);}
inline bool operator <(const PIDiagnostics::Entry & f, const PIDiagnostics::Entry & s) {return f.bytes_ok < s.bytes_ok;}
#endif // PIDIAGNOSTICS_H

View File

@@ -0,0 +1,322 @@
#include "pifiletransfer.h"
const char PIFileTransfer::sign[] = {'P', 'F', 'T'};
PIFileTransfer::PIFileTransfer() {
for (uint i = 0; i < sizeof(sign); i++)
pftheader.sig[i] = sign[i];
pftheader.version = __PIFILETRANSFER_VERSION;
pftheader.session_id = 0;
pftheader.step = pft_None;
dir = PIDir::current();
started_ = scanning = false;
bytes_file_all = bytes_file_cur = 0;
// CONNECT(void, this, sendStarted, this, send_started);
CONNECT(void, this, receiveStarted, this, receive_started);
CONNECT1(void, bool, this, sendFinished, this, send_finished);
CONNECT1(void, bool, this, receiveFinished, this, receive_finished);
}
PIFileTransfer::~PIFileTransfer() {
stop();
work_file.close();
}
bool PIFileTransfer::send(const PIFile& file) {
return send(file.path());
}
bool PIFileTransfer::send(const PIString & file) {
return send(PIVector<PIFile::FileInfo>() << PIFile::fileInfo(file));
}
bool PIFileTransfer::send(const PIStringList& files) {
PIVector<PIFile::FileInfo> fil;
piForeachC(PIString &file, files) fil << PIFile::fileInfo(file);
return send(fil);
}
bool PIFileTransfer::send(PIVector<PIFile::FileInfo> entries) {
started_ = true;
PIVector<PFTFileInfo> allEntries;
for (int i = 0; i < entries.size_s(); i++) {
allEntries << PFTFileInfo(entries[i]);
allEntries.back().dest_path = entries[i].name();
if (entries[i].isDir()) {
cur_file_string = "scanning ... ";
scan_dir.setDir(entries[i].path);
scanning = true;
PIVector<PIFile::FileInfo> fls = scan_dir.allEntries();
scanning = false;
scan_dir.up();
//piCout << "files rel to" << d.absolutePath();
for (uint j = 0; j < fls.size(); j++) {
allEntries << PFTFileInfo(fls[j]);
allEntries.back().dest_path = scan_dir.relative(fls[j].path);
//piCout << " abs path" << fls[j].path;
//piCout << " rel path" << allEntries.back().dest_path;
}
}
}
return sendFiles(allEntries);
}
bool PIFileTransfer::sendFiles(const PIVector<PFTFileInfo> &files) {
files_ = files;
PIStringList names;
//piCoutObj << "prepare to send" << files_.size() << "files";
for (int i = 0; i < files_.size_s(); i++) {
//files_[i].path = dir.relative(files_[i].path);
if (names.contains(files_[i].path)) {
files_.remove(i);
--i;
}
else names << files_[i].path;
if (files_[i].isDir())
files_[i].size = 0;
//piCout << "prepare" << i << files_[i].path << files_[i].dest_path << files_[i].name();
}
randomize();
step_mutex.lock();
pftheader.session_id = randomi();
sendFilesStarted();
cur_file_string = "build session";
desc.clear();
desc << files_;
pftheader.step = pft_Description;
buildSession(PIVector<Part>() << Part(0, desc.size()));
cur_file_string = "";
step_mutex.unlock();
if (!send_process()) {
sendFilesFinished(false);
started_ = false;
return false;
}
step_mutex.lock();
pftheader.step = pft_Data;
PIVector<Part> pts;
for (int i = 0; i < files_.size_s(); i++) {
pts << Part(i + 1, files_[i].size);
}
buildSession(pts);
step_mutex.unlock();
bool ok = send_process();
sendFilesFinished(ok);
started_ = false;
return ok;
}
void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) {
//piCout << "processFile" << id << files_.size();
PFTFileInfo fi = files_[id-1];
bytes_file_all = fi.size;
bytes_file_cur = start;
cur_file_string = fi.dest_path;
PIString path = dir.absolutePath() + dir.separator + fi.dest_path;
//piCout << "receive" << path << fi.size << data.size();
if (fi.isDir()) {
//piCoutObj << "make dir" << fi.entry.path;
if (!PIDir::make(path)) {
cur_file_string = "Error: While create directory \"" + path + "\"";
piCoutObj << cur_file_string;
stopReceive();
return;
}
}
if (fi.isFile()) {
if (work_file.path() != path || !work_file.isOpened()) {
//piCout << "file close";
work_file.close();
//piCout << "new file" << path << work_file.path() << work_file.isOpened();
if (!work_file.open(path, PIIODevice::ReadWrite)) {
cur_file_string = "Error: While open file \"" + path + "\"";
piCoutObj << cur_file_string;
stopReceive();
return;
}
PIFile::applyFileInfo(path, fi);
if (work_file.size() > fi.size) {
//piCoutObj << "error size" << work_file.size() << fi.size;
work_file.clear();
work_file.resize(fi.size);
//piCoutObj << "correct size" << work_file.size() << fi.size;
}
}
//piCoutObj << "write file" << path << work_file.path() << work_file.size() << fi.entry.size << work_file.pos() << fi.fstart << fi.fsize;
if (work_file.size() < (llong)start) {
//piCoutObj << "error pos size" << work_file.pos() << fi.fstart;
work_file.resize(start);
//piCoutObj << "correct pos size" << work_file.pos() << fi.fstart;
}
if (work_file.size() > fi.size) {
piCoutObj << "****** error size" << work_file.size() << fi.size;
work_file.clear();
work_file.resize(fi.size);
piCoutObj << "****** correct size" << work_file.size() << fi.size;
}
//if (fi.fstart != work_file.pos()) piCoutObj << "error pos" << work_file.pos() << fi.fstart;
work_file.seek(start);
int rs = work_file.write(data.data(), data.size());
if (rs != data.size_s()) {
cur_file_string = "Error: While write file \"" + fi.path + "\" (must " + PIString::fromNumber(data.size()) + ", but write " + PIString::fromNumber(rs) + ")";
piCoutObj << cur_file_string;
stopReceive();
return;
}
}
}
PIByteArray PIFileTransfer::buildPacket(Part p) {
//piCoutObj << "Step" << pftheader.step;
//piCoutObj << "session id" << pftheader.session_id;
PIByteArray ba;
switch(pftheader.step) {
case pft_None:
stopSend();
return PIByteArray();
case pft_Description:
ba.resize(p.size);
memcpy(ba.data(), desc.data(p.start), p.size);
return ba;
case pft_Data:
//piCout << "send data" << p.id << files_.size();
PIFile::FileInfo fi = files_[p.id-1];
if (fi.isFile()) {
//piCout << "send file" << fi.name() << fi.size;
PIString path = fi.path;
if (work_file.path() != path || !work_file.isOpened()) {
if (!work_file.open(path, PIIODevice::ReadOnly)) {
break_ = true;
cur_file_string = "Error: While open file \"" + fi.path + "\"";
piCoutObj << cur_file_string;
stopSend();
return PIByteArray();
}
}
work_file.seek(p.start);
ba.resize(p.size);
int rs = work_file.read(ba.data(), ba.size());
if ((llong)rs != (llong)p.size) {
break_ = true;
cur_file_string = "Error: While read file \"" + fi.path + "\" (must " + PIString::fromNumber(p.size) + ", but read " + PIString::fromNumber(rs) + ")";
piCoutObj << cur_file_string;
stopSend();
return PIByteArray();
}
}
//if (fi.isDir()) {
// piCout << "create dir" << fi.path;
// dir.make(fi.path);
//}
cur_file_string = fi.path;
bytes_file_all = fi.size;
bytes_file_cur = p.start;
return ba;
}
return ba;
}
void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader) {
PFTHeader h;
// piCout << pheader.size() << sizeof(PFTHeader);
pheader >> h;
// piCout << h.session_id;
PIMutexLocker ml(step_mutex);
StepType st = (StepType)h.step;
pftheader.step = st;
if (!h.check_sig()) {
cur_file_string = "Error: Check signature: File Transfer incompatable or invalid version!";
piCoutObj << cur_file_string;
stopReceive();
return;
}
switch(st) {
case pft_None:
break;
case pft_Description:
pftheader.session_id = h.session_id;
if (desc.size() < fi.start + fi.size) desc.resize(fi.start + fi.size);
memcpy(desc.data(fi.start), ba.data(), ba.size_s());
break;
case pft_Data:
if (h.session_id == pftheader.session_id)
processFile(fi.id, fi.start, ba);
else stopReceive();
break;
default:
break;
}
}
PIString PIFileTransfer::curFile() const {
PIString s = cur_file_string;
if (scanning)
return s + scan_dir.scanDir();
return s;
}
PIByteArray PIFileTransfer::customHeader() {
PIByteArray ba;
ba << pftheader;
return ba;
}
void PIFileTransfer::receive_started() {
if (pftheader.step == pft_None) {
files_.clear();
// piCoutObj << "start receive"
started_ = true;
receiveFilesStarted();
}
}
void PIFileTransfer::receive_finished(bool ok) {
step_mutex.lock();
StepType st = (StepType)pftheader.step;
step_mutex.unlock();
if (st == pft_Description) {
bool user_ok = true;
if (ok) {
// piCoutObj << desc.size() << PICoutManipulators::Hex << desc;
desc >> files_;
// piCoutObj << files_;
PIStringList files;
piForeachC(PFTFileInfo &fi, files_) files << fi.dest_path;
pause();
receiveFilesRequest(files, bytesAll(), &user_ok);
}
if (!ok || !user_ok) {
pftheader.session_id--;
piCoutObj << "receive aborted";
receiveFilesFinished(false);
started_ = false;
} else resume();
}
if (st == pft_Data) {
//piCoutObj << "file close";
work_file.close();
receiveFilesFinished(ok);
started_ = false;
}
}
void PIFileTransfer::send_finished(bool ok) {
// piCoutObj << "file close";
if (pftheader.step == pft_Data) {
work_file.close();
}
}

View File

@@ -0,0 +1,129 @@
/*! \file pifiletransfer.h
* \brief Class for send and receive files and directories via \a PIBaseTransfer
*/
/*
PIP - Platform Independent Primitives
Class for send and receive files and directories via PIBaseTransfer
Copyright (C) 2018 Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIFILETRANSFER_H
#define PIFILETRANSFER_H
#include "pidir.h"
#include "pibasetransfer.h"
#define __PIFILETRANSFER_VERSION 2
class PIP_EXPORT PIFileTransfer: public PIBaseTransfer
{
PIOBJECT_SUBCLASS(PIFileTransfer, PIBaseTransfer)
public:
PIFileTransfer();
~PIFileTransfer();
enum PIP_EXPORT StepType {pft_None, pft_Description, pft_Data};
struct PIP_EXPORT PFTFileInfo: public PIFile::FileInfo {
PFTFileInfo(const PIFile::FileInfo &fi = PIFile::FileInfo()): PIFile::FileInfo(fi) {}
PIString dest_path;
};
#pragma pack(push,1)
struct PIP_EXPORT PFTHeader {
union {
struct {
char sig[3]; // PFT
uchar version;
};
uint raw_sig;
};
int step; // PacketType
int session_id;
bool check_sig() {
if (sig[0] != sign[0] || sig[1] != sign[1] || sig[2] != sign[2] || version != __PIFILETRANSFER_VERSION) return false;
return true;
}
};
#pragma pack(pop)
bool send(const PIFile & file);
bool send(const PIString & file);
bool send(const PIStringList &files);
bool send(PIFile::FileInfo entry) {return send(PIVector<PIFile::FileInfo>() << entry);}
bool send(PIVector<PIFile::FileInfo> entries);
void setDirectory(const PIDir &d) {dir = d;}
void setDirectory(const PIString &path) {dir.setDir(path);}
PIDir directory() const {return dir;}
bool isStarted() const {return started_;}
PIString curFile() const;
llong bytesFileAll() const {return bytes_file_all;}
llong bytesFileCur() const {return bytes_file_cur;}
const PIString * curFile_ptr() const {return &cur_file_string;}
const llong * bytesFileAll_ptr() const {return &bytes_file_all;}
const llong * bytesFileCur_ptr() const {return &bytes_file_cur;}
EVENT(receiveFilesStarted)
EVENT1(receiveFilesFinished, bool, ok)
EVENT(sendFilesStarted)
EVENT1(sendFilesFinished, bool, ok)
EVENT3(receiveFilesRequest, PIStringList, files, llong, total_bytes, bool *, ok)
private:
static const char sign[];
PIVector<PFTFileInfo> files_;
PIString cur_file_string;
llong bytes_file_all, bytes_file_cur;
PFTHeader pftheader;
PIDir dir;
PIFile work_file;
PIByteArray desc;
PIDir scan_dir;
bool started_, scanning;
PIMutex step_mutex;
bool sendFiles(const PIVector<PFTFileInfo> &files);
void processFile(int id, ullong start, PIByteArray &data);
virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader);
virtual PIByteArray buildPacket(Part fi);
virtual PIByteArray customHeader();
// EVENT_HANDLER(void, send_started);
EVENT_HANDLER(void, receive_started);
EVENT_HANDLER1(void, send_finished, bool, ok);
EVENT_HANDLER1(void, receive_finished, bool, ok);
};
inline PIByteArray & operator <<(PIByteArray & s, const PIFileTransfer::PFTHeader & v) {s << v.raw_sig << v.step << v.session_id; return s;}
inline PIByteArray & operator >>(PIByteArray & s, PIFileTransfer::PFTHeader & v) {s >> v.raw_sig >> v.step >> v.session_id; return s;}
inline PIByteArray & operator <<(PIByteArray & s, const PIFileTransfer::PFTFileInfo & v) {s << v.dest_path << v.size << v.time_access << v.time_modification <<
v.flags << v.id_user << v.id_group << v.perm_user.raw << v.perm_group.raw << v.perm_other.raw; return s;}
inline PIByteArray & operator >>(PIByteArray & s, PIFileTransfer::PFTFileInfo & v) {s >> v.dest_path >> v.size >> v.time_access >> v.time_modification >>
v.flags >> v.id_user >> v.id_group >> v.perm_user.raw >> v.perm_group.raw >> v.perm_other.raw; return s;}
inline PICout operator <<(PICout s, const PIFileTransfer::PFTFileInfo & v) {
s.setControl(0, true);
s << "FileInfo(\"" << v.dest_path << "\", " << PIString::readableSize(v.size) << ", "
<< v.perm_user.toString() << " " << v.perm_group.toString() << " " << v.perm_other.toString() << ", "
<< v.time_access.toString() << ", " << v.time_modification.toString()
<< ", 0x" << PICoutManipulators::Hex << v.flags << ")";
s.restoreControl();
return s;
}
#endif // PIFILETRANSFER_H

View File

@@ -0,0 +1,30 @@
/*
PIP - Platform Independent Primitives
Module includes
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIIOUTILSMODULE_H
#define PIIOUTILSMODULE_H
#include "pibasetransfer.h"
#include "piconnection.h"
#include "pidatatransfer.h"
#include "pidiagnostics.h"
#include "pifiletransfer.h"
#include "pipacketextractor.h"
#endif // PIIOUTILSMODULE_H

View File

@@ -0,0 +1,308 @@
/*
PIP - Platform Independent Primitives
Packets extractor
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pipacketextractor.h"
/** \class PIPacketExtractor
* \brief Packets extractor
* \details
* \section PIPacketExtractor_main Synopsis
* This class implements packet recognition by various algorithms and custom
* validating from data stream. Stream is formed from child %PIIODevice
* passed from contructor or with function \a setDevice().
*
* \section PIPacketExtractor_work Principle of work
* %PIPacketExtractor works with child %PIIODevice. \a read and \a write
* functions directly call child device functions. You should start threaded
* read of \b extractor (not child device) to proper work. Extractor read data
* from child device, try to detect packet from readed data and raise
* \a packetReceived() event on success.
*
* \section PIPacketExtractor_algorithms Algorithms
* There are 6 algorithms: \n
* * PIPacketExtractor::None \n
* Packet is successfully received on every read without any validation. \n \n
* * PIPacketExtractor::Header \n
* Wait for at least \a header() bytes + \a payloadSize(), then validate
* header with virtual function \a validateHeader() and if it fail, shifts
* for next 1 byte. If header is successfully validated check payload with
* function \a validatePayload() and if it fail, shifts for next 1 byte. If
* all validations were successful raise \a packetReceived() event. \n \n
* * PIPacketExtractor::Footer \n
* This algorithm similar to previous, but instead of \a header() first validate
* \a footer() at after \a payloadSize() bytes with function \a validateFooter(). \n \n
* * PIPacketExtractor::HeaderAndFooter \n
* Wait for at least \a header() bytes + \a footer() bytes, then validate
* header with virtual function \a validateHeader() and if it fail, shifts
* for next 1 byte. If header is successfully validated check footer with
* function \a validateFooter() and if it fail, shifts footer position for
* next 1 byte. Then validate payload and if it fail, search header again,
* starts from next byte of previous header. If all validations were successful
* raise \a packetReceived() event. \n \n
* * PIPacketExtractor::Size \n
* Wait for at least \a packetSize() bytes, then validate packet with function
* \a validatePayload() and if it fail, shifts for next 1 byte. If validating
* was successfull raise \a packetReceived() event. \n \n
* * PIPacketExtractor::Timeout \n
* Wait for first read, then read for \a timeout() milliseconds and raise
* \a packetReceived() event. \n
*
* \section PIPacketExtractor_control Control validating
* There are three parameters:
* * header content
* * header size
* * payload size
*
* Extractor can detect packet with compare your header with readed data.
* It is default implementation of function \a packetHeaderValidate().
* If header validating passed, function \a packetValidate() will be called.
* If either of this function return \b false extractor shifts by one byte
* and takes next header. If both functions returns \b true extractor shifts
* by whole packet size.
* \image html packet_detection.png
*
* */
REGISTER_DEVICE(PIPacketExtractor)
PIPacketExtractor::PIPacketExtractor(PIIODevice * device_, PIPacketExtractor::SplitMode mode) {
construct();
setDevice(device_);
setSplitMode(mode);
}
void PIPacketExtractor::construct() {
ret_func_header = ret_func_footer = 0;
setPayloadSize(0);
setTimeout(100);
setBufferSize(65536);
setDevice(0);
setPacketSize(0);
setSplitMode(None);
missed = missed_packets = footerInd = 0;
header_found = false;
}
void PIPacketExtractor::propertyChanged(const PIString &) {
packetSize_ = property("packetSize").toInt();
mode_ = (SplitMode)(property("splitMode").toInt());
dataSize = property("payloadSize").toInt();
src_header = property("header").toByteArray();
src_footer = property("footer").toByteArray();
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
void PIPacketExtractor::setDevice(PIIODevice * device_) {
dev = device_;
if (dev == 0) return;
}
void PIPacketExtractor::setBufferSize(int new_size) {
buffer_size = new_size;
buffer.resize(buffer_size);
memset(buffer.data(), 0, buffer.size());
setThreadedReadBufferSize(new_size);
}
void PIPacketExtractor::setPayloadSize(int size) {
setProperty("payloadSize", size);
dataSize = size;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
void PIPacketExtractor::setHeader(const PIByteArray & data) {
setProperty("header", data);
src_header = data;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
void PIPacketExtractor::setFooter(const PIByteArray & data) {
setProperty("footer", data);
src_footer = data;
packetSize_hf = src_header.size_s() + src_footer.size_s() + payloadSize();
}
bool PIPacketExtractor::threadedRead(uchar * readed, int size_) {
//piCoutObj << "readed" << size_;
int ss;
switch (mode_) {
case PIPacketExtractor::None:
if (validatePayload(readed, size_))
packetReceived(readed, size_);
break;
case PIPacketExtractor::Header:
tmpbuf.append(readed, size_);
ss = src_header.size_s() + dataSize;
while (tmpbuf.size_s() >= ss) {
while (!validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s())) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
}
while (!validatePayload(tmpbuf.data(src_header.size_s()), dataSize)) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
}
packetReceived(tmpbuf.data(), ss);
tmpbuf.remove(0, ss);
}
break;
case PIPacketExtractor::Footer:
/*memcpy(buffer.data(allReaded), readed, size_);
allReaded += size_;
footer_ = (mode_ == PIPacketExtractor::Footer);
while (allReaded >= packetSize_hf + addSize && allReaded > 0) {
if (!src_header.isEmpty()) {
if (allReaded + curInd >= buffer_size) {
memcpy(sbuffer.data(), buffer.data(), buffer_size);
memcpy(buffer.data(), sbuffer.data(buffer_size - packetSize_hf), allReaded);
allReaded = packetSize_hf;
addSize = curInd = 0;
}
bool brk = false;
while (!validateHeader((uchar * )(footer_ ? src_footer.data() : src_header.data()), buffer.data(curInd + (footer_ ? dataSize : 0)), footer_ ? src_footer.size_s() : src_header.size_s())) {
++curInd; ++missed;
if (packetSize_hf > 0) missed_packets = missed / packetSize_hf;
if (curInd > addSize) {
addSize += packetSize_hf;
brk = true;
break;
}
}
if (brk) continue;
//memcpy(mheader.data(), buffer.data(curInd + (footer_ ? dataSize : 0)), src_header.size_s());
if (!src_header.isEmpty()) memcpy(src_header.data(), buffer.data(curInd), src_header.size_s());
if (!validatePayload(buffer.data(curInd + src_header.size_s()), dataSize)) {
++curInd; ++missed;
if (packetSize_hf > 0) missed_packets = missed / packetSize_hf;
continue;
}
packetReceived(buffer.data(curInd), packetSize_hf);
memcpy(sbuffer.data(), buffer.data(), allReaded);
memcpy(buffer.data(), sbuffer.data(packetSize_hf + curInd), allReaded);
allReaded -= packetSize_hf + curInd;
curInd = addSize = 0;
} else {
if (dataSize == 0) {
if (validatePayload(buffer.data(), size_))
packetReceived(buffer.data(), size_);
memcpy(sbuffer.data(), buffer.data(), allReaded);
memcpy(buffer.data(), sbuffer.data(size_), allReaded);
allReaded -= size_;
} else {
if (validatePayload(buffer.data(), dataSize))
packetReceived(buffer.data(), dataSize);
memcpy(sbuffer.data(), buffer.data(), allReaded);
memcpy(buffer.data(), sbuffer.data(packetSize_hf), allReaded);
allReaded -= packetSize_hf;
}
}
}*/
tmpbuf.append(readed, size_);
ss = src_footer.size_s() + dataSize;
while (tmpbuf.size_s() >= ss) {
while (!validateFooter(src_footer.data(), tmpbuf.data(dataSize), src_footer.size_s())) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
}
while (!validatePayload(tmpbuf.data(), dataSize)) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
}
packetReceived(tmpbuf.data(), ss);
tmpbuf.remove(0, ss);
}
break;
case PIPacketExtractor::HeaderAndFooter:
tmpbuf.append(readed, size_);
ss = src_header.size_s() + src_footer.size_s();
while (tmpbuf.size_s() >= ss) {
if (!header_found) {
if (tmpbuf.size_s() < ss) return true;
while (!validateHeader(src_header.data(), tmpbuf.data(), src_header.size_s())) {
tmpbuf.pop_front();
++missed;
if (tmpbuf.size_s() < ss) return true;
}
header_found = true;
footerInd = src_header.size_s();
} else {
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true;
while (!validateFooter(src_footer.data(), tmpbuf.data(footerInd), src_footer.size_s())) {
++footerInd;
if (tmpbuf.size_s() < footerInd + src_footer.size_s()) return true;
}
//piCout << "footer found at" << footerInd;
header_found = false;
if (!validatePayload(tmpbuf.data(src_header.size_s()), footerInd - src_header.size_s())) {
tmpbuf.pop_front();
++missed;
continue;
}
packetReceived(tmpbuf.data(), footerInd + src_footer.size_s());
tmpbuf.remove(0, footerInd + src_footer.size_s());
footerInd = src_header.size_s();
}
}
break;
case PIPacketExtractor::Size:
tmpbuf.append(readed, size_);
if (packetSize_ <= 0) {
tmpbuf.clear();
return true;
}
while (tmpbuf.size_s() >= packetSize_) {
if (!validatePayload(tmpbuf.data(), packetSize_)) {
tmpbuf.pop_front();
++missed;
missed_packets = missed / packetSize_;
continue;
}
packetReceived(tmpbuf.data(), packetSize_);
tmpbuf.remove(0, packetSize_);
}
break;
case PIPacketExtractor::Timeout:
memcpy(buffer.data(), readed, size_);
trbuf = dev->readForTime(time_);
memcpy(buffer.data(size_), trbuf.data(), trbuf.size());
if (size_ + trbuf.size() > 0)
packetReceived(buffer.data(), size_ + trbuf.size());
break;
};
return true;
}
PIString PIPacketExtractor::constructFullPathDevice() const {
return "";
}

View File

@@ -0,0 +1,177 @@
/*! \file pipacketextractor.h
* \brief Packets extractor
*/
/*
PIP - Platform Independent Primitives
Packets extractor
Copyright (C) 2018 Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIPACKETEXTRACTOR_H
#define PIPACKETEXTRACTOR_H
#include "piiodevice.h"
// Pass data, recHeaderPtr, received_data, recHeaderSize. Return true if packet is correct nor return false.
typedef bool (*PacketExtractorCheckFunc)(void * , uchar * , uchar * , int );
class PIP_EXPORT PIPacketExtractor: public PIIODevice
{
PIIODEVICE(PIPacketExtractor)
friend class PIConnection;
public:
//! Extract algorithms
enum SplitMode {
None /** No data processing */ ,
Header /** Detect packets with \a header() and following \a payloadSize() */ ,
Footer /** Detect packets with \a footer() and leading \a payloadSize() */ ,
HeaderAndFooter /** Detect packets with \a header() and \a footer() without \a payloadSize() */ ,
Size /** Detect packets with \a packetSize() */ ,
Timeout /** Wait for first read, then read for \a timeout() milliseconds */
};
//! Contructs extractor with child device "device_" and extract algorithm "mode"
explicit PIPacketExtractor(PIIODevice * device_ = 0, SplitMode mode = None);
virtual ~PIPacketExtractor() {stop();}
//! Returns child %device
PIIODevice * device() {return dev;}
//! Set child %device to "device_"
void setDevice(PIIODevice * device_);
//! Returns buffer size
int bufferSize() const {return buffer_size;}
//! Set buffer size to "new_size" bytes, should be at least greater than whole packet size
void setBufferSize(int new_size);
void setHeaderCheckSlot(PacketExtractorCheckFunc f) {ret_func_header = f;}
void setFooterCheckSlot(PacketExtractorCheckFunc f) {ret_func_footer = f;}
void setPayloadCheckSlot(ReadRetFunc f) {ret_func_ = f;}
//! Set extract algorithm
void setSplitMode(SplitMode mode) {setProperty("splitMode", int(mode)); mode_ = mode;}
//! Set payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer algorithms
void setPayloadSize(int size);
//! Set header data, used for PIPacketExtractor::Header and PIPacketExtractor::HeaderAndFooter algorithms
void setHeader(const PIByteArray & data);
//! Set footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms
void setFooter(const PIByteArray & data);
//! Set packet size, used for PIPacketExtractor::Size algorithm
void setPacketSize(int size) {setProperty("packetSize", size); packetSize_ = size;}
//! Set timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm
void setTimeout(double msecs) {setProperty("timeout", msecs); time_ = msecs;}
//! Returns current extract algorithm
SplitMode splitMode() const {return (SplitMode)(property("splitMode").toInt());}
//! Returns current payload size, used for PIPacketExtractor::Header and PIPacketExtractor::Footer algorithms
int payloadSize() const {return property("payloadSize").toInt();}
//! Returns current header data, used for PIPacketExtractor::Header and PIPacketExtractor::HeaderAndFooter algorithms
PIByteArray header() const {return src_header;}
//! Returns current footer data, used for PIPacketExtractor::Footer and PIPacketExtractor::HeaderAndFooter algorithms
PIByteArray footer() const {return src_footer;}
//! Returns current packet size, used for PIPacketExtractor::Size algorithm
int packetSize() const {return property("packetSize").toInt();}
//! Returns current timeout in milliseconds, used for PIPacketExtractor::Timeout algorithm
double timeout() const {return property("timeout").toDouble();}
//! Returns missed by validating functions bytes count
ullong missedBytes() const {return missed;}
// //! Returns missed by validating functions packets count, = missedBytes() / packetSize
ullong missedPackets() const {/*if (packetSize_hf == 0) return missed; return missed / packetSize_hf*/; return missed_packets;}
//! Returns pointer to \a missedBytes() count. Useful for output to PIConsole
const ullong * missedBytes_ptr() const {return &missed;}
// //! Returns pointer to \a missedPackets() count. Useful for output to PIConsole
const ullong * missedPackets_ptr() const {return &missed_packets;}
EVENT2(packetReceived, uchar * , data, int, size)
//! \events
//! \{
//! \fn void packetReceived(uchar * data, int size)
//! \brief Raise on successfull \a packetValidate() function
//! \}
protected:
/** \brief Function to validate header
* \param src Your header content
* \param rec Received header
* \param size Header size
* \details Default implementation returns by-byte "src" with "rec" compare result */
virtual bool validateHeader(uchar * src, uchar * rec, int size) {if (ret_func_header != 0) return ret_func_header(ret_data_, src, rec, size); for (int i = 0; i < size; ++i) if (src[i] != rec[i]) return false; return true;}
/** \brief Function to validate footer
* \param src Your footer content
* \param rec Received footer
* \param size Footer size
* \details Default implementation returns by-byte "src" with "rec" compare result */
virtual bool validateFooter(uchar * src, uchar * rec, int size) {if (ret_func_footer != 0) return ret_func_footer(ret_data_, src, rec, size); for (int i = 0; i < size; ++i) if (src[i] != rec[i]) return false; return true;}
/** \brief Function to validate payload
* \param rec Received payload
* \param size payload size
* \details Default implementation returns \b true */
virtual bool validatePayload(uchar * rec, int size) {if (ret_func_ != 0) return ret_func_(ret_data_, rec, size); return true;}
private:
void construct();
void propertyChanged(const PIString & );
int readDevice(void * read_to, int max_size) {if (dev == 0) return -1; return dev->read(read_to, max_size);}
int writeDevice(const void * data, int max_size) {if (dev == 0) return -1; return dev->write(data, max_size);}
bool threadedRead(uchar * readed, int size);
PIString fullPathPrefix() const {return PIStringAscii("pckext");}
PIString constructFullPathDevice() const;
bool openDevice() {if (dev == 0) return false; return dev->open();}
bool closeDevice() {if (dev == 0) return false; return dev->close();}
DeviceInfoFlags deviceInfoFlags() const {if (dev) return dev->infoFlags(); return 0;}
PIIODevice * dev;
PIByteArray buffer, tmpbuf, src_header, src_footer, trbuf;
PacketExtractorCheckFunc ret_func_header, ret_func_footer;
SplitMode mode_;
int buffer_size, dataSize, packetSize_hf, footerInd, packetSize_;
double time_;
bool header_found;
ullong missed, missed_packets;
};
#endif // PIPACKETEXTRACTOR_H