From 9438ab4e533f8a9cc54cd8107125e28723d8d1a4 Mon Sep 17 00:00:00 2001 From: peri4 Date: Sun, 23 Oct 2022 16:02:09 +0300 Subject: [PATCH] PIIODevice threaded read refactoring --- CMakeLists.txt | 2 +- libs/cloud/picloudclient.cpp | 6 +- libs/cloud/picloudserver.cpp | 4 +- libs/main/io_devices/pibinarylog.cpp | 2 +- libs/main/io_devices/piethernet.cpp | 2 +- libs/main/io_devices/piiodevice.cpp | 180 ++++++++++++-------- libs/main/io_devices/piiodevice.h | 70 ++++---- libs/main/io_devices/piserial.cpp | 9 +- utils/cloud_dispatcher/dispatcherclient.cpp | 3 +- 9 files changed, 157 insertions(+), 121 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 418ccc99..fb832128 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0) cmake_policy(SET CMP0017 NEW) # need include() with .cmake project(PIP) set(PIP_MAJOR 3) -set(PIP_MINOR 3) +set(PIP_MINOR 4) set(PIP_REVISION 0) set(PIP_SUFFIX ) set(PIP_COMPANY SHS) diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index 3b811af1..cf3f0532 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -33,7 +33,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) if (is_deleted) return; bool need_disconn = is_connected; //piCoutObj << "eth disconnected"; - static_cast(ð)->stop(); + eth.stopThreadedRead(); opened_ = false; internalDisconnect(); if (need_disconn) @@ -45,7 +45,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) PICloudClient::~PICloudClient() { //piCoutObj << "~PICloudClient()"; - PIThread::stop(); + softStopThreadedRead(); //eth.close(); //if (is_connected) disconnected(); close(); @@ -155,7 +155,7 @@ void PICloudClient::_readed(PIByteArray & ba) { } break; case PICloud::TCP::Disconnect: - static_cast(ð)->stop(); + eth.stopThreadedRead(); opened_ = false; eth.close(); break; diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index 9f372509..74698379 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -29,7 +29,7 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) CONNECTL(ð, connected, [this](){opened_ = true; piCoutObj << "connected"; tcp.sendStart();}); CONNECTL(ð, disconnected, [this](bool){ piCoutObj << "disconnected"; - static_cast(ð)->stop(); + eth.stopThreadedRead(); opened_ = false; ping_timer.stop(false); piMSleep(100); @@ -135,7 +135,7 @@ bool PICloudServer::Client::openDevice() { bool PICloudServer::Client::closeDevice() { - PIThread::stop(false); + softStopThreadedRead(); if (is_connected) { server->clientDisconnect(client_id); is_connected = false; diff --git a/libs/main/io_devices/pibinarylog.cpp b/libs/main/io_devices/pibinarylog.cpp index d220b7f2..f3a7ad74 100644 --- a/libs/main/io_devices/pibinarylog.cpp +++ b/libs/main/io_devices/pibinarylog.cpp @@ -436,7 +436,7 @@ ssize_t PIBinaryLog::writeDevice(const void * data, ssize_t size) { void PIBinaryLog::restart() { - bool th = isRunning(); + bool th = isThreadedRead(); if (th) stopThreadedRead(); if (!canRead()) return; logmutex.unlock(); diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index a4592351..afa1ea04 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -268,7 +268,7 @@ void PIEthernet::construct() { #else setThreadedReadBufferSize(65536); #endif - setPriority(piHigh); + //setPriority(piHigh); } diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 19ac35cf..540a2e3d 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -116,14 +116,14 @@ PIMutex PIIODevice::nfp_mutex; PIMap PIIODevice::nfp_cache; -PIIODevice::PIIODevice(): PIThread() { +PIIODevice::PIIODevice(): PIObject() { mode_ = ReadOnly; _init(); setPath(PIString()); } -PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIThread() { +PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIObject() { mode_ = mode; _init(); setPath(path); @@ -149,6 +149,18 @@ bool PIIODevice::setOption(PIIODevice::DeviceOption o, bool yes) { } +void PIIODevice::setReopenEnabled(bool yes) { + setProperty("reopenEnabled", yes); + reopen_enabled = yes; +} + + +void PIIODevice::setReopenTimeout(int msecs) { + setProperty("reopenTimeout", msecs); + reopen_timeout = msecs; +} + + //! \~\details //! \~english //! Set external static function of threaded read that will be executed @@ -177,34 +189,72 @@ void PIIODevice::setThreadedReadBufferSize(int new_size) { } -bool stopThread(PIThread * t, bool hard) { +bool PIIODevice::isThreadedRead() const { + return read_thread.isRunning(); +} + + +void PIIODevice::startThreadedRead() { + if (!read_thread.isRunning()) { + buffer_tr.resize(threaded_read_buffer_size); + read_thread.start(); + } +} + + +void PIIODevice::startThreadedRead(ReadRetFunc func) { + func_read = func; + startThreadedRead(); +} + + +void PIIODevice::stopThreadedRead() { + if (!isThreadedRead()) return; #ifdef MICRO_PIP - t->stop(true); + read_thread.stop(true); #else - if (hard) { - t->terminate(); - return true; + if (reading_now) { + read_thread.terminate(); + reading_now = false; } else { - t->stop(); - if (!t->waitForFinish(10000)) { - t->terminate(); - return true; - } + read_thread.stop(); } #endif - return false; } -void PIIODevice::stopThreadedRead(bool hard) { - if (stopThread(this, hard)) - threadedReadTerminated(); +bool PIIODevice::waitThreadedReadFinished(int timeout_ms) { + return read_thread.waitForFinish(timeout_ms); } -void PIIODevice::stopThreadedWrite(bool hard) { - if (stopThread(&write_thread, hard)) +bool PIIODevice::isThreadedWrite() const { + return write_thread.isRunning(); +} + + +void PIIODevice::startThreadedWrite() { + if (!write_thread.isRunning()) + write_thread.startOnce(); +} + + +void PIIODevice::stopThreadedWrite() { + if (!write_thread.isRunning()) return; +#ifdef MICRO_PIP + write_thread.stop(true); +#else + write_thread.stop(); + if (!write_thread.waitForFinish(100)) { + write_thread.terminate(); threadedWriteTerminated(); + } +#endif +} + + +bool PIIODevice::waitThreadedWriteFinished(int timeout_ms) { + return write_thread.waitForFinish(timeout_ms); } @@ -221,16 +271,31 @@ void PIIODevice::start() { } -void PIIODevice::stop(bool hard) { - stopThreadedRead(hard); - stopThreadedWrite(hard); +void PIIODevice::stop() { + stopThreadedRead(); + stopThreadedWrite(); +} + + +ssize_t PIIODevice::read(void * read_to, ssize_t max_size) { + reading_now = true; + ssize_t ret = readDevice(read_to, max_size); + reading_now = false; + return ret; +} + + +ssize_t PIIODevice::read(PIMemoryBlock mb) { + return read(mb.data(), mb.size()); } PIByteArray PIIODevice::read(ssize_t max_size) { if (max_size <= 0) return PIByteArray(); buffer_in.resize(max_size); + reading_now = true; ssize_t ret = readDevice(buffer_in.data(), max_size); + reading_now = false; if (ret < 0) return PIByteArray(); return buffer_in.resized(ret); } @@ -243,6 +308,7 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) { void PIIODevice::_init() { + reading_now = false; opened_ = init_ = thread_started_ = false; raise_threaded_read_ = true; func_read = nullptr; @@ -257,19 +323,10 @@ void PIIODevice::_init() { #else threaded_read_buffer_size = 4096; #endif - timer.setName("__S__.PIIODevice.reopen_timer"); + read_thread .setName("__S__.PIIODevice.read_thread" ); write_thread.setName("__S__.PIIODevice.write_thread"); - CONNECT2(void, void * , int, &timer, tickEvent, this, check_start); CONNECT(void, &write_thread, started, this, write_func); -} - - -void PIIODevice::check_start(void * data, int delim) { - //cout << "check " << tread_started_ << endl; - if (open()) { - thread_started_ = true; - timer.stop(false); - } + read_thread.setSlot([this](void*){read_func();}); } @@ -297,52 +354,21 @@ PIIODevice * PIIODevice::newDeviceByPrefix(const char * prefix) { } -void PIIODevice::terminate() { - timer.stop(); - thread_started_ = false; - if (!init_) return; - if (isRunning()) { -#ifdef MICRO_PIP - stop(true); -#else - stop(); - PIThread::terminate(); -#endif - } -} - - -void PIIODevice::begin() { - //cout << " begin\n"; - buffer_tr.resize(threaded_read_buffer_size); - if (threaded_read_buffer_size == 0) - piCoutObj << "Warning: threadedReadBufferSize() == 0, read may be useless!"; - thread_started_ = false; - if (!opened_) { - if (open()) { - thread_started_ = true; - //cout << " open && ok\n"; - return; - } - } else { - thread_started_ = true; - //cout << " ok\n"; - return; - } - if (!timer.isRunning() && isReopenEnabled()) timer.start(reopenTimeout()); -} - - -void PIIODevice::run() { +void PIIODevice::read_func() { if (!isReadable()) { - //cout << "not readable\n"; - PIThread::stop(); + read_thread.stop(); return; } - if (!thread_started_) { + if (!isOpened()) { piMSleep(10); - //cout << "not started\n"; - return; + bool ok = false; + if (reopen_enabled) { + if (reopen_tm.elapsed_m() >= reopen_timeout) { + reopen_tm.reset(); + ok = open(); + } + } + if (!ok) return; } readed_ = read(buffer_tr.data(), buffer_tr.size_s()); if (readed_ <= 0) { @@ -415,6 +441,7 @@ bool PIIODevice::close() { return !opened_; } + ssize_t PIIODevice::write(PIByteArray data) { return writeDevice(data.data(), data.size_s()); } @@ -620,3 +647,8 @@ void PIIODevice::configureFromVariantDevice(const PIPropertyStorage & d) { setPath(d.propertyValueByName("path").toString()); } + +void PIIODevice::softStopThreadedRead() { + read_thread.stop(); +} + diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index 503f1f70..f5f0a3cf 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -76,9 +76,9 @@ typedef std::function ReadRetFunc; //! \~\brief //! \~english Base class for input/output devices. //! \~russian Базовый класс утройств ввода/вывода. -class PIP_EXPORT PIIODevice: public PIThread +class PIP_EXPORT PIIODevice: public PIObject { - PIOBJECT_SUBCLASS(PIIODevice, PIThread); + PIOBJECT_SUBCLASS(PIIODevice, PIObject); friend void __DevicePool_threadReadDP(void * ddp); public: NO_COPY_CLASS(PIIODevice); @@ -187,12 +187,11 @@ public: //! \~english Set calling of \a open() enabled while threaded read on closed device //! \~russian Устанавливает возможность вызова \a open() при потоковом чтении на закрытом устройстве - void setReopenEnabled(bool yes = true) {setProperty("reopenEnabled", yes);} + void setReopenEnabled(bool yes = true); - //! //! \~english Set timeout in milliseconds between \a open() tryings if reopen is enabled //! \~russian Устанавливает задержку в миллисекундах между вызовами \a open() если переоткрытие активно - void setReopenTimeout(int msecs) {setProperty("reopenTimeout", msecs);} + void setReopenTimeout(int msecs); //! \~english Returns reopen enable //! \~russian Возвращает активно ли переоткрытие @@ -230,33 +229,41 @@ public: //! \~english Returns if threaded read is started //! \~russian Возвращает запущен ли поток чтения - bool isThreadedRead() const {return isRunning();} + bool isThreadedRead() const; //! \~english Start threaded read //! \~russian Запускает потоковое чтение - void startThreadedRead() {if (!isRunning()) PIThread::start();} + void startThreadedRead(); //! \~english Start threaded read and assign threaded read callback to "func" //! \~russian Запускает потоковое чтение и устанавливает callback потокового чтения в "func" - void startThreadedRead(ReadRetFunc func) {func_read = func; startThreadedRead();} + void startThreadedRead(ReadRetFunc func); - //! \~english Stop threaded read. Hard stop terminate thread, otherwise wait fo 10 seconds - //! \~russian Останавливает потоковое чтение. Жесткая остановка убивает поток, иначе ожидает 10 секунд - void stopThreadedRead(bool hard = true); + //! \~english Stop threaded read. + //! \~russian Останавливает потоковое чтение. + void stopThreadedRead(); + + //! \~english Wait for threaded read finish no longer than "timeout_ms" milliseconds. + //! \~russian Ожидает завершения потокового чтения в течении не более "timeout_ms" миллисекунд. + bool waitThreadedReadFinished(int timeout_ms = -1); //! \~english Returns if threaded write is started //! \~russian Возвращает запущен ли поток записи - bool isThreadedWrite() const {return write_thread.isRunning();} + bool isThreadedWrite() const; //! \~english Start threaded write //! \~russian Запускает потоковую запись - void startThreadedWrite() {if (!write_thread.isRunning()) write_thread.startOnce();} - - //! \~english Stop threaded write. Hard stop terminate thread, otherwise wait fo 10 seconds - //! \~russian Останавливает потоковую запись. Жесткая остановка убивает поток, иначе ожидает 10 секунд - void stopThreadedWrite(bool hard = true); + void startThreadedWrite(); + //! \~english Stop threaded write. + //! \~russian Останавливает потоковую запись. + void stopThreadedWrite(); + + //! \~english Wait for threaded write finish no longer than "timeout_ms" milliseconds. + //! \~russian Ожидает завершения потоковой записи в течении не более "timeout_ms" миллисекунд. + bool waitThreadedWriteFinished(int timeout_ms = -1); + //! \~english Clear threaded write task queue //! \~russian Очищает очередь потоковой записи void clearThreadedWriteQueue(); @@ -266,18 +273,18 @@ public: //! \~russian Запускает потоковое чтение и запись void start(); - //! \~english Stop both threaded read and threaded write. Hard stop terminate threads, otherwise wait fo 10 seconds - //! \~russian Останавливает потоковое чтение и запись. Жесткая остановка убивает потоки, иначе ожидает 10 секунд - void stop(bool hard = true); + //! \~english Stop both threaded read and threaded write. + //! \~russian Останавливает потоковое чтение и запись. + void stop(); //! \~english Read from device maximum "max_size" bytes to "read_to" //! \~russian Читает из устройства не более "max_size" байт в "read_to" - ssize_t read(void * read_to, ssize_t max_size) {return readDevice(read_to, max_size);} + ssize_t read(void * read_to, ssize_t max_size); //! \~english Read from device to memory block "mb" //! \~russian Читает из устройства в блок памяти "mb" - ssize_t read(PIMemoryBlock mb) {return readDevice(mb.data(), mb.size());} + ssize_t read(PIMemoryBlock mb); //! \~english Read from device maximum "max_size" bytes and returns them as PIByteArray //! \~russian Читает из устройства не более "max_size" байт и возвращает данные как PIByteArray @@ -531,10 +538,10 @@ protected: //! \~russian Вызывается после жесткой остановки потока записи virtual void threadedWriteTerminated() {;} + void softStopThreadedRead(); + static PIIODevice * newDeviceByPrefix(const char * prefix); - void terminate(); - DeviceMode mode_; DeviceOptions options_; @@ -543,27 +550,24 @@ protected: void * ret_data_; private: - EVENT_HANDLER2(void, check_start, void * , data, int, delim); + EVENT_HANDLER(void, read_func); EVENT_HANDLER(void, write_func); virtual PIIODevice * copy() const {return nullptr;} PIString fullPathOptions() const; void _init(); - void begin() override; - void run() override; - void end() override {terminate();} static void cacheFullPath(const PIString & full_path, const PIIODevice * d); static PIMap & fabrics(); - PITimer timer; - PITimeMeasurer tm; - PIThread write_thread; + PITimeMeasurer tm, reopen_tm; + PIThread read_thread, write_thread; PIByteArray buffer_in, buffer_tr; PIQueue > write_queue; ullong tri; ssize_t readed_; - uint threaded_read_buffer_size; - bool init_, thread_started_, raise_threaded_read_; + uint threaded_read_buffer_size, reopen_timeout = 1000; + std::atomic_bool reading_now; + bool init_, thread_started_, raise_threaded_read_, reopen_enabled = true; static PIMutex nfp_mutex; static PIMap nfp_cache; diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index 44322f64..644c90bf 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -219,7 +219,7 @@ void PISerial::construct() { PRIVATE->hCom = 0; #endif fd = -1; - setPriority(piHigh); + //setPriority(piHigh); vtime = 10; sending = false; setParameters(0); @@ -680,9 +680,8 @@ bool PISerial::openDevice() { bool PISerial::closeDevice() { - if (isRunning() && !isStopping()) { - stop(); - PIThread::terminate(); + if (isThreadedRead()) { + stopThreadedRead(); } if (fd != -1) { #ifdef WINDOWS @@ -803,7 +802,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { DWORD err = GetLastError(); //piCout << err; if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) { - PIThread::stop(false); + softStopThreadedRead(); close(); return 0; } diff --git a/utils/cloud_dispatcher/dispatcherclient.cpp b/utils/cloud_dispatcher/dispatcherclient.cpp index 38a9c273..9e6118a9 100644 --- a/utils/cloud_dispatcher/dispatcherclient.cpp +++ b/utils/cloud_dispatcher/dispatcherclient.cpp @@ -25,8 +25,9 @@ PIString DispatcherClient::address() { return eth->path(); } + void DispatcherClient::close() { - static_cast(eth)->stop(false); + eth->stopThreadedRead(); eth->close(); }