From 4584d9c6390791f5ea16d376005fafeeb542e4cd Mon Sep 17 00:00:00 2001 From: peri4 Date: Wed, 7 Apr 2021 22:13:56 +0300 Subject: [PATCH] PIObject::deleted now has 1 argument PIIODevice small refactoring new PIIODevice virtual methods: threadedReadTerminated() and threadedWriteTerminated() PIIODevice::stop now accept bool "hard" instead of "wait" PIStreamPacker new features: packet size crypt and aggressive optimization --- CMakeLists.txt | 4 +- libs/cloud/picloudserver.cpp | 37 ++++++++++- libs/io_utils/pistreampacker.cpp | 49 ++++++++++++-- libs/main/cloud/picloudserver.h | 6 +- libs/main/core/piobject.cpp | 2 +- libs/main/core/piobject.h | 6 +- libs/main/io_devices/pibinarylog.cpp | 4 +- libs/main/io_devices/pibinarylog.h | 1 + libs/main/io_devices/piethernet.cpp | 4 +- libs/main/io_devices/piethernet.h | 2 +- libs/main/io_devices/piiodevice.cpp | 96 +++++++++++++++++++++++++--- libs/main/io_devices/piiodevice.h | 36 ++++++----- libs/main/io_utils/pistreampacker.h | 22 +++++-- 13 files changed, 219 insertions(+), 50 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 499999bb..0b6751dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,9 +2,9 @@ cmake_minimum_required(VERSION 3.0) cmake_policy(SET CMP0017 NEW) # need include() with .cmake project(pip) set(pip_MAJOR 2) -set(pip_MINOR 20) +set(pip_MINOR 21) set(pip_REVISION 0) -set(pip_SUFFIX ) +set(pip_SUFFIX alpha) set(pip_COMPANY SHS) set(pip_DOMAIN org.SHS) diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index e28d4e5f..6547e84e 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -47,6 +47,12 @@ void PICloudServer::setServerName(const PIString & server_name) { } +PIVector PICloudServer::clients() const { + PIMutexLocker _ml(clients_mutex); + return clients_; +} + + bool PICloudServer::openDevice() { piCout << "PICloudServer open device" << path(); bool op = eth.connect(path(), false); @@ -61,10 +67,12 @@ bool PICloudServer::openDevice() { bool PICloudServer::closeDevice() { eth.stop(); - for (auto c : clients) { + clients_mutex.lock(); + for (auto c : clients_) { c->stop(); c->close(); } + clients_mutex.unlock(); eth.close(); return true; } @@ -103,6 +111,7 @@ PICloudServer::Client::~Client() { is_connected = false; cond_buff.notifyOne(); } + stop(); close(); } @@ -156,32 +165,56 @@ void PICloudServer::_readed(PIByteArray & ba) { switch (hdr.first) { case PICloud::TCP::Connect: { uint id = tcp.parseConnect(ba); + clients_mutex.lock(); Client * oc = index_clients.value(id, nullptr); if (oc) { + clients_mutex.unlock(); tcp.sendDisconnected(id); } else { piCoutObj << "new Client" << id; Client * c = new Client(this, id); - clients << c; + CONNECTU(c, deleted, this, clientDeleted) + clients_ << c; index_clients.insert(id, c); + clients_mutex.unlock(); newConnection(c); } } break; case PICloud::TCP::Disconnect: { uint id = tcp.parseDisconnect(ba); + clients_mutex.lock(); Client * oc = index_clients.value(id, nullptr); if (oc) { oc->is_connected = false; oc->close(); } + clients_mutex.unlock(); } break; case PICloud::TCP::Data: { PIPair d = tcp.parseDataServer(ba); + clients_mutex.lock(); Client * oc = index_clients.value(d.first, nullptr); if (oc && !d.second.isEmpty()) oc->pushBuffer(d.second); + clients_mutex.unlock(); } break; default: break; } } } + + +void PICloudServer::clientDeleted(PIObject * o) { + PICloudServer::Client * c = (PICloudServer::Client*)o; + clients_mutex.lock(); + clients_.removeOne(c); + auto it = index_clients.makeIterator(); + while (it.hasNext()) { + it.next(); + if (it.value() == c) { + index_clients.remove(it.key()); + break; + } + } + clients_mutex.unlock(); +} diff --git a/libs/io_utils/pistreampacker.cpp b/libs/io_utils/pistreampacker.cpp index 6d0f4ab9..5e8ef86e 100644 --- a/libs/io_utils/pistreampacker.cpp +++ b/libs/io_utils/pistreampacker.cpp @@ -47,8 +47,10 @@ PIStreamPacker::PIStreamPacker(PIIODevice * dev): PIObject() { - crypt_frag = false; + crypt_frag = crypt_size = false; + aggressive_optimization = true; packet_size = -1; + size_crypted_size = sizeof(int); crypt_frag_size = 1024*1024; max_packet_size = 1400; packet_sign = 0xAFBE; @@ -56,6 +58,16 @@ PIStreamPacker::PIStreamPacker(PIIODevice * dev): PIObject() { } +void PIStreamPacker::setCryptSizeEnabled(bool on) { + crypt_size = on; + if (crypt_size) { + PIByteArray ba; ba << int(0); + size_crypted_size = cryptData(ba).size_s(); + } else + size_crypted_size = sizeof(int); +} + + void PIStreamPacker::send(const PIByteArray & data) { if (data.isEmpty()) return; PIByteArray cd; @@ -74,7 +86,12 @@ void PIStreamPacker::send(const PIByteArray & data) { } //piCout << "crypt" << data.size() << "->" << cd.size() << key().size(); PIByteArray hdr, part; - hdr << packet_sign << int(cd.size_s()); + hdr << packet_sign; + if (crypt_size) { + PIByteArray crsz; crsz << int(cd.size_s()); + hdr.append(cryptData(crsz)); + } else + hdr << int(cd.size_s()); cd.insert(0, hdr); int pcnt = (cd.size_s() - 1) / max_packet_size + 1, pst = 0; if (pcnt > 1) { @@ -114,21 +131,37 @@ void PIStreamPacker::received(uchar * readed, int size) { void PIStreamPacker::received(const PIByteArray & data) { stream.append(data); //piCout << "rec" << data.size(); - while (stream.size_s() >= 6) { + while (!stream.isEmpty()) { + int hdr_size = sizeof(packet_sign) + size_crypted_size; if (packet_size < 0) { + if (stream.size_s() < hdr_size) return; ushort sign(0); memcpy(&sign, stream.data(), 2); if (sign != packet_sign) { - stream.pop_front(); + if (aggressive_optimization) stream.clear(); + else stream.pop_front(); continue; } int sz = -1; - memcpy(&sz, stream.data(2), 4); + if (crypt_size) { + PIByteArray crsz((uint)size_crypted_size); + memcpy(crsz.data(), stream.data(2), size_crypted_size); + crsz = decryptData(crsz); + if (crsz.size() < sizeof(sz)) { + if (aggressive_optimization) stream.clear(); + else stream.pop_front(); + continue; + } + crsz >> sz; + } else { + memcpy(&sz, stream.data(2), size_crypted_size); + } if (sz < 0) { - stream.pop_front(); + if (aggressive_optimization) stream.clear(); + else stream.pop_front(); continue; } - stream.remove(0, 6); + stream.remove(0, hdr_size); packet.clear(); packet_size = sz; if (packet_size == 0) @@ -189,6 +222,8 @@ void PIStreamPacker::received(const PIByteArray & data) { void PIStreamPacker::assignDevice(PIIODevice * dev) { if (!dev) return; + if (!dev->infoFlags()[PIIODevice::Reliable]) + piCoutObj << "Warning! Not recommended to use with non-reliable" << dev; CONNECTU(dev, threadedReadEvent, this, received); CONNECTU(this, sendRequest, dev, write); } diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index e363b288..79409c09 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -59,6 +59,8 @@ public: void setServerName(const PIString & server_name); + PIVector clients() const;; + EVENT1(newConnection, PICloudServer::Client * , client) protected: @@ -69,11 +71,13 @@ protected: private: EVENT_HANDLER1(void, _readed, PIByteArray &, ba); + EVENT_HANDLER1(void, clientDeleted, PIObject *, o); void clientDisconnect(uint client_id); int sendData(const PIByteArray & data, uint client_id); - PIVector clients; + PIVector clients_; PIMap index_clients; + mutable PIMutex clients_mutex; }; #endif // PICLOUDSERVER_H diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 06fe88d9..9ce40b4a 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -405,7 +405,7 @@ void PIObject::piDisconnect(PIObject * src, const PIString & sig) { void PIObject::piDisconnect(PIObject * src) { - src->deleted(); + src->deleted(src); PIMutexLocker _ml(src->mutex_connect); PIVector cv = src->connectors.toVector(); piForeach (PIObject * o, cv) { diff --git a/libs/main/core/piobject.h b/libs/main/core/piobject.h index 939ebb0e..aaf7851f 100644 --- a/libs/main/core/piobject.h +++ b/libs/main/core/piobject.h @@ -414,14 +414,14 @@ protected: //! Virtual function executes after property with name "name" has been changed virtual void propertyChanged(const PIString & name) {} - EVENT(deleted) + EVENT1(deleted, PIObject *, o) //! \events //! \{ - /** \fn void deleted() + /** \fn void deleted(PIObject * o) * \brief Raise before object delete - * \note This event raised from destructor, so use only emitter() value, + * \note This event raised from destructor, so use only "o" value, * don`t try to cast deleted object to some subclass! */ //! \} diff --git a/libs/main/io_devices/pibinarylog.cpp b/libs/main/io_devices/pibinarylog.cpp index d346e32c..07552d2e 100644 --- a/libs/main/io_devices/pibinarylog.cpp +++ b/libs/main/io_devices/pibinarylog.cpp @@ -151,6 +151,8 @@ bool PIBinaryLog::openDevice() { bool PIBinaryLog::closeDevice() { + stopThreadedRead(); + pausemutex.unlock(); moveIndex(-1); is_indexed = false; index.clear(); @@ -177,8 +179,8 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) { case PlayRealTime: pausemutex.lock(); if (is_pause) { - piMSleep(100); pausemutex.unlock(); + piMSleep(100); return false; } else if (pause_time > PISystemTime()) { startlogtime += pause_time; diff --git a/libs/main/io_devices/pibinarylog.h b/libs/main/io_devices/pibinarylog.h index fa1252e8..dc73e675 100644 --- a/libs/main/io_devices/pibinarylog.h +++ b/libs/main/io_devices/pibinarylog.h @@ -289,6 +289,7 @@ protected: bool closeDevice(); void propertyChanged(const PIString &); bool threadedRead(uchar *readed, int size); + void threadedReadTerminated() {pausemutex.unlock();} DeviceInfoFlags deviceInfoFlags() const {return PIIODevice::Reliable;} private: diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 98544bf1..1de2c930 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -806,9 +806,9 @@ PIIODevice::DeviceInfoFlags PIEthernet::deviceInfoFlags() const { } -void PIEthernet::clientDeleted() { +void PIEthernet::clientDeleted(PIObject * o) { clients_mutex.lock(); - clients_.removeOne((PIEthernet*)emitter()); + clients_.removeOne((PIEthernet*)o); clients_mutex.unlock(); } diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index 266134a0..4f80e90f 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -494,7 +494,7 @@ protected: PIStringList mcast_groups; private: - EVENT_HANDLER(void, clientDeleted); + EVENT_HANDLER1(void, clientDeleted, PIObject *, o); static void server_func(void * eth); void setType(Type t, bool reopen = true) {setProperty(PIStringAscii("type"), (int)t); if (reopen && isOpened()) {closeDevice(); init(); openDevice();}} diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 9dea9dd6..e62e0dad 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -154,21 +154,62 @@ bool PIIODevice::setOption(PIIODevice::DeviceOption o, bool yes) { } -void PIIODevice::stopThreadedRead() { +bool stopThread(PIThread * t, bool hard) { #ifdef FREERTOS - PIThread::stop(true); + t->stop(true); #else - PIThread::terminate(); + if (hard) { + t->terminate(); + return true; + } else { + t->stop(); + if (t->waitForFinish(10000)) { + t->terminate(); + return true; + } + } #endif + return false; } -void PIIODevice::stopThreadedWrite() { -#ifdef FREERTOS - write_thread.stop(true); -#else - write_thread.terminate(); -#endif +void PIIODevice::stopThreadedRead(bool hard) { + if (stopThread(this, hard)) + threadedReadTerminated(); +} + + +void PIIODevice::stopThreadedWrite(bool hard) { + if (stopThread(&write_thread, hard)) + threadedWriteTerminated(); +} + + +void PIIODevice::clearThreadedWriteQueue() { + write_thread.lock(); + write_queue.clear(); + write_thread.unlock(); +} + + +void PIIODevice::start() { + startThreadedRead(); + startThreadedWrite(); +} + + +void PIIODevice::stop(bool hard) { + stopThreadedRead(hard); + stopThreadedWrite(hard); +} + + +PIByteArray PIIODevice::read(int max_size) { + buffer_in.resize(max_size); + int ret = readDevice(buffer_in.data(), max_size); + if (ret < 0) + return PIByteArray(); + return buffer_in.resized(ret); } @@ -183,7 +224,7 @@ void PIIODevice::_init() { setReopenTimeout(1000); #ifdef FREERTOS threaded_read_buffer_size = 512; -// setThreadedReadBufferSize(512); + //setThreadedReadBufferSize(512); #else threaded_read_buffer_size = 4096; #endif @@ -314,6 +355,41 @@ ullong PIIODevice::writeThreaded(const PIByteArray & data) { } +bool PIIODevice::open() { + if (!init_) init(); + buffer_tr.resize(threaded_read_buffer_size); + opened_ = openDevice(); + if (opened_) opened(); + return opened_; +} + + +bool PIIODevice::open(const PIString & _path) { + setPath(_path); + return open(); +} + + +bool PIIODevice::open(DeviceMode _mode) { + mode_ = _mode; + return open(); +} + + +bool PIIODevice::open(const PIString & _path, DeviceMode _mode) { + setPath(_path); + mode_ = _mode; + return open(); +} + + +bool PIIODevice::close() { + opened_ = !closeDevice(); + if (!opened_) closed(); + return !opened_; +} + + bool PIIODevice::configure(const PIString & config_file, const PIString & section, bool parent_section) { PIConfig conf(config_file, PIIODevice::ReadOnly); if (!conf.isOpened()) return false; diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index a9d1d227..3af7b587 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -175,10 +175,10 @@ public: void startThreadedRead() {if (!isRunning()) PIThread::start();} //! Start threaded read and assign "threaded read slot" to "func" - void startThreadedRead(ReadRetFunc func) {ret_func_ = func; if (!isRunning()) PIThread::start();} + void startThreadedRead(ReadRetFunc func) {ret_func_ = func; startThreadedRead();} - //! Stop threaded read - void stopThreadedRead(); + //! Stop threaded read. Hard stop terminate thread, otherwise wait fo 10 seconds + void stopThreadedRead(bool hard = true); //! Return \b true if threaded write is started @@ -187,25 +187,25 @@ public: //! Start threaded write void startThreadedWrite() {if (!write_thread.isRunning()) write_thread.startOnce();} - //! Stop threaded write - void stopThreadedWrite(); + //! Stop threaded write. Hard stop terminate thread, otherwise wait fo 10 seconds + void stopThreadedWrite(bool hard = true); //! Clear threaded write task queue - void clearThreadedWriteQueue() {write_thread.lock(); write_queue.clear(); write_thread.unlock();} + void clearThreadedWriteQueue(); //! Start both threaded read and threaded write - void start() {startThreadedRead(); startThreadedWrite();} + void start(); - //! Stop both threaded read and threaded write and if "wait" block until both threads are stop - void stop(bool wait = false) {stopThreadedRead(); stopThreadedWrite(); if (wait) while (write_thread.isRunning() || isRunning()) msleep(PIP_MIN_MSLEEP);} + //! Stop both threaded read and threaded write. Hard stop terminate threads, otherwise wait fo 10 seconds + void stop(bool hard = true); //! Read from device maximum "max_size" bytes to "read_to" int read(void * read_to, int max_size) {return readDevice(read_to, max_size);} //! Read from device maximum "max_size" bytes and return them as PIByteArray - PIByteArray read(int max_size) {buffer_in.resize(max_size); int ret = readDevice(buffer_in.data(), max_size); if (ret < 0) return PIByteArray(); return buffer_in.resized(ret);} + PIByteArray read(int max_size); //! Write maximum "max_size" bytes of "data" to device int write(const void * data, int max_size) {return writeDevice(data, max_size);} @@ -258,11 +258,11 @@ public: static PIStringList availablePrefixes(); - EVENT_HANDLER(bool, open) {if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;} - EVENT_HANDLER1(bool, open, const PIString &, _path) {setPath(_path); if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;} - bool open(DeviceMode _mode) {mode_ = _mode; if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;} - EVENT_HANDLER2(bool, open, const PIString &, _path, DeviceMode, _mode) {setPath(_path); mode_ = _mode; if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;} - EVENT_HANDLER(bool, close) {opened_ = !closeDevice(); if (!opened_) closed(); return !opened_;} + EVENT_HANDLER(bool, open); + EVENT_HANDLER1(bool, open, const PIString &, _path); + bool open(DeviceMode _mode); + EVENT_HANDLER2(bool, open, const PIString &, _path, DeviceMode, _mode); + EVENT_HANDLER(bool, close); EVENT_HANDLER1(int, write, PIByteArray, data) {return writeDevice(data.data(), data.size_s());} EVENT_VHANDLER(void, flush) {;} @@ -377,6 +377,12 @@ protected: //! Reimplement to apply new \a threadedReadBufferSize() virtual void threadedReadBufferSizeChanged() {;} + //! Invoked after hard read thread stop + virtual void threadedReadTerminated() {;} + + //! Invoked after hard write thread stop + virtual void threadedWriteTerminated() {;} + static PIIODevice * newDeviceByPrefix(const PIString & prefix); void terminate(); diff --git a/libs/main/io_utils/pistreampacker.h b/libs/main/io_utils/pistreampacker.h index 9b6d2fbd..ac879366 100644 --- a/libs/main/io_utils/pistreampacker.h +++ b/libs/main/io_utils/pistreampacker.h @@ -57,26 +57,38 @@ public: void setMaxPacketSize(int max_size) {max_packet_size = max_size;} //! Returns maximum size of single packet, default 1400 bytes - int maxPacketSize() {return max_packet_size;} + int maxPacketSize() const {return max_packet_size;} //! Set packet sinature void setPacketSign(ushort sign_) {packet_sign = sign_;} //! Returns packet sinature, default 0xAFBE - ushort packetSign() {return packet_sign;} + ushort packetSign() const {return packet_sign;} + + + //! Set receive aggressive optimization. If yes then %PIStreamPacker doesn`t + //! check every byte in incoming stream but check only begin of each read() + //! result. Default is \b true. + void setaAggressiveOptimization(bool yes) {aggressive_optimization = yes;} + + //! Returns aggressive optimization + bool aggressiveOptimization() const {return aggressive_optimization;} bool cryptFragmentationEnabled() const {return crypt_frag;} void setCryptFragmentationEnabled(bool on) {crypt_frag = on;} int cryptFragmentationSize() const {return crypt_frag_size;} void setCryptFragmentationSize(int size_) {crypt_frag_size = size_;} + bool cryptSizeEnabled() const {return crypt_size;} + void setCryptSizeEnabled(bool on); //! Prepare data for send and raise \a sendRequest() events void send(const PIByteArray & data); - //! Receive data part. If packet is ready, raise \a received() event + //! Receive data part. If packet is ready, raise \a packetReceiveEvent() event + //! and \a packetReceived() virtual method void received(const PIByteArray & data); EVENT_HANDLER2(void, received, uchar * , readed, int, size); @@ -123,10 +135,10 @@ protected: private: PIByteArray stream, packet; - bool crypt_frag; + bool crypt_frag, crypt_size, aggressive_optimization; int packet_size, crypt_frag_size; ushort packet_sign; - int max_packet_size; + int max_packet_size, size_crypted_size; Progress prog_s, prog_r; mutable PIMutex prog_s_mutex, prog_r_mutex;