diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index df32160d..db0729c1 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -45,10 +45,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) PICloudClient::~PICloudClient() { //piCoutObj << "~PICloudClient()"; - //softStopThreadedRead(); - //eth.close(); - //if (is_connected) disconnected(); - //close(); + stopAndWait(); //piCoutObj << "~PICloudClient() closed"; internalDisconnect(); @@ -71,7 +68,8 @@ void PICloudClient::setKeepConnection(bool on) { void PICloudClient::interrupt() { - eth.interrupt(); + cond_buff.notifyOne(); + cond_connect.notifyOne(); } @@ -87,7 +85,7 @@ bool PICloudClient::openDevice() { mutex_connect.unlock(); if (!conn_ok) { mutex_connect.lock(); - eth.stop(); + eth.stopAndWait(); eth.close(); mutex_connect.unlock(); } @@ -104,7 +102,7 @@ bool PICloudClient::closeDevice() { if (is_connected) { internalDisconnect(); } - eth.stop(); + eth.stopAndWait(); eth.close(); return true; } @@ -116,11 +114,15 @@ ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) { if (!is_connected && eth.isClosed()) openDevice(); ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); + cond_buff.wait(mutex_buff); if (is_connected) { - sz = piMini(max_size, buff.size()); - memcpy(read_to, buff.data(), sz); - buff.remove(0, sz); + if (buff.isEmpty()) { + sz = 0; + } else { + sz = piMini(max_size, buff.size()); + memcpy(read_to, buff.data(), sz); + buff.remove(0, sz); + } } mutex_buff.unlock(); if (!is_connected) opened_ = false; @@ -136,11 +138,6 @@ ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) { } -void PICloudClient::stopThreadedReadDevice() { - cond_buff.notifyOne(); -} - - void PICloudClient::internalDisconnect() { is_connected = false; cond_buff.notifyOne(); diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index 931038bc..c968bff1 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -102,11 +102,6 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) { } -void PICloudServer::stopThreadedReadDevice() { - -} - - void PICloudServer::interrupt() { eth.interrupt(); } @@ -159,11 +154,15 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) { if (!is_connected) return -1; ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); + cond_buff.wait(mutex_buff); if (is_connected) { - sz = piMini(max_size, buff.size()); - memcpy(read_to, buff.data(), sz); - buff.remove(0, sz); + if (buff.isEmpty()) { + sz = 0; + } else { + sz = piMini(max_size, buff.size()); + memcpy(read_to, buff.data(), sz); + buff.remove(0, sz); + } } mutex_buff.unlock(); return sz; diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index 4fb58d9b..e9217dd3 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -54,7 +54,6 @@ protected: ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t size) override; DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} - void stopThreadedReadDevice() override; private: EVENT_HANDLER1(void, _readed, PIByteArray &, data); diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index 4f8d110d..a0fca4bd 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -74,7 +74,6 @@ protected: bool closeDevice() override; ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t max_size) override; - void stopThreadedReadDevice() override; void interrupt() override; private: diff --git a/libs/main/core/piwaitevent_p.cpp b/libs/main/core/piwaitevent_p.cpp index f4a832dc..d576ba31 100644 --- a/libs/main/core/piwaitevent_p.cpp +++ b/libs/main/core/piwaitevent_p.cpp @@ -45,7 +45,7 @@ void PIWaitEvent::create() { piCout << "Error with CreateEventA:" << errorString(); } #else - for (int i = 0; i < 3; ++i) memset(&(fds[i]), 0, sizeof(fds[i])); + for (int i = 0; i < sizeof(fds); ++i) memset(&(fds[i]), 0, sizeof(fds[i])); if (::pipe(pipe_fd) < 0) { piCout << "Error with pipe:" << errorString(); } else { @@ -62,7 +62,7 @@ void PIWaitEvent::destroy() { event = NULL; } #else - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < sizeof(pipe_fd); ++i) { if (pipe_fd[i] != 0) { ::close(pipe_fd[i]); pipe_fd[i] = 0; @@ -72,35 +72,24 @@ void PIWaitEvent::destroy() { } -#ifdef WINDOWS -bool PIWaitEvent::wait() { -#else -bool PIWaitEvent::wait(int fd, PIWaitEvent::SelectRole role) { -#endif +bool PIWaitEvent::wait(int fd, CheckRole role) { if (!isCreate()) return false; #ifdef WINDOWS DWORD ret = WaitForSingleObjectEx(event, INFINITE, TRUE); ResetEvent(event); if (ret == WAIT_IO_COMPLETION || ret == WAIT_FAILED) return false; #else + if (fd == -1) return false; int nfds = piMaxi(pipe_fd[ReadEnd], fd) + 1; - int fd_index = 0; - switch (role) { - case CheckRead: - fd_index = 0; - break; - case CheckWrite: - fd_index = 1; - break; - } - for (int i = 0; i < 3; ++i) FD_ZERO(&(fds[i])); - FD_SET(pipe_fd[ReadEnd], &(fds[0])); - FD_SET(fd, &(fds[2])); - FD_SET(fd, &(fds[fd_index])); - ::select(nfds, &(fds[0]), &(fds[1]), &(fds[2]), nullptr); + int fd_index = role; + for (int i = 0; i < sizeof(fds); ++i) FD_ZERO(&(fds[i])); + FD_SET(pipe_fd[ReadEnd], &(fds[CheckRead])); + FD_SET(fd, &(fds[CheckExeption])); + if (fd_index != CheckExeption) FD_SET(fd, &(fds[fd_index])); + ::select(nfds, &(fds[CheckRead]), &(fds[CheckWrite]), &(fds[CheckExeption]), nullptr); int buf = 0; while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0); - if (FD_ISSET(fd, &(fds[2]))) return false; + if (FD_ISSET(fd, &(fds[CheckExeption]))) return false; return FD_ISSET(fd, &(fds[fd_index])); #endif return true; @@ -124,3 +113,8 @@ bool PIWaitEvent::isCreate() const { return pipe_fd[ReadEnd] != 0; #endif } + + +void * PIWaitEvent::getEvent() const { + return event; +} diff --git a/libs/main/core/piwaitevent_p.h b/libs/main/core/piwaitevent_p.h index e5c7ee73..e0f18299 100644 --- a/libs/main/core/piwaitevent_p.h +++ b/libs/main/core/piwaitevent_p.h @@ -37,22 +37,23 @@ public: void create(); void destroy(); -#ifdef WINDOWS - bool wait(); -#else - enum SelectRole { + + enum CheckRole { // UNIX only CheckRead, - CheckWrite + CheckWrite, + CheckExeption }; - bool wait(int fd, SelectRole role = CheckRead); -#endif + bool wait(int fd = -1, CheckRole role = CheckRead); + void interrupt(); bool isCreate() const; -#ifdef WINDOWS - HANDLE event = NULL; -#else + void * getEvent() const; // WINDOWS only + private: +#ifdef WINDOWS + void * event = nullptr; +#else int pipe_fd[2] = {0, 0}; fd_set fds[3]; enum { diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 18d0f4e2..8106921e 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -977,14 +977,14 @@ bool PIEthernet::connectTCP() { #ifdef WINDOWS long PIEthernet::waitForEvent(long mask) { if (!PRIVATE->event.isCreate() || sock < 0) return 0; - WSAEventSelect(sock, PRIVATE->event.event, mask); + WSAEventSelect(sock, PRIVATE->event.getEvent(), mask); if (PRIVATE->event.wait()) { //DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); //piCout << "wait result" << wr; //if (wr == WSA_WAIT_EVENT_0) { WSANETWORKEVENTS events; memset(&events, 0, sizeof(events)); - WSAEnumNetworkEvents(sock, PRIVATE->event.event, &events); + WSAEnumNetworkEvents(sock, PRIVATE->event.getEvent(), &events); //piCout << "wait result" << events.lNetworkEvents; return events.lNetworkEvents; } diff --git a/libs/main/io_devices/pifile.cpp b/libs/main/io_devices/pifile.cpp index 69bd4c44..20dae7c3 100644 --- a/libs/main/io_devices/pifile.cpp +++ b/libs/main/io_devices/pifile.cpp @@ -439,9 +439,7 @@ PIByteArray PIFile::get() { ssize_t PIFile::readDevice(void * read_to, ssize_t max_size) { if (!canRead() || PRIVATE->fd == 0) return -1; - reading_now = true; ssize_t ret = fread(read_to, 1, max_size, PRIVATE->fd); - reading_now = false; return ret; } diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 90c4d95b..bbcfc092 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -217,11 +217,8 @@ void PIIODevice::stopThreadedRead() { read_thread.stop(); if (!destroying) { interrupt(); - stopThreadedReadDevice(); - } - if (reading_now) { - read_thread.terminate(); - reading_now = false; + } else { + piCoutObj << "Error: Device is running after destructor!"; } #endif } @@ -307,7 +304,6 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) { void PIIODevice::_init() { - reading_now = false; setOptions(0); setReopenEnabled(true); setReopenTimeout(1000); diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index 3bd8f3b9..2ce101ef 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -498,10 +498,6 @@ protected: //! \~english Function executed when thread read some data, default implementation execute external callback "ret_func_" //! \~russian Метод вызывается после каждого успешного потокового чтения, по умолчанию вызывает callback "ret_func_" virtual bool threadedRead(const uchar * readed, ssize_t size); - - //! \~english Function executed after PIThread::stop() and PIThread::interrupt() of read thread - //! \~russian Метод вызывается после PIThread::stop() и PIThread::interrupt() потока чтения - virtual void stopThreadedReadDevice() {} //! \~english Reimplement to construct full unambiguous string, describes this device. //! Default implementation returns \a path() @@ -549,10 +545,6 @@ protected: bool opened_ = false; void * ret_data_ = nullptr; - //! \~english Set this flag while blocking operations - //! \~russian Устанавливайте этот флаг во время блокирующих операций - std::atomic_bool reading_now; - private: EVENT_HANDLER(void, read_func); EVENT_HANDLER(void, write_func); diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index a9437a73..b7590f7e 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -814,7 +814,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { if (sending) return -1; //piCoutObj << "read ..." << PRIVATE->hCom; memset(&(PRIVATE->overlap), 0, sizeof(PRIVATE->overlap)); - PRIVATE->overlap.hEvent = PRIVATE->event.event; + PRIVATE->overlap.hEvent = PRIVATE->event.getEvent(); ReadFile(PRIVATE->hCom, read_to, max_size, NULL, &(PRIVATE->overlap)); PRIVATE->readed = 0; if (PRIVATE->event.wait()) { @@ -860,7 +860,7 @@ ssize_t PISerial::writeDevice(const void * data, ssize_t max_size) { //piCoutObj << "send ..." << max_size;// << ": " << PIString((char*)data, max_size); sending = true; memset(&(PRIVATE->overlap_write), 0, sizeof(PRIVATE->overlap_write)); - PRIVATE->overlap_write.hEvent = PRIVATE->event_write.event; + PRIVATE->overlap_write.hEvent = PRIVATE->event_write.getEvent(); WriteFile(PRIVATE->hCom, data, max_size, NULL, &(PRIVATE->overlap_write)); if (PRIVATE->event_write.wait()) { GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap_write), &wrote, FALSE);