diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index a3388719..5ebf9d70 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -107,7 +107,6 @@ bool PICloudClient::closeDevice() { ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) { if (is_deleted) return -1; //piCoutObj << "readDevice"; - reading_now = true; if (!is_connected && eth.isClosed()) openDevice(); ssize_t sz = -1; mutex_buff.lock(); @@ -119,7 +118,6 @@ ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) { } mutex_buff.unlock(); if (!is_connected) opened_ = false; - reading_now = false; //piCoutObj << "readDevice done" << sz; return sz; } @@ -132,6 +130,11 @@ ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) { } +void PICloudClient::stopThreadedReadDevice() { + cond_buff.notifyOne(); +} + + void PICloudClient::internalDisconnect() { is_connected = false; cond_buff.notifyOne(); @@ -177,4 +180,3 @@ void PICloudClient::_readed(PIByteArray & ba) { while (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad //piCoutObj << "_readed done"; } - diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index b3e61e79..a5fbe927 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -90,10 +90,8 @@ bool PICloudServer::closeDevice() { ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) { //piCoutObj << "readDevice"; - reading_now = true; if (!opened_) openDevice(); else piMSleep(eth.readTimeout()); - reading_now = false; return -1; } @@ -104,6 +102,11 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) { } +void PICloudServer::stopThreadedReadDevice() { + +} + + void PICloudServer::clientDisconnect(uint client_id) { tcp.sendDisconnected(client_id); } diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index 791bae85..a4c55e6f 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -53,6 +53,7 @@ protected: ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t size) override; DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} + void stopThreadedReadDevice() override; private: EVENT_HANDLER1(void, _readed, PIByteArray &, data); @@ -65,6 +66,7 @@ private: PIConditionVariable cond_connect; std::atomic_bool is_connected; std::atomic_bool is_deleted; + }; #endif // PICLOUDCLIENT_H diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index 593783c2..b253b40e 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -73,6 +73,7 @@ protected: bool closeDevice() override; ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t max_size) override; + void stopThreadedReadDevice() override; private: EVENT_HANDLER1(void, _readed, PIByteArray &, ba); diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 7f459ef9..59013625 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -752,9 +752,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #ifdef QNX PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif - //piCout << "connect to " << path() << "..."; + piCout << "connect to " << path() << "..."; connected_ = connectTCP(); - //piCout << "connect to " << path() << connected_; + piCout << "connect to " << path() << connected_; if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); opened_ = connected_; diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index c132b702..155674a6 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -132,6 +132,7 @@ PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIOb PIIODevice::~PIIODevice() { stop(); + waitThreadedReadFinished(); } @@ -213,11 +214,12 @@ void PIIODevice::stopThreadedRead() { #ifdef MICRO_PIP read_thread.stop(); #else + read_thread.stop(); + read_thread.interrupt(); + stopThreadedReadDevice(); if (reading_now) { read_thread.terminate(); reading_now = false; - } else { - read_thread.stop(); } #endif } diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index b3af0987..c5f7936d 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -490,6 +490,10 @@ protected: //! \~english Function executed when thread read some data, default implementation execute external callback "ret_func_" //! \~russian Метод вызывается после каждого успешного потокового чтения, по умолчанию вызывает callback "ret_func_" virtual bool threadedRead(const uchar * readed, ssize_t size); + + //! \~english Function executed after PIThread::stop() and PIThread::interrupt() of read thread + //! \~russian Метод вызывается после PIThread::stop() и PIThread::interrupt() потока чтения + virtual void stopThreadedReadDevice() {} //! \~english Reimplement to construct full unambiguous string, describes this device. //! Default implementation returns \a path() diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index d781d52a..d3c97c44 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -798,9 +798,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { if (sending) return -1; // piCoutObj << "com event ..."; //piCoutObj << "read ..." << PRIVATE->hCom; - reading_now = true; ReadFile(PRIVATE->hCom, read_to, max_size, &PRIVATE->readed, 0); - reading_now = false; DWORD err = GetLastError(); //piCout << err; if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) { diff --git a/libs/main/io_utils/piconnection.cpp b/libs/main/io_utils/piconnection.cpp index 79af1255..4c32ad80 100644 --- a/libs/main/io_utils/piconnection.cpp +++ b/libs/main/io_utils/piconnection.cpp @@ -1114,7 +1114,10 @@ PIVector PIConnection::DevicePool::boundedDevices(const PIConnect PIConnection::DevicePool::DeviceData::~DeviceData() { if (rthread) { - rthread->terminate(); + rthread->stop(); + rthread->interrupt(); + if (!rthread->waitForFinish(1000)) + rthread->terminate(); delete rthread; rthread = nullptr; } @@ -1146,8 +1149,13 @@ void __DevicePool_threadReadDP(void * ddp) { } if (dev->isClosed()) { if (!dev->open()) { - piMSleep(dev->reopenTimeout()); - return; + PITimeMeasurer tm; + int timeout = dev->reopenTimeout(); + while (tm.elapsed_m() < timeout) { + if (dd->rthread->isStopping()) + return; + piMSleep(50); + } } } PIByteArray ba; diff --git a/main.cpp b/main.cpp index 78b276d7..f39e4f49 100644 --- a/main.cpp +++ b/main.cpp @@ -109,41 +109,53 @@ int main(int argc, char * argv[]) { delete threads[i]; }*/ - PIEthernet eth(PIEthernet::TCP_Server), seth(PIEthernet::TCP_Client); - //eth.setReadAddress("127.0.0.1", 50000); + PIEthernet eth(PIEthernet::TCP_Client), seth(PIEthernet::TCP_Client); + eth.connect("192.168.1.13", 22); + eth.startThreadedRead(); //piCout << eth.open(); - //PISerial ser; - //ser.setSpeed(PISerial::S9600); - //ser.setOption(PIIODevice::BlockingRead); - //piCout << ser.open("COM3"); + /* + PISerial ser; + ser.setSpeed(PISerial::S9600); + ser.setOption(PIIODevice::BlockingRead); + piCout << ser.open("COM3"); + */ /* PIThread thread; thread.start([&](void*){ piCout << "[T] start" << GetCurrentThreadId(); - //PIByteArray data = ((PIIODevice*)ð)->read(1024); + //PIByteArray data = ((PIIODevice*)&ser)->read(1024); eth.connect("192.168.1.13", 23, false); piCout << "[T] connected" << eth.isConnected() << errorString(); //piCout << "[T] readed" << data.size() << errorString(); piCout << "[T] end"; }); piMSleep(500); - eth.close(); + //eth.close(); //piMSleep(500); - //thread.stop(); - //thread.interrupt(); + thread.stop(); + thread.interrupt(); //seth.send("127.0.0.1", 50000, "string", 7); //thread.interrupt(); - thread.waitForFinish();*/ + thread.waitForFinish(); + */ + + /* eth.listen("127.0.0.1", 50000); - piMSleep(500); seth.connect("127.0.0.1", 50001, false); piMSleep(500); piCout << "connected" << seth.isConnected(); - //eth.close(); - piCout << "main end"; + */ + piMSleep(1000); + piCout << "main stop ..."; + eth.stopThreadedRead(); + piCout << "main wait ..." << eth.isThreadedRead(); + eth.waitThreadedReadFinished(); - eth.close(); + //eth.close(); + piCout << "main end" << eth.isThreadedRead(); + + //ser.close(); return 0; }