PIIODevice threaded read refactoring

This commit is contained in:
2022-10-23 16:02:09 +03:00
parent e5777dde6c
commit 9438ab4e53
9 changed files with 157 additions and 121 deletions

View File

@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0017 NEW) # need include() with .cmake
project(PIP)
set(PIP_MAJOR 3)
set(PIP_MINOR 3)
set(PIP_MINOR 4)
set(PIP_REVISION 0)
set(PIP_SUFFIX )
set(PIP_COMPANY SHS)

View File

@@ -33,7 +33,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode)
if (is_deleted) return;
bool need_disconn = is_connected;
//piCoutObj << "eth disconnected";
static_cast<PIThread*>(&eth)->stop();
eth.stopThreadedRead();
opened_ = false;
internalDisconnect();
if (need_disconn)
@@ -45,7 +45,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode)
PICloudClient::~PICloudClient() {
//piCoutObj << "~PICloudClient()";
PIThread::stop();
softStopThreadedRead();
//eth.close();
//if (is_connected) disconnected();
close();
@@ -155,7 +155,7 @@ void PICloudClient::_readed(PIByteArray & ba) {
}
break;
case PICloud::TCP::Disconnect:
static_cast<PIThread*>(&eth)->stop();
eth.stopThreadedRead();
opened_ = false;
eth.close();
break;

View File

@@ -29,7 +29,7 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode)
CONNECTL(&eth, connected, [this](){opened_ = true; piCoutObj << "connected"; tcp.sendStart();});
CONNECTL(&eth, disconnected, [this](bool){
piCoutObj << "disconnected";
static_cast<PIThread*>(&eth)->stop();
eth.stopThreadedRead();
opened_ = false;
ping_timer.stop(false);
piMSleep(100);
@@ -135,7 +135,7 @@ bool PICloudServer::Client::openDevice() {
bool PICloudServer::Client::closeDevice() {
PIThread::stop(false);
softStopThreadedRead();
if (is_connected) {
server->clientDisconnect(client_id);
is_connected = false;

View File

@@ -436,7 +436,7 @@ ssize_t PIBinaryLog::writeDevice(const void * data, ssize_t size) {
void PIBinaryLog::restart() {
bool th = isRunning();
bool th = isThreadedRead();
if (th) stopThreadedRead();
if (!canRead()) return;
logmutex.unlock();

View File

@@ -268,7 +268,7 @@ void PIEthernet::construct() {
#else
setThreadedReadBufferSize(65536);
#endif
setPriority(piHigh);
//setPriority(piHigh);
}

View File

@@ -116,14 +116,14 @@ PIMutex PIIODevice::nfp_mutex;
PIMap<PIString, PIString> PIIODevice::nfp_cache;
PIIODevice::PIIODevice(): PIThread() {
PIIODevice::PIIODevice(): PIObject() {
mode_ = ReadOnly;
_init();
setPath(PIString());
}
PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIThread() {
PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIObject() {
mode_ = mode;
_init();
setPath(path);
@@ -149,6 +149,18 @@ bool PIIODevice::setOption(PIIODevice::DeviceOption o, bool yes) {
}
void PIIODevice::setReopenEnabled(bool yes) {
setProperty("reopenEnabled", yes);
reopen_enabled = yes;
}
void PIIODevice::setReopenTimeout(int msecs) {
setProperty("reopenTimeout", msecs);
reopen_timeout = msecs;
}
//! \~\details
//! \~english
//! Set external static function of threaded read that will be executed
@@ -177,34 +189,72 @@ void PIIODevice::setThreadedReadBufferSize(int new_size) {
}
bool stopThread(PIThread * t, bool hard) {
#ifdef MICRO_PIP
t->stop(true);
#else
if (hard) {
t->terminate();
return true;
} else {
t->stop();
if (!t->waitForFinish(10000)) {
t->terminate();
return true;
bool PIIODevice::isThreadedRead() const {
return read_thread.isRunning();
}
void PIIODevice::startThreadedRead() {
if (!read_thread.isRunning()) {
buffer_tr.resize(threaded_read_buffer_size);
read_thread.start();
}
}
void PIIODevice::startThreadedRead(ReadRetFunc func) {
func_read = func;
startThreadedRead();
}
void PIIODevice::stopThreadedRead() {
if (!isThreadedRead()) return;
#ifdef MICRO_PIP
read_thread.stop(true);
#else
if (reading_now) {
read_thread.terminate();
reading_now = false;
} else {
read_thread.stop();
}
#endif
return false;
}
void PIIODevice::stopThreadedRead(bool hard) {
if (stopThread(this, hard))
threadedReadTerminated();
bool PIIODevice::waitThreadedReadFinished(int timeout_ms) {
return read_thread.waitForFinish(timeout_ms);
}
void PIIODevice::stopThreadedWrite(bool hard) {
if (stopThread(&write_thread, hard))
bool PIIODevice::isThreadedWrite() const {
return write_thread.isRunning();
}
void PIIODevice::startThreadedWrite() {
if (!write_thread.isRunning())
write_thread.startOnce();
}
void PIIODevice::stopThreadedWrite() {
if (!write_thread.isRunning()) return;
#ifdef MICRO_PIP
write_thread.stop(true);
#else
write_thread.stop();
if (!write_thread.waitForFinish(100)) {
write_thread.terminate();
threadedWriteTerminated();
}
#endif
}
bool PIIODevice::waitThreadedWriteFinished(int timeout_ms) {
return write_thread.waitForFinish(timeout_ms);
}
@@ -221,16 +271,31 @@ void PIIODevice::start() {
}
void PIIODevice::stop(bool hard) {
stopThreadedRead(hard);
stopThreadedWrite(hard);
void PIIODevice::stop() {
stopThreadedRead();
stopThreadedWrite();
}
ssize_t PIIODevice::read(void * read_to, ssize_t max_size) {
reading_now = true;
ssize_t ret = readDevice(read_to, max_size);
reading_now = false;
return ret;
}
ssize_t PIIODevice::read(PIMemoryBlock mb) {
return read(mb.data(), mb.size());
}
PIByteArray PIIODevice::read(ssize_t max_size) {
if (max_size <= 0) return PIByteArray();
buffer_in.resize(max_size);
reading_now = true;
ssize_t ret = readDevice(buffer_in.data(), max_size);
reading_now = false;
if (ret < 0) return PIByteArray();
return buffer_in.resized(ret);
}
@@ -243,6 +308,7 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) {
void PIIODevice::_init() {
reading_now = false;
opened_ = init_ = thread_started_ = false;
raise_threaded_read_ = true;
func_read = nullptr;
@@ -257,19 +323,10 @@ void PIIODevice::_init() {
#else
threaded_read_buffer_size = 4096;
#endif
timer.setName("__S__.PIIODevice.reopen_timer");
read_thread .setName("__S__.PIIODevice.read_thread" );
write_thread.setName("__S__.PIIODevice.write_thread");
CONNECT2(void, void * , int, &timer, tickEvent, this, check_start);
CONNECT(void, &write_thread, started, this, write_func);
}
void PIIODevice::check_start(void * data, int delim) {
//cout << "check " << tread_started_ << endl;
if (open()) {
thread_started_ = true;
timer.stop(false);
}
read_thread.setSlot([this](void*){read_func();});
}
@@ -297,52 +354,21 @@ PIIODevice * PIIODevice::newDeviceByPrefix(const char * prefix) {
}
void PIIODevice::terminate() {
timer.stop();
thread_started_ = false;
if (!init_) return;
if (isRunning()) {
#ifdef MICRO_PIP
stop(true);
#else
stop();
PIThread::terminate();
#endif
}
}
void PIIODevice::begin() {
//cout << " begin\n";
buffer_tr.resize(threaded_read_buffer_size);
if (threaded_read_buffer_size == 0)
piCoutObj << "Warning: threadedReadBufferSize() == 0, read may be useless!";
thread_started_ = false;
if (!opened_) {
if (open()) {
thread_started_ = true;
//cout << " open && ok\n";
return;
}
} else {
thread_started_ = true;
//cout << " ok\n";
return;
}
if (!timer.isRunning() && isReopenEnabled()) timer.start(reopenTimeout());
}
void PIIODevice::run() {
void PIIODevice::read_func() {
if (!isReadable()) {
//cout << "not readable\n";
PIThread::stop();
read_thread.stop();
return;
}
if (!thread_started_) {
if (!isOpened()) {
piMSleep(10);
//cout << "not started\n";
return;
bool ok = false;
if (reopen_enabled) {
if (reopen_tm.elapsed_m() >= reopen_timeout) {
reopen_tm.reset();
ok = open();
}
}
if (!ok) return;
}
readed_ = read(buffer_tr.data(), buffer_tr.size_s());
if (readed_ <= 0) {
@@ -415,6 +441,7 @@ bool PIIODevice::close() {
return !opened_;
}
ssize_t PIIODevice::write(PIByteArray data) {
return writeDevice(data.data(), data.size_s());
}
@@ -620,3 +647,8 @@ void PIIODevice::configureFromVariantDevice(const PIPropertyStorage & d) {
setPath(d.propertyValueByName("path").toString());
}
void PIIODevice::softStopThreadedRead() {
read_thread.stop();
}

View File

@@ -76,9 +76,9 @@ typedef std::function<bool(const uchar *, int, void *)> ReadRetFunc;
//! \~\brief
//! \~english Base class for input/output devices.
//! \~russian Базовый класс утройств ввода/вывода.
class PIP_EXPORT PIIODevice: public PIThread
class PIP_EXPORT PIIODevice: public PIObject
{
PIOBJECT_SUBCLASS(PIIODevice, PIThread);
PIOBJECT_SUBCLASS(PIIODevice, PIObject);
friend void __DevicePool_threadReadDP(void * ddp);
public:
NO_COPY_CLASS(PIIODevice);
@@ -187,12 +187,11 @@ public:
//! \~english Set calling of \a open() enabled while threaded read on closed device
//! \~russian Устанавливает возможность вызова \a open() при потоковом чтении на закрытом устройстве
void setReopenEnabled(bool yes = true) {setProperty("reopenEnabled", yes);}
void setReopenEnabled(bool yes = true);
//!
//! \~english Set timeout in milliseconds between \a open() tryings if reopen is enabled
//! \~russian Устанавливает задержку в миллисекундах между вызовами \a open() если переоткрытие активно
void setReopenTimeout(int msecs) {setProperty("reopenTimeout", msecs);}
void setReopenTimeout(int msecs);
//! \~english Returns reopen enable
//! \~russian Возвращает активно ли переоткрытие
@@ -230,32 +229,40 @@ public:
//! \~english Returns if threaded read is started
//! \~russian Возвращает запущен ли поток чтения
bool isThreadedRead() const {return isRunning();}
bool isThreadedRead() const;
//! \~english Start threaded read
//! \~russian Запускает потоковое чтение
void startThreadedRead() {if (!isRunning()) PIThread::start();}
void startThreadedRead();
//! \~english Start threaded read and assign threaded read callback to "func"
//! \~russian Запускает потоковое чтение и устанавливает callback потокового чтения в "func"
void startThreadedRead(ReadRetFunc func) {func_read = func; startThreadedRead();}
void startThreadedRead(ReadRetFunc func);
//! \~english Stop threaded read. Hard stop terminate thread, otherwise wait fo 10 seconds
//! \~russian Останавливает потоковое чтение. Жесткая остановка убивает поток, иначе ожидает 10 секунд
void stopThreadedRead(bool hard = true);
//! \~english Stop threaded read.
//! \~russian Останавливает потоковое чтение.
void stopThreadedRead();
//! \~english Wait for threaded read finish no longer than "timeout_ms" milliseconds.
//! \~russian Ожидает завершения потокового чтения в течении не более "timeout_ms" миллисекунд.
bool waitThreadedReadFinished(int timeout_ms = -1);
//! \~english Returns if threaded write is started
//! \~russian Возвращает запущен ли поток записи
bool isThreadedWrite() const {return write_thread.isRunning();}
bool isThreadedWrite() const;
//! \~english Start threaded write
//! \~russian Запускает потоковую запись
void startThreadedWrite() {if (!write_thread.isRunning()) write_thread.startOnce();}
void startThreadedWrite();
//! \~english Stop threaded write. Hard stop terminate thread, otherwise wait fo 10 seconds
//! \~russian Останавливает потоковую запись. Жесткая остановка убивает поток, иначе ожидает 10 секунд
void stopThreadedWrite(bool hard = true);
//! \~english Stop threaded write.
//! \~russian Останавливает потоковую запись.
void stopThreadedWrite();
//! \~english Wait for threaded write finish no longer than "timeout_ms" milliseconds.
//! \~russian Ожидает завершения потоковой записи в течении не более "timeout_ms" миллисекунд.
bool waitThreadedWriteFinished(int timeout_ms = -1);
//! \~english Clear threaded write task queue
//! \~russian Очищает очередь потоковой записи
@@ -266,18 +273,18 @@ public:
//! \~russian Запускает потоковое чтение и запись
void start();
//! \~english Stop both threaded read and threaded write. Hard stop terminate threads, otherwise wait fo 10 seconds
//! \~russian Останавливает потоковое чтение и запись. Жесткая остановка убивает потоки, иначе ожидает 10 секунд
void stop(bool hard = true);
//! \~english Stop both threaded read and threaded write.
//! \~russian Останавливает потоковое чтение и запись.
void stop();
//! \~english Read from device maximum "max_size" bytes to "read_to"
//! \~russian Читает из устройства не более "max_size" байт в "read_to"
ssize_t read(void * read_to, ssize_t max_size) {return readDevice(read_to, max_size);}
ssize_t read(void * read_to, ssize_t max_size);
//! \~english Read from device to memory block "mb"
//! \~russian Читает из устройства в блок памяти "mb"
ssize_t read(PIMemoryBlock mb) {return readDevice(mb.data(), mb.size());}
ssize_t read(PIMemoryBlock mb);
//! \~english Read from device maximum "max_size" bytes and returns them as PIByteArray
//! \~russian Читает из устройства не более "max_size" байт и возвращает данные как PIByteArray
@@ -531,9 +538,9 @@ protected:
//! \~russian Вызывается после жесткой остановки потока записи
virtual void threadedWriteTerminated() {;}
static PIIODevice * newDeviceByPrefix(const char * prefix);
void softStopThreadedRead();
void terminate();
static PIIODevice * newDeviceByPrefix(const char * prefix);
DeviceMode mode_;
@@ -543,27 +550,24 @@ protected:
void * ret_data_;
private:
EVENT_HANDLER2(void, check_start, void * , data, int, delim);
EVENT_HANDLER(void, read_func);
EVENT_HANDLER(void, write_func);
virtual PIIODevice * copy() const {return nullptr;}
PIString fullPathOptions() const;
void _init();
void begin() override;
void run() override;
void end() override {terminate();}
static void cacheFullPath(const PIString & full_path, const PIIODevice * d);
static PIMap<PIConstChars, FabricInfo> & fabrics();
PITimer timer;
PITimeMeasurer tm;
PIThread write_thread;
PITimeMeasurer tm, reopen_tm;
PIThread read_thread, write_thread;
PIByteArray buffer_in, buffer_tr;
PIQueue<PIPair<PIByteArray, ullong> > write_queue;
ullong tri;
ssize_t readed_;
uint threaded_read_buffer_size;
bool init_, thread_started_, raise_threaded_read_;
uint threaded_read_buffer_size, reopen_timeout = 1000;
std::atomic_bool reading_now;
bool init_, thread_started_, raise_threaded_read_, reopen_enabled = true;
static PIMutex nfp_mutex;
static PIMap<PIString, PIString> nfp_cache;

View File

@@ -219,7 +219,7 @@ void PISerial::construct() {
PRIVATE->hCom = 0;
#endif
fd = -1;
setPriority(piHigh);
//setPriority(piHigh);
vtime = 10;
sending = false;
setParameters(0);
@@ -680,9 +680,8 @@ bool PISerial::openDevice() {
bool PISerial::closeDevice() {
if (isRunning() && !isStopping()) {
stop();
PIThread::terminate();
if (isThreadedRead()) {
stopThreadedRead();
}
if (fd != -1) {
#ifdef WINDOWS
@@ -803,7 +802,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) {
DWORD err = GetLastError();
//piCout << err;
if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) {
PIThread::stop(false);
softStopThreadedRead();
close();
return 0;
}

View File

@@ -25,8 +25,9 @@ PIString DispatcherClient::address() {
return eth->path();
}
void DispatcherClient::close() {
static_cast<PIThread*>(eth)->stop(false);
eth->stopThreadedRead();
eth->close();
}