Files
pip/libs/main/io_utils/piconnection.cpp

1290 lines
38 KiB
C++

/*
PIP - Platform Independent Primitives
Complex I/O point
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "piconnection.h"
#include "piconfig.h"
#include "piiostream.h"
/** \class PIConnection
* \brief Complex Input/Output point
*
* \section PIConnection_synopsis Synopsis
* %PIConnection provides abstract layer over physical devices,
* filtering and connecting data streams. Each %PIConnection
* works through Device Pool, so several %PIConnections can
* read from single physical device. General scheme:
* \image html piconnection.png
*
* \section PIConnection_pool Device pool concept
* The device pool is a static object, one per application, which
* contains unique devices. Each %PIConnection works with real devices
* through the device pool. Each device has an associated read thread,
* which can be started or stopped with the %PIConnection functions
* \a startThreadedRead() and \a stopThreadedRead().
*
* \section PIConnection_filters Filters
* A %PIConnection filter is a PIPacketExtractor and an associated
* array of devices or other filters. When a read thread successfully reads
* from a device, the data can be passed to one or more filters. Each filter
* has a name, and filter names should be unique. You can use this name to
* access the PIPacketExtractor* with the function \a filter(), or get the array of
* associated devices and filters with the function \a filterBoundedDevices().
* One filter can receive data from several sources, and can be bound to
* several filters.
* \image html piconnection_filters.png
*
* \section PIConnection_diag Diagnostics
* %PIConnection creates a PIDiagnostics object for each device or filter. You can
* access these objects with the function \a diagnostic().
*
* \section PIConnection_sender Senders
* %PIConnection can send data to devices with named timers ("senders").
* You can create a sender or add a device to a sender with the function \a addSender().
* Each sender has an internal timer and on every tick executes the virtual function
* \a senderData(). The value returned by this function is sent to the bound devices.
* You can assign fixed send data to a sender with the function \a setSenderFixedData().
* In this case the sender will NOT execute \a senderData(), but will send the assigned data.
* \image html piconnection_senders.png
*
* \section PIConnection_config Configuration
* You can create a %PIConnection from a config file section or configure
* it later with the function \a configureFromConfig(). Devices are described
* by their full paths; for details see \ref PIIODevice_sec7. Example:
* \image html piconnection_conf.png
* %PIConnection can also create a PIString with its configuration using the
* function \a makeConfig(). This string can be inserted directly into the
* config file.
*
*/
// Static list of all live PIConnection objects: every constructor appends
// `this` and the destructor removes it. Returned by allConnections().
PIVector<PIConnection *> PIConnection::_connections;
// Creates an unconfigured connection named `name` and registers it in the
// static `_connections` list.
PIConnection::PIConnection(const PIString & name): PIObject(name) {
_connections << this;
}
// Creates a connection named `name_`, registers it in `_connections`, and
// configures it from section `name_` of the config file at path `config`.
// The boolean result of configureFromConfig() is not checked here.
PIConnection::PIConnection(const PIString & config, const PIString & name_): PIObject(name_) {
_connections << this;
configureFromConfig(config, name_);
}
// Creates a connection named `name_`, registers it in `_connections`, and
// configures it from section `name_` of the config text held in `*string`.
// The boolean result of configureFromString() is not checked here.
PIConnection::PIConnection(PIString * string, const PIString & name_): PIObject(name_) {
_connections << this;
configureFromString(string, name_);
}
// Tears the connection down: detaches it from every device in the device pool,
// deletes all of its filters, and removes it from the static `_connections` list.
PIConnection::~PIConnection() {
__device_pool__->unboundConnection(this);
removeAllFilters();
_connections.removeAll(this);
}
bool PIConnection::configureFromConfig(const PIString & conf_path, const PIString & name_) {
PIConfig conf(conf_path, PIIODevice::ReadOnly);
return configure(conf, name_);
}
// Wraps the config text in `*string` in a read-only PIConfig and applies its
// section `name_` to this connection. Returns whatever configure() returns.
bool PIConnection::configureFromString(PIString * string, const PIString & name_) {
	PIConfig text_conf(string, PIIODevice::ReadOnly);
	const bool configured = configure(text_conf, name_);
	return configured;
}
// Rebuilds this connection from section `name_` of an already constructed PIConfig.
//
// All existing senders, channels, filters and devices are removed first, then the
// "device", "filter", "channel" and "sender" sub-entries of the section are applied
// in that order.
//
// Returns false when `conf` is not opened, or when a name is used for both a device
// and a filter; returns true otherwise (individual add* failures do not fail the call).
bool PIConnection::configure(PIConfig & conf, const PIString & name_) {
if (!conf.isOpened()) return false;
// Drop the current state before applying the new configuration.
__device_pool__->unboundConnection(this);
removeAllSenders();
removeAllChannels();
removeAllFilters();
removeAllDevices();
setName(name_);
// An empty name only produces a warning; configuration still proceeds.
if (name_.isEmpty()) piCoutObj << "Warning, can't configure connection with empty name";
PIConfig::Entry ce(conf.getValue(name_));
// db/fb/cb/sb: children of the "device", "filter", "channel" and "sender" entries.
PIConfig::Branch db(ce.getValue("device").children()), fb(ce.getValue("filter").children()), cb(ce.getValue("channel").children()),
sb(ce.getValue("sender").children());
// dev_list and name_list are parallel lists (path, name). Both start with the
// "device" entry itself and then get one element per child, so indices match.
PIStringList dev_list(ce.getValue("device").toString());
PIStringList name_list(ce.getValue("device").name());
PIStringList flt_list(ce.getValue("filter").toString());
for (const PIConfig::Entry * e: db) {
dev_list << e->value();
name_list << e->name();
}
for (const PIConfig::Entry * e: fb) {
flt_list << e->name();
}
// Reject configurations where the same non-empty name is both a device and a filter.
PISet<PIString> chk_set = (PISet<PIString>(name_list) & PISet<PIString>(flt_list));
// piCout << name_list << flt_list << chk_set;
chk_set.remove("");
if (!chk_set.isEmpty()) {
piCoutObj << "Error," << chk_set.toVector() << "names assigned to both devices and filters!";
return false;
}
// Maps a device's config name to the path passed to addDevice(); used below to
// resolve names referenced by filters, channels and senders.
PIMap<PIString, PIString> dev_aliases;
for (int i = 0; i < dev_list.size_s(); ++i) {
PIString fn(dev_list[i]);
if (fn.isEmpty()) continue;
PIString & n(name_list[i]);
PIIODevice::DeviceMode dm = PIIODevice::ReadWrite;
// fn and dm are passed as out-pointers and may be overwritten here
// (presumably the mode is split off the path - confirm against PIIODevice).
PIIODevice::splitFullPath(fn, &fn, &dm);
// piCout << fn;
// piCoutObj << "add" << fn << n;
PIIODevice * dev = addDevice(fn, dm);
if (!dev) continue;
dev_aliases[n] = fn;
setDeviceName(dev, n);
// Note: the object name uses the original list value, not the split path.
dev->setName(name_ + ".device." + dev_list[i]);
// Optional per-device settings; the current values are used as defaults.
PIConfig::Entry de = ce.getValue("device." + n);
dev->setThreadedReadBufferSize(de.getValue("bufferSize", dev->threadedReadBufferSize()).toInt());
PIDiagnostics * diag = diags_.value(dev, nullptr);
if (diag) diag->setDisconnectTimeout(de.getValue("disconnectTimeout", diag->disconnectTimeout()).toFloat());
}
// Filters are added in repeated passes: the loop stops when a pass adds the same
// number of bindings as the previous pass, or after 100 passes. Presumably this
// lets a filter whose source is another, later-declared filter resolve on a
// subsequent pass - confirm.
int added(0), padded(-1), tries(0);
// The debug flag is switched off for the passes and restored afterwards
// (presumably to silence addFilter()'s own messages while retrying - confirm);
// failures that remain are reported once after the loop via filter_fails.
bool pdebug = debug();
setDebug(false);
PIStringList filter_fails;
while (added != padded && tries < 100) {
padded = added;
added = 0;
++tries;
for (const PIConfig::Entry * e: fb) {
// splitMode: a number in 1..5 is cast directly to the enum; otherwise the text
// is searched for the keywords "header", "footer", "time" and "size".
PIPacketExtractor::SplitMode sm = PIPacketExtractor::None;
PIString sms(e->getValue("splitMode").value());
int smi = sms.toInt();
if (smi >= 1 && smi <= 5)
sm = (PIPacketExtractor::SplitMode)smi;
else {
sms = sms.trim().toLowerCase();
if (sms.find("header") >= 0 && sms.find("footer") >= 0)
sm = PIPacketExtractor::HeaderAndFooter;
else {
if (sms.find("header") >= 0)
sm = PIPacketExtractor::Header;
else {
if (sms.find("footer") >= 0)
sm = PIPacketExtractor::Footer;
else {
if (sms.find("time") >= 0)
sm = PIPacketExtractor::Timeout;
else {
if (sms.find("size") >= 0) sm = PIPacketExtractor::Size;
}
}
}
}
}
// Sources of the filter: its own value, its "device" value, and every child of "device".
PIStringList devs(e->value());
PIConfig::Branch db(e->getValue("device").children());
devs << e->getValue("device", "").value();
for (const PIConfig::Entry * e2: db) {
devs << e2->value();
}
devs.removeStrings("");
if (devs.isEmpty()) continue;
// The first source creates (or finds) the filter; if it fails the filter is skipped in this pass.
PIString dname = dev_aliases.value(devs.front(), devs.front());
PIPacketExtractor * pe = addFilter(e->name(), dname, sm);
if (!pe) {
if (!filter_fails.contains(dname)) filter_fails << dname;
continue;
} else {
filter_fails.removeAll(dname);
}
++added;
// Remaining sources are bound to the same filter.
for (int i = 1; i < devs.size_s(); ++i) {
dname = dev_aliases.value(devs[i], devs[i]);
if (addFilter(e->name(), dname, sm)) {
filter_fails.removeAll(dname);
++added;
} else {
if (!filter_fails.contains(dname)) filter_fails << dname;
}
}
// Optional per-filter settings; the current values are used as defaults.
PIDiagnostics * diag = diags_.value(pe, nullptr);
if (diag) diag->setDisconnectTimeout(e->getValue("disconnectTimeout", diag->disconnectTimeout()).toFloat());
pe->setPayloadSize(e->getValue("payloadSize", pe->payloadSize()).toInt());
pe->setTimeout(e->getValue("timeout", pe->timeout()).toDouble());
pe->setHeader(PIByteArray::fromUserInput(e->getValue("header", "").toString()));
pe->setFooter(PIByteArray::fromUserInput(e->getValue("footer", "").toString()));
}
}
setDebug(pdebug);
// Report the sources that could not be resolved in the final pass.
for (const PIString & f: filter_fails) {
piCoutObj << "\"addFilter\" error: no such device \"" << f << "\"!";
}
// Channels: "from" -> "to", with device names translated through dev_aliases.
for (const PIConfig::Entry * e: cb) {
PIString f(e->getValue("from").value()), t(e->getValue("to").value());
addChannel(dev_aliases.value(f, f), dev_aliases.value(t, t));
}
// Senders: same source-list layout as filters, plus "frequency" and "fixedData".
for (const PIConfig::Entry * e: sb) {
PIStringList devs(e->value());
PIConfig::Branch db(e->getValue("device").children());
devs << e->getValue("device", "").value();
for (const PIConfig::Entry * e2: db) {
devs << e2->value();
}
devs.removeStrings("");
if (devs.isEmpty()) continue;
float freq = e->getValue("frequency").toFloat();
for (const PIString & d: devs) {
addSender(e->name(), dev_aliases.value(d, d), freq);
}
PIByteArray fd(PIByteArray::fromUserInput(e->getValue("fixedData").toString()));
setSenderFixedData(e->name(), fd);
}
return true;
}
// Serializes the current devices, filters, channels and senders of this
// connection into config text: a "[name]" section header, one "key = value"
// line per setting, and a closing "[]" line.
//
// Each value line ends with " #s", " #n" or " #f" (presumably PIConfig type
// tags for string / number / float - confirm against PIConfig).
PIString PIConnection::makeConfig() const {
PIString ret;
PIIOTextStream ts(&ret, PIIODevice::WriteOnly);
ts << "[" << name() << "]\n";
PIVector<PIIODevice *> devs(boundedDevices());
// dn numbers devices that have no name: the first gets "0", the next "1", ...
int dn(-1);
for (const PIIODevice * d: devs) {
PIStringList dnl(deviceNames(d));
if (dnl.isEmpty()) dnl << PIString::fromNumber(++dn);
// A device with several names is written once per name.
for (const PIString & dname: dnl) {
ts << "device." << dname << " = " << d->constructFullPath() << " #s\n";
ts << "device." << dname << ".bufferSize = " << d->threadedReadBufferSize() << " #n\n";
PIDiagnostics * diag = diags_.value(const_cast<PIIODevice *>(d), nullptr);
if (diag) ts << "device." << dname << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n";
}
}
// Filters: sources are written by device name when one exists, else by devPath().
auto ite = extractors.makeIterator();
while (ite.next()) {
if (!ite.value()) continue;
if (!ite.value()->extractor) continue;
PIString prefix = "filter." + ite.key();
for (int i = 0; i < ite.value()->devices.size_s(); ++i) {
PIString dname = device_names.key(ite.value()->devices[i]);
if (dname.isEmpty()) dname = devPath(ite.value()->devices[i]);
ts << prefix << ".device." << i << " = " << dname << " #s\n";
}
PIDiagnostics * diag = diags_.value(ite.value()->extractor, nullptr);
if (diag) ts << prefix << ".disconnectTimeout = " << diag->disconnectTimeout() << " #f\n";
ts << prefix << ".splitMode = ";
// These keywords are the ones configure() searches for when parsing "splitMode".
switch (ite.value()->extractor->splitMode()) {
case PIPacketExtractor::None: ts << "none"; break;
case PIPacketExtractor::Header: ts << "header"; break;
case PIPacketExtractor::Footer: ts << "footer"; break;
case PIPacketExtractor::HeaderAndFooter: ts << "header & footer"; break;
case PIPacketExtractor::Size: ts << "size"; break;
case PIPacketExtractor::Timeout: ts << "timeout"; break;
}
ts << " #s\n";
ts << prefix << ".payloadSize = " << ite.value()->extractor->payloadSize() << " #n\n";
ts << prefix << ".timeout = " << ite.value()->extractor->timeout() << " #f\n";
ts << prefix << ".header = " << ite.value()->extractor->header().toString() << " #s\n";
ts << prefix << ".footer = " << ite.value()->extractor->footer().toString() << " #s\n";
}
// Channels: dn is reused as a running index, one "channel.N" entry per from/to pair.
dn = 0;
auto itc = channels_.makeIterator();
while (itc.next()) {
for (const PIIODevice * d: itc.value()) {
PIString prefix = "channel." + PIString::fromNumber(dn);
++dn;
PIString dname = device_names.key(itc.key());
if (dname.isEmpty()) dname = devPath(itc.key());
ts << prefix << ".from = " << dname << " #s\n";
dname = device_names.key(const_cast<PIIODevice *>(d));
if (dname.isEmpty()) dname = devPath(d);
ts << prefix << ".to = " << dname << " #s\n";
}
}
// Senders: bound devices, frequency and fixed data (the latter two only when set).
auto its = senders.makeIterator();
while (its.next()) {
if (!its.value()) continue;
PIString prefix = "sender." + its.value()->name();
for (int i = 0; i < its.value()->devices.size_s(); ++i) {
PIString dname = device_names.key(its.value()->devices[i]);
if (dname.isEmpty()) dname = devPath(its.value()->devices[i]);
ts << prefix << ".device." << i << " = " << dname << " #s\n";
}
// int_ is stored by addSender() as 1000 / frequency, so this inverts it back.
double int_ = its.value()->int_;
if (int_ > 0.) ts << prefix << ".frequency = " << (1000. / int_) << " #f\n";
if (!its.value()->sdata.isEmpty()) ts << prefix << ".fixedData = " << its.value()->sdata.toString() << " #s\n";
}
ts << "[]\n";
return ret;
}
// Binds this connection to the device described by `full_path` through the
// device pool, requesting access mode `mode`; `start` is forwarded to the pool.
//
// On success the device is renamed to "<connection name>.device.<normalized path>",
// its mode is remembered, and a PIDiagnostics object is created for it if this
// connection does not have one yet (which also starts the pool thread via init()).
//
// Returns the device, or nullptr when the pool could not provide it.
PIIODevice * PIConnection::addDevice(const PIString & full_path, PIIODevice::DeviceMode mode, bool start) {
	const PIString norm_path = PIIODevice::normalizeFullPath(full_path);
	PIIODevice * device = __device_pool__->addDevice(this, norm_path, mode, start);
	if (!device) return nullptr;

	device->setName(name() + ".device." + norm_path);
	device_modes[device] = mode;

	// diags_ is modified under the pool lock.
	__device_pool__->lock();
	const bool has_diag = (diags_.value(device, nullptr) != nullptr);
	if (!has_diag) {
		PIDiagnostics * diag = new PIDiagnostics(false);
		diag->setInterval(100.);
		diags_[device] = diag;
		CONNECT2(void, PIDiagnostics::Quality, PIDiagnostics::Quality, diag, qualityChanged, this, diagQualityChanged);
		__device_pool__->init();
	}
	__device_pool__->unlock();
	return device;
}
// Registers `name` as an alias for `dev` in device_names. A null `dev` is ignored.
void PIConnection::setDeviceName(PIIODevice * dev, const PIString & name) {
	if (dev) {
		device_names[name] = dev;
	}
}
// Returns every alias in device_names that maps to `dev` (empty list if none).
PIStringList PIConnection::deviceNames(const PIIODevice * dev) const {
	PIStringList aliases;
	auto entry = device_names.makeIterator();
	while (entry.next()) {
		if (entry.value() != dev) continue;
		aliases << entry.key();
	}
	return aliases;
}
bool PIConnection::removeDevice(const PIString & full_path) {
PIString fp(PIIODevice::normalizeFullPath(full_path));
PIIODevice * dev = __device_pool__->device(fp);
if (!dev) return false;
PIStringList dntd(deviceNames(dev));
for (const PIString & n: dntd) {
device_names.remove(n);
}
for (auto s = senders.begin(); s != senders.end(); s++) {
if (!s.value()) continue;
s.value()->lock();
s.value()->devices.removeAll(dev);
s.value()->unlock();
}
device_modes.remove(dev);
for (auto i = extractors.begin(); i != extractors.end(); i++) {
if (!i.value()) continue;
i.value()->devices.removeAll(dev);
}
bounded_extractors.remove(dev);
channels_.remove(dev);
auto it = channels_.makeIterator();
while (it.next()) {
it.value().removeAll(dev);
}
__device_pool__->lock();
PIDiagnostics * dg = diags_.value(dev, nullptr);
if (dg) delete dg;
diags_.remove(dev);
__device_pool__->unlock();
return __device_pool__->removeDevice(this, fp);
}
void PIConnection::removeAllDevices() {
device_names.clear();
PIVector<PIIODevice *> bdevs(__device_pool__->boundedDevices(this));
__device_pool__->lock();
for (PIIODevice * d: bdevs) {
for (auto s = senders.begin(); s != senders.end(); s++) {
if (!s.value()) continue;
s.value()->lock();
s.value()->devices.removeAll(d);
s.value()->unlock();
}
channels_.remove(d);
auto it = channels_.makeIterator();
while (it.next()) {
it.value().removeAll(d);
}
PIDiagnostics * dg = diags_.value(d, nullptr);
if (dg) delete dg;
diags_.remove(d);
}
__device_pool__->unboundConnection(this);
__device_pool__->unlock();
device_modes.clear();
bounded_extractors.clear();
for (auto i = extractors.begin(); i != extractors.end(); i++) {
if (!i.value()) continue;
i.value()->devices.clear();
}
}
// Looks up the pool device with the given full path. Returns it only when it
// exists and this connection is one of its listeners; otherwise nullptr.
PIIODevice * PIConnection::deviceByFullPath(const PIString & full_path) const {
	const PIString norm_path = PIIODevice::normalizeFullPath(full_path);
	DevicePool::DeviceData * entry = __device_pool__->devices.value(norm_path, nullptr);
	if (!entry || !entry->dev) return nullptr;
	if (entry->listeners.contains(const_cast<PIConnection *>(this))) {
		return entry->dev;
	}
	return nullptr;
}
// Returns the device registered under alias `name`, or nullptr if there is none.
PIIODevice * PIConnection::deviceByName(const PIString & name) const {
	PIIODevice * found = device_names.value(name, nullptr);
	return found;
}
// Returns the pool devices this connection is a listener of.
PIVector<PIIODevice *> PIConnection::boundedDevices() const {
	PIVector<PIIODevice *> bound = __device_pool__->boundedDevices(this);
	return bound;
}
// Creates the filter `name_` (trimmed) if it does not exist yet and binds it to
// the source `full_path`, which may be a device alias, a device full path, or
// the name of another filter (a filter with that name wins over a device).
//
// `mode` is only used when the packet extractor is created. With an empty
// `full_path` nothing is changed and the existing extractor (or nullptr) is
// returned. Returns nullptr when the source cannot be resolved.
PIPacketExtractor * PIConnection::addFilter(const PIString & name_, const PIString & full_path, PIPacketExtractor::SplitMode mode) {
	const PIString filter_name = name_.trimmed();
	Extractor * target = extractors.value(filter_name, nullptr);
	if (full_path.isEmpty()) {
		if (target) return target->extractor;
		return nullptr;
	}

	// Resolve the source: device first, overridden by a filter of the same name.
	PIIODevice * source = devByString(full_path);
	Extractor * upstream = extractors.value(full_path, nullptr);
	if (upstream && upstream->extractor) source = upstream->extractor;
	if (!source) {
		piCoutObj << "\"addFilter\" error: no such device or filter \"" << full_path << "\"!";
		return nullptr;
	}

	if (!target) {
		target = new Extractor();
		extractors[filter_name] = target;
	}
	if (!target->extractor) {
		target->extractor = new PIPacketExtractor(nullptr, mode);
		target->extractor->setName(filter_name);
		// diags_ is modified under the pool lock.
		__device_pool__->lock();
		if (diags_.value(target->extractor, nullptr) == nullptr) {
			PIDiagnostics * diag = new PIDiagnostics(false);
			diag->setInterval(100.);
			diags_[target->extractor] = diag;
			CONNECT2(void, PIDiagnostics::Quality, PIDiagnostics::Quality, diag, qualityChanged, this, diagQualityChanged);
		}
		__device_pool__->unlock();
		CONNECT2(void, const uchar *, int, target->extractor, packetReceived, this, packetExtractorReceived);
	}

	// Bind source -> filter once.
	const bool already_bound = target->devices.contains(source);
	if (!already_bound) {
		bounded_extractors[source] << target->extractor;
		// if (PIString(dev->className()) == "PIPacketExtractor") dev->setThreadSafe(false);
		target->devices << source;
	}
	return target->extractor;
}
// Registers an externally created packet extractor `filter` and binds it to the
// source `full_path`, which may be a device alias, a device full path, or the
// name of an already registered filter (a filter with that name wins over a device).
//
// The filter is stored under filter->name(). If a filter with that name is
// already registered, the existing extractor is kept and returned, and `filter`
// itself is not stored.
//
// Returns the registered extractor, or nullptr when `filter` is null,
// `full_path` is empty, or `full_path` matches neither a device nor a filter.
//
// Fix: the record for the new filter used to be looked up by `full_path` (the
// source) instead of filter->name(), and any source that was not a filter was
// rejected. As a result `filter` was never registered and a plain device could
// not be used as a source. The logic now mirrors the name-based overload.
PIPacketExtractor * PIConnection::addFilter(PIPacketExtractor * filter, const PIString & full_path) {
	if (!filter || full_path.isEmpty()) return nullptr;

	// Resolve the source: device first, overridden by a filter of the same name.
	PIIODevice * dev = devByString(full_path);
	Extractor * src = extractors.value(full_path, nullptr);
	if (src && src->extractor) dev = src->extractor;
	if (!dev) {
		piCoutObj << "\"addFilter\" error: no such device or filter \"" << full_path << "\"!";
		return nullptr;
	}

	// Find or create the record of the filter being added.
	Extractor * e = extractors.value(filter->name(), nullptr);
	if (!e) {
		e = new Extractor();
		extractors[filter->name()] = e;
	}
	if (!e->extractor) {
		e->extractor = filter;
		// diags_ is modified under the pool lock.
		__device_pool__->lock();
		if (!diags_.value(e->extractor, nullptr)) {
			PIDiagnostics * d = new PIDiagnostics(false);
			d->setInterval(100.);
			diags_[e->extractor] = d;
			CONNECT2(void, PIDiagnostics::Quality, PIDiagnostics::Quality, d, qualityChanged, this, diagQualityChanged);
		}
		__device_pool__->unlock();
		CONNECT2(void, const uchar *, int, e->extractor, packetReceived, this, packetExtractorReceived);
	}

	// Bind source -> filter once.
	if (!e->devices.contains(dev)) {
		bounded_extractors[dev] << e->extractor;
		e->devices << dev;
	}
	return e->extractor;
}
bool PIConnection::removeFilter(const PIString & name_, const PIString & full_path) {
return removeFilter(name_, devByString(full_path));
}
bool PIConnection::removeFilter(const PIString & name_, const PIIODevice * dev) {
if (!dev) return false;
Extractor * p = extractors.value(name_.trimmed(), nullptr);
if (!p) return false;
bool ret = false;
for (int i = 0; i < p->devices.size_s(); ++i) {
if (p->devices[i] == dev) {
bounded_extractors[p->devices[i]].removeAll(p->extractor);
p->devices.remove(i);
--i;
ret = true;
}
}
if (p->devices.isEmpty()) {
unboundExtractor(p->extractor);
delete p;
}
return ret;
}
bool PIConnection::removeFilter(const PIString & name_) {
Extractor * p = extractors.value(name_.trimmed(), nullptr);
if (!p) return false;
unboundExtractor(p->extractor);
delete p;
return true;
}
void PIConnection::removeAllFilters() {
__device_pool__->lock();
for (auto i = extractors.begin(); i != extractors.end(); i++) {
if (!i.value()) continue;
channels_.remove(i.value()->extractor);
auto it = channels_.makeIterator();
while (it.next()) {
it.value().removeAll(i.value()->extractor);
}
if (diags_.value(i.value()->extractor)) {
delete diags_.value(i.value()->extractor);
}
diags_.remove(i.value()->extractor);
delete i.value();
}
extractors.clear();
bounded_extractors.clear();
__device_pool__->unlock();
}
// Returns the packet extractors of all filters that have one.
PIVector<PIPacketExtractor *> PIConnection::filters() const {
	PIVector<PIPacketExtractor *> result;
	for (auto flt = extractors.begin(); flt != extractors.end(); flt++) {
		if (!flt.value()) continue;
		if (!flt.value()->extractor) continue;
		result << flt.value()->extractor;
	}
	return result;
}
// Returns the names of all filters that have a packet extractor.
PIStringList PIConnection::filterNames() const {
	PIStringList names;
	for (auto flt = extractors.begin(); flt != extractors.end(); flt++) {
		const bool usable = flt.value() && flt.value()->extractor;
		if (usable) names << flt.key();
	}
	return names;
}
// Returns the packet extractor of filter `name_` (trimmed), or nullptr when
// there is no such filter or it has no extractor.
PIPacketExtractor * PIConnection::filter(const PIString & name_) const {
	// Keys are unique, so a direct lookup finds the same entry a scan would.
	Extractor * record = extractors.value(name_.trimmed(), nullptr);
	if (!record) return nullptr;
	return record->extractor;
}
// Returns the sources (devices or other filters) bound to filter `name_`
// (trimmed); an empty vector if there is no such filter.
PIVector<PIIODevice *> PIConnection::filterBoundedDevices(const PIString & name_) const {
	Extractor * record = extractors.value(name_.trimmed());
	if (record) return record->devices;
	return PIVector<PIIODevice *>();
}
bool PIConnection::addChannel(const PIString & name0, const PIString & name1) {
// piCout << "addChannel" << name0 << name1;
if (name0.isEmpty() || name1.isEmpty()) return false;
PIIODevice * dev0 = devByString(name0);
PIIODevice * dev1 = devByString(name1);
Extractor * p0 = extractors.value(name0, nullptr);
Extractor * p1 = extractors.value(name1, nullptr);
PIPacketExtractor * pe0 = nullptr;
PIPacketExtractor * pe1 = nullptr;
if (p0) pe0 = p0->extractor;
if (p1) pe1 = p1->extractor;
if (pe0) dev0 = pe0;
if (pe1) dev1 = pe1;
if (!dev0 || !dev1) {
if (!dev0) piCoutObj << "\"addChannel\" error: no such device \"" << name0 << "\"!";
if (!dev1) piCoutObj << "\"addChannel\" error: no such device \"" << name1 << "\"!";
return false;
}
if (!channels_[dev0].contains(dev1)) channels_[dev0] << dev1;
return true;
}
bool PIConnection::removeChannel(const PIString & name0, const PIString & name1) {
PIIODevice * dev0 = devByString(name0);
PIIODevice * dev1 = devByString(name1);
Extractor * p0 = extractors.value(name0, nullptr);
Extractor * p1 = extractors.value(name1, nullptr);
PIPacketExtractor * pe0 = nullptr;
PIPacketExtractor * pe1 = nullptr;
if (p0) pe0 = p0->extractor;
if (p1) pe1 = p1->extractor;
if (pe0) dev0 = pe0;
if (pe1) dev1 = pe1;
if (!dev0 || !dev1) return false;
channels_[dev0].removeAll(dev1);
return true;
}
bool PIConnection::removeChannel(const PIString & name0) {
PIIODevice * dev0 = devByString(name0);
Extractor * p0 = extractors.value(name0, nullptr);
PIPacketExtractor * pe0 = nullptr;
if (p0) pe0 = p0->extractor;
if (pe0) dev0 = pe0;
if (!dev0) return false;
channels_.remove(dev0);
auto it = channels_.makeIterator();
while (it.next()) {
it.value().removeAll(dev0);
}
return true;
}
// Removes every channel of this connection.
void PIConnection::removeAllChannels() {
channels_.clear();
}
// Returns the identifier used for `d` in generated config text: the object name
// for a PIPacketExtractor, the constructed full path for any other device, and
// an empty string for nullptr.
PIString PIConnection::devPath(const PIIODevice * d) const {
	if (!d) return PIString();
	if (strcmp(d->className(), "PIPacketExtractor") != 0) {
		return d->constructFullPath();
	}
	return d->name();
}
// Returns the "__fullPath__" property of `d` when it is set (the device pool
// sets it on devices it creates), otherwise the object name; an empty string
// for nullptr.
PIString PIConnection::devFPath(const PIIODevice * d) const {
	if (!d) return PIString();
	if (!d->isPropertyExists("__fullPath__")) {
		return d->name();
	}
	return d->property("__fullPath__").toString();
}
// Resolves `s` to a device: first as an alias, then as a full path.
// Returns nullptr for an empty string or when neither lookup succeeds.
PIIODevice * PIConnection::devByString(const PIString & s) const {
	if (s.isEmpty()) return nullptr;
	if (PIIODevice * by_alias = deviceByName(s)) return by_alias;
	return deviceByFullPath(s);
}
// Returns all channels as (from, to) pairs, with each endpoint rendered by devFPath().
PIVector<PIPair<PIString, PIString>> PIConnection::channels() const {
	PIVector<PIPair<PIString, PIString>> pairs;
	auto chan = channels_.makeIterator();
	while (chan.next()) {
		const PIString from_path = devFPath(chan.key());
		for (const PIIODevice * target: chan.value()) {
			pairs << PIPair<PIString, PIString>(from_path, devFPath(target));
		}
	}
	return pairs;
}
void PIConnection::addSender(const PIString & name_, const PIString & full_path_name, float frequency, bool start_) {
PIString fname_ = name_.trimmed();
if (full_path_name.isEmpty() || frequency <= 0.) return;
Sender * s = senders.value(fname_, nullptr);
if (!s) {
s = new Sender(this);
s->setName(fname_);
s->int_ = 1000. / frequency;
senders[fname_] = s;
}
PIIODevice * dev = devByString(full_path_name);
if (!dev) {
piCoutObj << "\"addSender\" error: no such device \"" << full_path_name << "\"!";
return;
}
if (!s->isRunning() && start_) {
// piCoutObj << name_ << "start" << 1000. / frequency;
if (!__device_pool__->fake) s->start(s->int_);
}
s->lock();
if (!s->devices.contains(dev)) s->devices << dev;
s->unlock();
}
bool PIConnection::removeSender(const PIString & name, const PIString & full_path_name) {
Sender * s = senders.value(name, nullptr);
PIIODevice * d = devByString(full_path_name);
if (!s || !d) return false;
s->lock();
bool ret = s->devices.contains(d);
if (ret) s->devices.removeAll(d);
s->unlock();
return ret;
}
bool PIConnection::removeSender(const PIString & name) {
Sender * s = senders.value(name, nullptr);
if (!s) return false;
delete s;
senders.remove(name);
return true;
}
bool PIConnection::setSenderFixedData(const PIString & name, const PIByteArray & data) {
Sender * s = senders.value(name, nullptr);
if (!s) return false;
s->lock();
s->sdata = data;
s->unlock();
return true;
}
bool PIConnection::clearSenderFixedData(const PIString & name) {
Sender * s = senders.value(name, nullptr);
if (!s) return false;
s->lock();
s->sdata.clear();
s->unlock();
return true;
}
// Returns a copy of the fixed payload of sender `name`, or an empty array if no
// such sender exists. The payload is read without taking the sender's lock.
PIByteArray PIConnection::senderFixedData(const PIString & name) const {
	Sender * sender = senders.value(name, nullptr);
	if (sender) return sender->sdata;
	return PIByteArray();
}
// Returns the frequency of sender `name`, computed as 1000 / interval().
// Returns -1 if no such sender exists and 0 if the sender's interval is 0.
//
// Fix: the interval is read once and that same value is used for both the zero
// check and the division. It used to be read a second time for the division, so
// the divisor was not the value that had been checked.
float PIConnection::senderFrequency(const PIString & name) const {
	Sender * s = senders.value(name, nullptr);
	if (!s) return -1.f;
	const double interval_ms = s->interval();
	if (interval_ms == 0.) return 0.f;
	return 1000. / interval_ms;
}
void PIConnection::removeAllSenders() {
for (auto s = senders.begin(); s != senders.end(); s++) {
if (s.value()) delete s.value();
}
senders.clear();
}
void PIConnection::startThreadedRead(const PIString & full_path_name) {
DevicePool::DeviceData * dd = __device_pool__->deviceData(devByString(full_path_name));
if (!dd) return;
if (!dd->dev) return;
if (dd->started || dd->dev->mode() == PIIODevice::WriteOnly) return;
if (!__device_pool__->fake) dd->rthread->start();
dd->started = true;
}
// Calls startThreadedRead() with the full path of every device in the pool.
void PIConnection::startAllThreadedReads() {
	auto & pool_devices = __device_pool__->devices;
	for (auto it = pool_devices.begin(); it != pool_devices.end(); it++) {
		startThreadedRead(it.key());
	}
}
void PIConnection::startSender(const PIString & name) {
Sender * s = senders.value(name, nullptr);
if (!s) return;
if (!s->isRunning() && !__device_pool__->fake) s->start(s->int_);
}
void PIConnection::startAllSenders() {
for (auto s = senders.begin(); s != senders.end(); s++) {
if (!s.value()) continue;
if (!s.value()->isRunning() && !__device_pool__->fake) {
s.value()->start(s.value()->int_);
}
}
}
void PIConnection::stopThreadedRead(const PIString & full_path_name) {
DevicePool::DeviceData * dd = __device_pool__->deviceData(devByString(full_path_name));
if (!dd) return;
if (!dd->dev) return;
if (!dd->started || dd->dev->mode() == PIIODevice::WriteOnly) return;
dd->rthread->stop();
dd->started = false;
}
// Calls stopThreadedRead() with the full path of every device in the pool.
void PIConnection::stopAllThreadedReads() {
	auto & pool_devices = __device_pool__->devices;
	for (auto it = pool_devices.begin(); it != pool_devices.end(); it++) {
		stopThreadedRead(it.key());
	}
}
void PIConnection::stopSender(const PIString & name) {
Sender * s = senders.value(name, nullptr);
if (!s) return;
if (s->isRunning()) s->stop();
}
void PIConnection::stopAllSenders() {
for (auto s = senders.begin(); s != senders.end(); s++) {
if (!s.value()) continue;
if (s.value()->isRunning()) s.value()->stop();
}
}
// Returns the diagnostics object for `full_path_name`, which may be a device
// alias, a device full path, or a filter name (a filter with that name wins over
// a device). Returns nullptr when nothing matches or no diagnostics exist.
PIDiagnostics * PIConnection::diagnostic(const PIString & full_path_name) const {
	PIIODevice * target = devByString(full_path_name);
	Extractor * flt = extractors.value(full_path_name, nullptr);
	if (flt && flt->extractor) target = flt->extractor;
	if (!target) return nullptr;
	return diags_.value(target, nullptr);
}
int PIConnection::writeByFullPath(const PIString & full_path, const PIByteArray & data) {
PIString fp = PIIODevice::normalizeFullPath(full_path);
PIIODevice * dev = __device_pool__->device(fp);
// piCout << "SEND" << full_path << fp;
if (!dev) {
piCoutObj << "No such full path \"" << full_path << "\"!";
return -1;
}
return write(dev, data);
}
int PIConnection::writeByName(const PIString & name_, const PIByteArray & data) {
PIIODevice * dev = deviceByName(name_);
if (!dev) {
piCoutObj << "No such device \"" << name_ << "\"!";
return -1;
}
return write(dev, data);
}
// Writes `data` to `dev` and, when a positive number of bytes was written,
// reports it to the device's diagnostics object (if any).
//
// Returns the value returned by the device's write(), or -1 when `dev` is null,
// not opened, or cannot write.
int PIConnection::write(PIIODevice * dev, const PIByteArray & data) {
	if (!dev) {
		piCoutObj << "Null Device!";
		return -1;
	}
	if (!dev->isOpened()) return -1;
	if (!dev->canWrite()) {
		piCoutObj << "Device \"" << dev->constructFullPath() << "\" can`t write!";
		return -1;
	}
	const int written = dev->write(data);
	if (written > 0) {
		PIDiagnostics * diag = diags_.value(dev, nullptr);
		if (diag) diag->sended(written);
	}
	return written;
}
// Returns a copy of the static list of all live PIConnection objects.
PIVector<PIConnection *> PIConnection::allConnections() {
return _connections;
}
// Returns every device currently held by the device pool, regardless of which
// connection it is bound to.
PIVector<PIIODevice *> PIConnection::allDevices() {
return __device_pool__->boundedDevices();
}
// Sets the pool-wide fake-mode flag to `yes` and returns its previous value.
bool PIConnection::setFakeMode(bool yes) {
	const bool previous = isFakeMode();
	__device_pool__->fake = yes;
	return previous;
}
// Returns the pool-wide fake-mode flag. Elsewhere in this file, threads and
// senders are not started and devices are not reopened while it is set.
bool PIConnection::isFakeMode() {
return __device_pool__->fake;
}
// Constructs the device pool thread object (not started here; init() starts it)
// with fake mode off.
// NOTE(review): the meaning of the PIThread(false, 10) arguments and of
// needLockRun(true) is defined by PIThread - confirm there.
PIConnection::DevicePool::DevicePool(): PIThread(false, 10) {
setName("PIConnection::DevicePool");
needLockRun(true);
fake = false;
}
// Empty destructor: entries of `devices` are not deleted here.
PIConnection::DevicePool::~DevicePool() {}
// Starts the pool thread with an argument of 100 if it is not running yet
// (presumably the period in milliseconds - confirm against PIThread::start()).
void PIConnection::DevicePool::init() {
	if (isRunning()) return;
	start(100);
}
// Registers `parent` as a listener of the device with normalized full path `fp`,
// creating the device if the pool does not hold it yet, and widens the device's
// open mode to include `mode`.
//
// When the read bit is newly requested, a read thread is (re)created, and it is
// started if `start` is true. In fake mode the device is not opened and threads
// are not started; only the mode and the `started` flag are updated.
//
// Returns the device, or nullptr when it could not be created.
PIIODevice * PIConnection::DevicePool::addDevice(PIConnection * parent, const PIString & fp, PIIODevice::DeviceMode mode, bool start) {
DeviceData * dd = devices[fp];
// pmode: the mode the device already has; stays 0 for a device created just below.
int pmode = 0;
bool need_start = false;
if (!dd) {
dd = new DeviceData();
devices[fp] = dd;
}
if (!dd->dev) {
// piCout << "new device" << fp;
dd->dev = PIIODevice::createFromFullPath(fp);
if (!dd->dev) {
piCoutObj << "Error: can`t create device \"" << fp << "\"!"; //:" << errorString();
// NOTE(review): the DeviceData stays in `devices` with a null dev on this path.
return nullptr;
}
// Remembered so devFPath() and deviceReaded() can report the path later.
dd->dev->setProperty("__fullPath__", fp);
} else {
pmode = dd->dev->mode();
}
if (!dd->listeners.contains(parent)) dd->listeners << parent;
// Nothing to change if the device already has the requested mode or is ReadWrite.
if (pmode == mode || pmode == PIIODevice::ReadWrite) return dd->dev;
if ((mode & PIIODevice::ReadOnly) > 0) {
// Read access requested: replace any existing read thread with a fresh one.
if (dd->rthread) {
delete dd->rthread;
dd->rthread = nullptr;
dd->started = false;
}
dd->rthread = new PIThread(dd, __DevicePool_threadReadDP);
dd->rthread->setName("__S__connection_" + fp + "_read_thread");
need_start = true;
pmode |= PIIODevice::ReadOnly;
}
if ((mode & PIIODevice::WriteOnly) > 0) pmode |= PIIODevice::WriteOnly;
// Apply the combined mode: reopen the real device, or just record it in fake mode.
if (!fake) {
dd->dev->close();
dd->dev->open((PIIODevice::DeviceMode)pmode);
} else {
dd->dev->setMode((PIIODevice::DeviceMode)pmode);
}
if (need_start && start) {
if (!fake) dd->rthread->start();
dd->started = true;
}
return dd->dev;
}
// Removes `parent` from the listeners of the device with path `fp`; when no
// listeners remain, the device data is deleted and erased from the pool.
//
// Returns true if `parent` was a listener; false otherwise, including when the
// pool has no such device.
bool PIConnection::DevicePool::removeDevice(PIConnection * parent, const PIString & fp) {
	DeviceData * entry = devices.value(fp, nullptr);
	if (!entry || !entry->dev) return false;

	const bool was_listener = entry->listeners.contains(parent);
	entry->listeners.removeAll(parent);
	if (entry->listeners.isEmpty()) {
		delete entry;
		devices.remove(fp);
	}
	return was_listener;
}
// Removes `parent` from the listeners of every pool device, then deletes and
// erases each device that is left without listeners.
//
// Keys are collected first and erased in a second pass, so the map is not
// modified while it is being iterated. Null entries are collected too, but the
// second pass skips them without erasing.
void PIConnection::DevicePool::unboundConnection(PIConnection * parent) {
	PIStringList orphaned;
	for (auto it = devices.begin(); it != devices.end(); it++) {
		DeviceData * entry = it.value();
		if (entry) {
			entry->listeners.removeAll(parent);
			if (!entry->listeners.isEmpty()) continue;
		}
		orphaned << it.key();
	}
	for (const PIString & path: orphaned) {
		DeviceData * entry = devices.value(path, nullptr);
		if (entry) {
			delete entry;
			devices.remove(path);
		}
	}
}
// Returns the device stored under full path `fp`, or nullptr if the pool has no
// entry for it.
PIIODevice * PIConnection::DevicePool::device(const PIString & fp) const {
	DeviceData * entry = devices.value(fp, nullptr);
	if (entry) return entry->dev;
	return nullptr;
}
// Finds the pool entry whose device is `d`. Returns nullptr when `d` is null or
// not held by the pool.
PIConnection::DevicePool::DeviceData * PIConnection::DevicePool::deviceData(PIIODevice * d) const {
	if (!d) return nullptr;
	auto it = devices.makeIterator();
	while (it.next()) {
		DeviceData * entry = it.value();
		if (entry && entry->dev == d) return entry;
	}
	return nullptr;
}
// Returns every connection that listens to at least one pool device, without
// duplicates and in order of first appearance.
PIVector<PIConnection *> PIConnection::DevicePool::boundedConnections() const {
	PIVector<PIConnection *> result;
	auto it = devices.makeIterator();
	while (it.next()) {
		if (it.value()) result << it.value()->listeners;
	}
	// Drop later duplicates of each element.
	for (int keep = 0; keep < result.size_s(); ++keep) {
		int probe = keep + 1;
		while (probe < result.size_s()) {
			if (result[keep] == result[probe]) {
				result.remove(probe);
			} else {
				++probe;
			}
		}
	}
	return result;
}
// Returns every non-null device held by the pool.
PIVector<PIIODevice *> PIConnection::DevicePool::boundedDevices() const {
	PIVector<PIIODevice *> result;
	auto it = devices.makeIterator();
	while (it.next()) {
		DeviceData * entry = it.value();
		if (entry && entry->dev) result << entry->dev;
	}
	return result;
}
// Returns every non-null pool device that has `parent` among its listeners.
PIVector<PIIODevice *> PIConnection::DevicePool::boundedDevices(const PIConnection * parent) const {
	PIVector<PIIODevice *> result;
	auto it = devices.makeIterator();
	while (it.next()) {
		DeviceData * entry = it.value();
		if (!entry || !entry->dev) continue;
		const bool listens = entry->listeners.contains(const_cast<PIConnection *>(parent));
		if (listens) result << entry->dev;
	}
	return result;
}
//! \brief Shut down the read thread and release the device owned by this record
//! \details The thread is stopped and deleted first, and only then the device
//! is closed and deleted.
PIConnection::DevicePool::DeviceData::~DeviceData() {
if (rthread) {
rthread->stop();
// Presumably wakes the thread up if it is blocked inside a device call - confirm against PIIODevice::interrupt()
if (dev) dev->interrupt();
// Wait for the thread to finish (timeout 1000, presumably milliseconds); terminate it if it does not
if (!rthread->waitForFinish(1000)) rthread->terminate();
delete rthread;
rthread = nullptr;
}
if (dev) {
dev->close();
delete dev;
dev = nullptr;
}
}
void PIConnection::DevicePool::run() {
PIVector<PIConnection *> conns(PIConnection::allConnections());
for (PIConnection * c: conns) {
for (auto d = c->diags_.begin(); d != c->diags_.end(); d++) {
if (!d.value()) continue;
d.value()->tick(0, 1);
}
}
}
//! \brief Read-thread function of a pooled device
//! \details Opens the device if it is closed, reads one chunk of data and hands it
//! to the device itself (threadedRead() and threadedReadEvent()) and to the device pool,
//! which forwards it to the listener connections. Every early return relies on the
//! function being invoked again by the thread (presumably in a loop - confirm where
//! the thread is created).
//! \param ddp pointer to the PIConnection::DevicePool::DeviceData of the device to read
void __DevicePool_threadReadDP(void * ddp) {
	PIConnection::DevicePool::DeviceData * dd((PIConnection::DevicePool::DeviceData *)ddp);
	PIIODevice * dev = dd ? dd->dev : nullptr;
	if (!dev) {
		// Nothing to read from yet, do not spin
		piMSleep(100);
		return;
	}
	if (dev->isClosed()) {
		if (!dev->open()) {
			// Wait for the reopen timeout in short slices so a stop request is noticed quickly
			PITimeMeasurer tm;
			int timeout = dev->reopenTimeout();
			while (tm.elapsed_m() < timeout) {
				if (dd->rthread->isStopping()) return;
				piMSleep(50);
			}
			// With a non-positive timeout the loop above never sleeps, avoid a busy loop on open()
			if (timeout <= 0) piMSleep(10);
			// The device is still closed: do not read from it, retry open() on the next call
			return;
		}
	}
	PIByteArray ba = dev->read(dev->threadedReadBufferSize());
	if (ba.isEmpty()) {
		piMSleep(10);
		return;
	}
	dev->threadedRead(ba.data(), ba.size_s());
	dev->threadedReadEvent(ba.data(), ba.size_s());
	// piCout << "Readed from" << dd->dev->path() << Hex << ba;
	__device_pool__->deviceReaded(dd, ba);
}
//! \brief Deliver \a data read from the device of record \a dd to all its listeners
//! \details The sender name passed to the listeners is the device property "__fullPath__".
void PIConnection::DevicePool::deviceReaded(PIConnection::DevicePool::DeviceData * dd, const PIByteArray & data) {
	const PIString full_path = dd->dev->property("__fullPath__").toString();
	for (PIConnection * listener: dd->listeners) {
		listener->rawReceived(dd->dev, full_path, data);
	}
}
//! \brief Handle raw \a data read from device \a dev
//! \details Notifies via dataReceived() and dataReceivedEvent(), feeds the data to the
//! packet extractors bound to \a dev, writes it to the devices connected to \a dev
//! by channels and updates the diagnostics of the devices involved.
//! \param dev device the data was read from
//! \param from name reported as the data source
//! \param data bytes that were read
void PIConnection::rawReceived(PIIODevice * dev, const PIString & from, const PIByteArray & data) {
	dataReceived(from, data);
	dataReceivedEvent(from, data);
	// Work on copies of the containers, as the original lookup returns them by value
	PIVector<PIPacketExtractor *> bound(bounded_extractors.value(dev));
	for (PIPacketExtractor * pe: bound) {
		pe->threadedRead(data.data(), data.size_s());
	}
	PIVector<PIIODevice *> targets(channels_.value(dev));
	for (PIIODevice * target: targets) {
		const int written = target->write(data);
		if (written <= 0) continue;
		// Only successful writes are reported to the diagnostics of the target
		PIDiagnostics * tx_diag = diags_.value(target, nullptr);
		if (tx_diag) tx_diag->sended(written);
	}
	PIDiagnostics * rx_diag = diags_.value(dev, nullptr);
	if (rx_diag) rx_diag->received(data.size_s());
}
//! \brief Data to be sent by the sender named \a sender_name
//! \details This implementation ignores \a sender_name and always returns an empty array
//! (presumably a hook meant to be overridden - confirm in the class declaration).
PIByteArray PIConnection::senderData(const PIString & sender_name) {
	PIByteArray nothing;
	return nothing;
}
//! \brief Destroy the owned packet extractor
PIConnection::Extractor::~Extractor() {
	// Deleting a null pointer is a no-op, so no explicit check is required
	delete extractor;
	extractor = nullptr;
}
//! \brief Create a sender that belongs to connection \a parent_
//! \details Gives the object its fixed name and enables needLockRun.
PIConnection::Sender::Sender(PIConnection * parent_)
	: parent(parent_)
	, int_(0.f) {
	setName("__S__.PIConnection.Sender");
	needLockRun(true);
}
void PIConnection::Sender::tick(void *, int) {
if (!parent) return;
PIByteArray data;
if (!sdata.isEmpty())
data = sdata;
else
data = parent->senderData(name());
if (data.isEmpty()) return;
// piCoutObj << "write"<<data.size()<<"bytes to"<<devices.size()<<"devices";
for (PIIODevice * d: devices) {
int ret = d->write(data);
PIDiagnostics * diag = parent->diags_.value(d, nullptr);
if (diag && ret > 0) diag->sended(ret);
}
}
//! \brief Remove every reference to packet extractor \a pe from this connection
//! \details Erases \a pe from the channels (both as a source and as a target), from the
//! bounded extractors (both as a key and as a bound extractor), deletes its diagnostics
//! and removes it from the extractors by name. The last part runs under the device pool lock.
void PIConnection::unboundExtractor(PIPacketExtractor * pe) {
	if (!pe) return;
	// Channels: drop the ones starting at pe, then pe as a target of the remaining ones
	channels_.remove(pe);
	auto ch = channels_.makeIterator();
	while (ch.next()) {
		ch.value().removeAll(pe);
	}
	// Bounded extractors: same idea, entries that become empty are erased
	bounded_extractors.remove(pe);
	for (PIIODevice * src: bounded_extractors.keys()) {
		PIVector<PIPacketExtractor *> & bound = bounded_extractors[src];
		bound.removeAll(pe);
		if (bound.isEmpty()) bounded_extractors.remove(src);
	}
	__device_pool__->lock();
	// Deleting a null pointer is a no-op, so a missing diagnostics entry is fine
	delete diags_.value(pe, nullptr);
	diags_.remove(pe);
	extractors.remove(pe->name());
	__device_pool__->unlock();
}
void PIConnection::packetExtractorReceived(const uchar * data, int size) {
PIString from(emitter() ? emitter()->name() : PIString());
PIIODevice * cd = (PIIODevice *)emitter();
// piCout << "packetExtractorReceived" << from << cd;
if (cd) {
PIVector<PIPacketExtractor *> be(bounded_extractors.value(cd));
// piCout << be << (void*)data << size;
for (PIPacketExtractor * i: be) {
i->threadedRead(data, size);
}
PIVector<PIIODevice *> chd(channels_.value(cd));
for (PIIODevice * d: chd) {
int ret = d->write(data, size);
PIDiagnostics * diag = diags_.value(d);
if (diag) diag->sended(ret);
}
PIDiagnostics * diag = diags_.value(cd);
if (diag) diag->received(size);
}
packetReceived(from, PIByteArray(data, size));
packetReceivedEvent(from, PIByteArray(data, size));
}
//! \brief Forward a quality change of a diagnostics object as qualityChanged()
//! \details The emitting diagnostics object is mapped back to its key in diags_,
//! which is passed as the first argument of qualityChanged().
void PIConnection::diagQualityChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
	PIDiagnostics * source = (PIDiagnostics *)emitter();
	qualityChanged(diags_.key(source), new_quality, old_quality);
}
// Device pool instance used throughout this file; assigned in the __DevicePoolContainer__ constructor.
PIConnection::DevicePool * __device_pool__;
// Set to true by the first __DevicePoolContainer__ constructed, so that later constructions do not create another pool.
bool __DevicePoolContainer__::inited_(false);
//! \brief Create the device pool on first construction
//! \details Only the first constructed container allocates the pool; every later
//! construction leaves the existing pool untouched.
__DevicePoolContainer__::__DevicePoolContainer__() {
	if (!inited_) {
		inited_ = true;
		__device_pool__ = new PIConnection::DevicePool();
	}
}