diff --git a/CMakeLists.txt b/CMakeLists.txt index 697dcaf5..f6e9daec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -492,6 +492,8 @@ if (NOT CROSSTOOLS) #target_link_libraries(pip_plugin pip) add_executable(pip_test "main.cpp") target_link_libraries(pip_test pip) + add_executable(pip_cloud_test "main_picloud_test.cpp") + target_link_libraries(pip_cloud_test pip_cloud) endif() else() diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index a3388719..b259e025 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -33,28 +33,22 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) if (is_deleted) return; bool need_disconn = is_connected; //piCoutObj << "eth disconnected"; - eth.softStopThreadedRead(); + eth.stop(); opened_ = false; internalDisconnect(); - if (need_disconn) - disconnected(); + if (need_disconn) disconnected(); //piCoutObj << "eth disconnected done"; }); } PICloudClient::~PICloudClient() { - //piCoutObj << "~PICloudClient()"; - softStopThreadedRead(); - //eth.close(); - //if (is_connected) disconnected(); - close(); - //piCoutObj << "~PICloudClient() closed"; - internalDisconnect(); -// stop(false); + //piCoutObj << "~PICloudClient() ..." << this; is_deleted = true; + stopAndWait(); + close(); internalDisconnect(); - //piCoutObj << "~PICloudClient() done"; + //piCoutObj << "~PICloudClient() done" << this; } @@ -69,6 +63,12 @@ void PICloudClient::setKeepConnection(bool on) { } +void PICloudClient::interrupt() { + cond_buff.notifyOne(); + cond_connect.notifyOne(); +} + + bool PICloudClient::openDevice() { //piCoutObj << "open";// << path(); bool op = eth.connect(PIEthernet::Address::resolve(path()), false); @@ -81,7 +81,7 @@ bool PICloudClient::openDevice() { mutex_connect.unlock(); if (!conn_ok) { mutex_connect.lock(); - eth.stop(); + eth.stopAndWait(); eth.close(); mutex_connect.unlock(); } @@ -98,41 +98,44 @@ bool PICloudClient::closeDevice() { if (is_connected) { internalDisconnect(); } - eth.stop(); + eth.stopAndWait(); eth.close(); return true; } ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) { - if (is_deleted) return -1; - //piCoutObj << "readDevice"; - reading_now = true; + if (is_deleted || max_size <= 0) return -1; + //piCoutObj << "readDevice ..."; if (!is_connected && eth.isClosed()) openDevice(); ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); if (is_connected) { - sz = piMini(max_size, buff.size()); - memcpy(read_to, buff.data(), sz); - buff.remove(0, sz); + if (buff.isEmpty()) { + sz = 0; + } else { + sz = piMin(max_size, buff.size_s()); + memcpy(read_to, buff.data(), sz); + buff.remove(0, sz); + } + if (sz == 0) cond_buff.wait(mutex_buff); } mutex_buff.unlock(); if (!is_connected) opened_ = false; - reading_now = false; //piCoutObj << "readDevice done" << sz; return sz; } ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) { - if (is_deleted) return -1; -// piCoutObj << "writeDevice"; + if (is_deleted || !is_connected) return -1; + //piCoutObj << "writeDevice" << size; return tcp.sendData(PIByteArray(data, size)); } void PICloudClient::internalDisconnect() { + //piCoutObj << "internalDisconnect"; is_connected = false; cond_buff.notifyOne(); cond_connect.notifyOne(); @@ -157,13 +160,18 @@ void PICloudClient::_readed(PIByteArray & ba) { } break; case PICloud::TCP::Disconnect: - eth.softStopThreadedRead(); + eth.stop(); opened_ = false; eth.close(); break; case PICloud::TCP::Data: if (is_connected) { mutex_buff.lock(); + if (buff.size_s() > threadedReadBufferSize()) { + piCoutObj << "Error: buffer overflow, drop" << ba.size() << "bytes"; + mutex_buff.unlock(); + return; + } buff.append(ba); mutex_buff.unlock(); cond_buff.notifyOne(); @@ -174,7 +182,6 @@ void PICloudClient::_readed(PIByteArray & ba) { } //piCoutObj << "readed" << ba.toHex(); } - while (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad + if (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad //piCoutObj << "_readed done"; } - diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index b3e61e79..f5a54766 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -25,13 +25,22 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) tcp.setRole(PICloud::TCP::Server); tcp.setServerName(server_name); setName("cloud_server__" + server_name); + is_deleted = false; + eth.setReopenEnabled(false); CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed); - CONNECTL(ð, connected, [this](){opened_ = true; piCoutObj << "connected"; tcp.sendStart();}); + CONNECTL(ð, connected, [this](){ + opened_ = true; + //piCoutObj << "connected" << ð + tcp.sendStart(); + }); CONNECTL(ð, disconnected, [this](bool){ - piCoutObj << "disconnected"; - eth.softStopThreadedRead(); + if (is_deleted) return; + //piCoutObj << "disconnected" << ð + for (auto c : clients_) { + delete c; + } opened_ = false; - ping_timer.stop(false); + ping_timer.stop(); piMSleep(100); }); CONNECTL(&ping_timer, tickEvent, [this] (void *, int){ @@ -41,8 +50,13 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) PICloudServer::~PICloudServer() { + //piCoutObj << "~PICloudServer ..." << this; + is_deleted = true; stop(); close(); + //piCout << "wait"; + waitThreadedReadFinished(); + //piCoutObj << "~PICloudServer done" << this; } @@ -65,35 +79,32 @@ bool PICloudServer::openDevice() { eth.startThreadedRead(); ping_timer.start(5000); return true; + } else { + ping_timer.stop(); + eth.close(); + return false; } - ping_timer.stop(false); - eth.close(); - return false; } bool PICloudServer::closeDevice() { - eth.stop(); - ping_timer.stop(false); - clients_mutex.lock(); - for (auto c : clients_) { - c->close(); - c->stop(); - } - clients_mutex.unlock(); + //piCoutObj << "closeDevice" << this; + eth.stopAndWait(); + ping_timer.stop(); eth.close(); - for (auto c : clients_) + for (auto c : clients_) { delete c; + } + clients_.clear(); return true; } ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) { + if (is_deleted) return -1; //piCoutObj << "readDevice"; - reading_now = true; if (!opened_) openDevice(); - else piMSleep(eth.readTimeout()); - reading_now = false; + //else piMSleep(eth.readTimeout()); return -1; } @@ -104,12 +115,18 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) { } +void PICloudServer::interrupt() { + eth.interrupt(); +} + + void PICloudServer::clientDisconnect(uint client_id) { tcp.sendDisconnected(client_id); } int PICloudServer::sendData(const PIByteArray & data, uint client_id) { + if (!opened_) return -1; return tcp.sendData(data, client_id); } @@ -122,12 +139,10 @@ PICloudServer::Client::Client(PICloudServer * srv, uint id) : server(srv), clien PICloudServer::Client::~Client() { - if (is_connected) { - is_connected = false; - cond_buff.notifyOne(); - } + //piCoutObj << "~PICloudServer::Client..." << this; close(); - stop(); + stopAndWait(); + //piCoutObj << "~PICloudServer::Client done" << this; } @@ -137,7 +152,7 @@ bool PICloudServer::Client::openDevice() { bool PICloudServer::Client::closeDevice() { - softStopThreadedRead(); + //piCoutObj << "closeDevice" << this; if (is_connected) { server->clientDisconnect(client_id); is_connected = false; @@ -151,11 +166,15 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) { if (!is_connected) return -1; ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); if (is_connected) { - sz = piMini(max_size, buff.size()); - memcpy(read_to, buff.data(), sz); - buff.remove(0, sz); + if (buff.isEmpty()) { + sz = 0; + } else { + sz = piMini(max_size, buff.size()); + memcpy(read_to, buff.data(), sz); + buff.remove(0, sz); + } + if (sz == 0) cond_buff.wait(mutex_buff); } mutex_buff.unlock(); return sz; @@ -163,21 +182,32 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) { ssize_t PICloudServer::Client::writeDevice(const void * data, ssize_t size) { + if (!is_connected) return -1; return server->sendData(PIByteArray(data, size), client_id); } +void PICloudServer::Client::interrupt() { + cond_buff.notifyOne(); +} + + void PICloudServer::Client::pushBuffer(const PIByteArray & ba) { if (!is_connected) return; mutex_buff.lock(); + if (buff.size_s() > threadedReadBufferSize()) { + piCoutObj << "Error: buffer overflow, drop" << ba.size() << "bytes"; + mutex_buff.unlock(); + return; + } buff.append(ba); cond_buff.notifyOne(); mutex_buff.unlock(); - while (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad } void PICloudServer::_readed(PIByteArray & ba) { + if (is_deleted) return; PIPair hdr = tcp.parseHeader(ba); if (hdr.second == tcp.role()) { switch (hdr.first) { @@ -189,8 +219,8 @@ void PICloudServer::_readed(PIByteArray & ba) { if (oc) { tcp.sendDisconnected(id); } else { - //piCoutObj << "new Client" << id; Client * c = new Client(this, id); + //piCoutObj << "new Client" << id << c; CONNECT1(void, PIObject *, c, deleted, this, clientDeleted); clients_mutex.lock(); clients_ << c; @@ -206,8 +236,9 @@ void PICloudServer::_readed(PIByteArray & ba) { Client * oc = index_clients.value(id, nullptr); clients_mutex.unlock(); if (oc) { + oc->stopAndWait(); oc->is_connected = false; - oc->close(); + delete oc; } } break; case PICloud::TCP::Data: { @@ -215,7 +246,7 @@ void PICloudServer::_readed(PIByteArray & ba) { clients_mutex.lock(); Client * oc = index_clients.value(d.first, nullptr); clients_mutex.unlock(); - //piCoutObj << "data for" << d.first << d.second.toHex(); + //piCoutObj << "data for" << d.first << d.second.size(); if (oc && !d.second.isEmpty()) oc->pushBuffer(d.second); } break; default: break; @@ -226,11 +257,11 @@ void PICloudServer::_readed(PIByteArray & ba) { void PICloudServer::clientDeleted(PIObject * o) { PICloudServer::Client * c = (PICloudServer::Client*)o; + //piCoutObj << "clientDeleted" << c; clients_mutex.lock(); clients_.removeOne(c); auto it = index_clients.makeIterator(); - while (it.hasNext()) { - it.next(); + while (it.next()) { if (it.value() == c) { index_clients.remove(it.key()); break; diff --git a/libs/console/piscreen.cpp b/libs/console/piscreen.cpp index abbbf657..fba57853 100644 --- a/libs/console/piscreen.cpp +++ b/libs/console/piscreen.cpp @@ -588,7 +588,7 @@ void PIScreen::waitForFinish() { void PIScreen::stop(bool clear) { - PIThread::stop(true); + PIThread::stopAndWait(); if (clear) console.clearScreen(); #ifndef WINDOWS fflush(0); diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index 791bae85..e9217dd3 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -43,6 +43,7 @@ public: void setKeepConnection(bool on); bool isConnected() const {return is_connected;} ssize_t bytesAvailable() const override {return buff.size();} + void interrupt() override; EVENT(connected); EVENT(disconnected); @@ -65,6 +66,7 @@ private: PIConditionVariable cond_connect; std::atomic_bool is_connected; std::atomic_bool is_deleted; + }; #endif // PICLOUDCLIENT_H diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index 593783c2..59f8509e 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -51,6 +51,7 @@ public: ssize_t writeDevice(const void * data, ssize_t size) override; DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} ssize_t bytesAvailable() const override {return buff.size();} + void interrupt() override; private: void pushBuffer(const PIByteArray & ba); @@ -73,6 +74,7 @@ protected: bool closeDevice() override; ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t max_size) override; + void interrupt() override; private: EVENT_HANDLER1(void, _readed, PIByteArray &, ba); @@ -84,6 +86,7 @@ private: PIMap index_clients; PITimer ping_timer; mutable PIMutex clients_mutex; + std::atomic_bool is_deleted; }; #endif // PICLOUDSERVER_H diff --git a/libs/main/core/pibase.h b/libs/main/core/pibase.h index c8eff9d8..b066e6cd 100644 --- a/libs/main/core/pibase.h +++ b/libs/main/core/pibase.h @@ -248,6 +248,10 @@ typedef long time_t; #endif +#ifdef POSIX_SIGNALS +# define PIP_INTERRUPT_SIGNAL SIGTERM +#endif + #ifdef LINUX # define environ __environ #endif diff --git a/libs/main/core/piincludes.h b/libs/main/core/piincludes.h index e867a756..c348f2ea 100644 --- a/libs/main/core/piincludes.h +++ b/libs/main/core/piincludes.h @@ -44,6 +44,7 @@ class PIInit; #endif class PIChar; class PICout; +class PIWaitEvent; struct lconv; diff --git a/libs/main/core/piincludes_p.h b/libs/main/core/piincludes_p.h index e3c622ca..d35baa6e 100644 --- a/libs/main/core/piincludes_p.h +++ b/libs/main/core/piincludes_p.h @@ -22,6 +22,10 @@ #include "picout.h" #ifdef WINDOWS +# ifdef _WIN32_WINNT +# undef _WIN32_WINNT +# define _WIN32_WINNT 0x0600 +# endif # include # include # include diff --git a/libs/main/core/piinit.cpp b/libs/main/core/piinit.cpp index c8354cfb..3c768797 100644 --- a/libs/main/core/piinit.cpp +++ b/libs/main/core/piinit.cpp @@ -92,9 +92,20 @@ void __sighandler__(PISignals::Signal s) { } -#ifdef ANDROID -void android_thread_exit_handler(int sig) { - pthread_exit(0); +#ifdef POSIX_SIGNALS +void pipThreadSignalHandler(int sig) { +//# ifdef ANDROID +// pthread_exit(0); +//# endif +} +void pipInitThreadSignals() { + struct sigaction actions; + memset(&actions, 0, sizeof(actions)); + sigemptyset(&actions.sa_mask); + actions.sa_flags = 0; + actions.sa_handler = pipThreadSignalHandler; + if (sigaction(PIP_INTERRUPT_SIGNAL, &actions, 0) != 0) + piCout << "sigaction error:" << errorString(); } #endif @@ -166,14 +177,10 @@ PIInit::PIInit() { setlocale(LC_ALL, ""); setlocale(LC_NUMERIC, "C"); # endif //HAS_LOCALE -#else //ANDROID - struct sigaction actions; - memset(&actions, 0, sizeof(actions)); - sigemptyset(&actions.sa_mask); - actions.sa_flags = 0; - actions.sa_handler = android_thread_exit_handler; - sigaction(SIGTERM, &actions, 0); #endif //ANDROID +#ifdef POSIX_SIGNALS + pipInitThreadSignals(); +#endif PRIVATE->delete_locs = false; __syslocname__ = __sysoemname__ = 0; __utf8name__ = const_cast("UTF-8"); diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 2297b449..dd4eed43 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -839,7 +839,7 @@ PIObject::Deleter::Deleter() { PRIVATE->thread.setSlot([this](){ PIVector oq; PRIVATE->thread.lock(); - while(PRIVATE->obj_queue.isEmpty()) PRIVATE->cond_var.wait(PRIVATE->thread.mutex()); + if (PRIVATE->obj_queue.isEmpty()) PRIVATE->cond_var.wait(PRIVATE->thread.mutex()); oq.swap(PRIVATE->obj_queue); PRIVATE->thread.unlock(); for (PIObject * o : oq) deleteObject(o); diff --git a/libs/main/core/piwaitevent_p.cpp b/libs/main/core/piwaitevent_p.cpp new file mode 100644 index 00000000..72d85aee --- /dev/null +++ b/libs/main/core/piwaitevent_p.cpp @@ -0,0 +1,144 @@ +/* + PIP - Platform Independent Primitives + Private PIP wait object + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piwaitevent_p.h" +#ifdef WINDOWS +//# ifdef _WIN32_WINNT +//# undef _WIN32_WINNT +//# define _WIN32_WINNT 0x0600 +//# endif +# include +#else +# include +# include +#endif +#include "pistring.h" + + +PIWaitEvent::~PIWaitEvent() { + destroy(); +} + + +void PIWaitEvent::create() { + destroy(); +#ifdef WINDOWS + event = CreateEventA(NULL, TRUE, FALSE, NULL); + if (!event) { + piCout << "Error with CreateEventA:" << errorString(); + } +#else + for (int i = 0; i < 3; ++i) memset(&(fds[i]), 0, sizeof(fds[i])); + if (::pipe(pipe_fd) < 0) { + piCout << "Error with pipe:" << errorString(); + } else { + fcntl(pipe_fd[ReadEnd], F_SETFL, O_NONBLOCK); + } +#endif +} + + +void PIWaitEvent::destroy() { +#ifdef WINDOWS + if (event) { + CloseHandle(event); + event = NULL; + } +#else + for (int i = 0; i < 2; ++i) { + if (pipe_fd[i] != 0) { + ::close(pipe_fd[i]); + pipe_fd[i] = 0; + } + } +#endif +} + + +bool PIWaitEvent::wait(int fd, CheckRole role) { + if (!isCreate()) return false; +#ifdef WINDOWS + DWORD ret = WaitForSingleObjectEx(event, INFINITE, TRUE); + ResetEvent(event); + if (ret == WAIT_IO_COMPLETION || ret == WAIT_FAILED) return false; +#else + if (fd == -1) return false; + int nfds = piMaxi(pipe_fd[ReadEnd], fd) + 1; + int fd_index = role; + for (int i = 0; i < 3; ++i) FD_ZERO(&(fds[i])); + FD_SET(pipe_fd[ReadEnd], &(fds[CheckRead])); + FD_SET(fd, &(fds[CheckExeption])); + if (fd_index != CheckExeption) FD_SET(fd, &(fds[fd_index])); + ::select(nfds, &(fds[CheckRead]), &(fds[CheckWrite]), &(fds[CheckExeption]), nullptr); + int buf = 0; + while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0); + if (FD_ISSET(fd, &(fds[CheckExeption]))) return false; + return FD_ISSET(fd, &(fds[fd_index])); +#endif + return true; +} + + +bool PIWaitEvent::sleep(int us) { + if (!isCreate()) return false; +#ifdef WINDOWS + DWORD ret = WaitForSingleObjectEx(event, us / 1000, TRUE); + ResetEvent(event); + return ret == WAIT_TIMEOUT; +#else + int nfds = pipe_fd[ReadEnd] + 1; + FD_ZERO(&(fds[CheckRead])); + FD_SET(pipe_fd[ReadEnd], &(fds[CheckRead])); + timeval timeout; + timeout.tv_sec = us / 1000000; + timeout.tv_usec = us % 1000000; + int ret = ::select(nfds, &(fds[CheckRead]), nullptr, nullptr, &timeout); + int buf = 0; + while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0); + return ret == 0; +#endif +} + + +void PIWaitEvent::interrupt() { + if (!isCreate()) return; +#ifdef WINDOWS + SetEvent(event); +#else + ::write(pipe_fd[WriteEnd], "", 1); +#endif +} + + +bool PIWaitEvent::isCreate() const { +#ifdef WINDOWS + return event; +#else + return pipe_fd[ReadEnd] != 0; +#endif +} + + +void * PIWaitEvent::getEvent() const { +#ifdef WINDOWS + return event; +#else + return nullptr; +#endif +} diff --git a/libs/main/core/piwaitevent_p.h b/libs/main/core/piwaitevent_p.h new file mode 100644 index 00000000..3dbdb137 --- /dev/null +++ b/libs/main/core/piwaitevent_p.h @@ -0,0 +1,67 @@ +/* + PIP - Platform Independent Primitives + Private PIP wait object + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PIWAITEVENT_P_H +#define PIWAITEVENT_P_H + +#include "pibase.h" +#ifdef WINDOWS +# include +# include +# include +#else +# include +# include +#endif + + +class PIP_EXPORT PIWaitEvent { +public: + ~PIWaitEvent(); + + enum CheckRole { // UNIX only + CheckRead, + CheckWrite, + CheckExeption + }; + + void create(); + void destroy(); + bool wait(int fd = -1, CheckRole role = CheckRead); + bool sleep(int us); // return if sleep done + void interrupt(); + bool isCreate() const; + void * getEvent() const; // WINDOWS only + +private: +#ifdef WINDOWS + void * event = nullptr; +#else + int pipe_fd[2] = {0, 0}; + fd_set fds[3]; + enum { + ReadEnd = 0, + WriteEnd = 1 + }; +#endif + +}; + + +#endif // PIWAITEVENT_P_H diff --git a/libs/main/io_devices/pican.cpp b/libs/main/io_devices/pican.cpp index 66381091..7f42774e 100644 --- a/libs/main/io_devices/pican.cpp +++ b/libs/main/io_devices/pican.cpp @@ -18,7 +18,7 @@ */ #include "pican.h" #include "pipropertystorage.h" -#include "piincludes_p.h" +#include "piwaitevent_p.h" #if !defined(WINDOWS) && !defined(MAC_OS) && !defined(MICRO_PIP) # define PIP_CAN #endif @@ -39,17 +39,24 @@ REGISTER_DEVICE(PICAN) +PRIVATE_DEFINITION_START(PICAN) + PIWaitEvent event; +PRIVATE_DEFINITION_END(PICAN) + + PICAN::PICAN(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode) { setThreadedReadBufferSize(256); setPath(path); can_id = 0; sock = 0; + PRIVATE->event.create(); } PICAN::~PICAN() { - stop(); + stopAndWait(); close(); + PRIVATE->event.destroy(); } @@ -92,6 +99,7 @@ bool PICAN::openDevice() { bool PICAN::closeDevice() { #ifdef PIP_CAN + interrupt(); if (sock > 0) ::close(sock); #endif return true; @@ -103,9 +111,8 @@ ssize_t PICAN::readDevice(void * read_to, ssize_t max_size) { //piCout << "PICAN read"; can_frame frame; ssize_t ret = 0; - reading_now = true; - ret = ::read(sock, &frame, sizeof(can_frame)); - reading_now = false; + if (PRIVATE->event.wait(sock)) + ret = ::read(sock, &frame, sizeof(can_frame)); if (ret < 0) {/*piCoutObj << "Error while read CAN frame " << ret;*/ return -1;} //piCoutObj << "receive CAN frame Id =" << frame.can_id; memcpy(read_to, frame.data, piMini(frame.can_dlc, max_size)); @@ -148,6 +155,11 @@ int PICAN::readedCANID() const { } +void PICAN::interrupt() { + PRIVATE->event.interrupt(); +} + + PIString PICAN::constructFullPathDevice() const { PIString ret; ret += path() + ":" + PIString::fromNumber(CANID(), 16); diff --git a/libs/main/io_devices/pican.h b/libs/main/io_devices/pican.h index 8afab4d7..199ccf5f 100644 --- a/libs/main/io_devices/pican.h +++ b/libs/main/io_devices/pican.h @@ -39,6 +39,7 @@ public: void setCANID(int id); int CANID() const; int readedCANID() const; + void interrupt() override; protected: bool openDevice() override; @@ -52,6 +53,7 @@ protected: DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} private: + PRIVATE_DECLARATION(PIP_EXPORT) int sock; int can_id, readed_id; }; diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 3ad67450..f38b4efc 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -67,6 +67,7 @@ # endif # endif #endif +#include "piwaitevent_p.h" #include @@ -212,6 +213,7 @@ PRIVATE_DEFINITION_START(PIEthernet) sockaddr_in addr_; sockaddr_in saddr_; sockaddr_in raddr_; + PIWaitEvent event; PRIVATE_DEFINITION_END(PIEthernet) @@ -240,15 +242,18 @@ PIEthernet::PIEthernet(int sock_, PIString ip_port): PIIODevice("", ReadWrite) { setParameters(PIEthernet::ReuseAddress | PIEthernet::MulticastLoop); setType(TCP_Client, false); setPath(ip_port); + ethNonblocking(sock); + PRIVATE->event.create(); //piCoutObj << "new tcp client" << sock_; } PIEthernet::~PIEthernet() { - //piCout << "~PIEthernet" << uint(this); - stop(); + //piCout << "~PIEthernet"; + stopAndWait(); close(); - //piCoutObj << "~PIEthernet done"; + PRIVATE->event.destroy(); + //piCout << "~PIEthernet done"; } @@ -274,7 +279,9 @@ void PIEthernet::construct() { bool PIEthernet::init() { if (isOpened()) return true; - //cout << "init " << type_ << endl; + if (sock != -1) return true; + //piCout << "init " << type(); + PRIVATE->event.destroy(); if (sock_s == sock) sock_s = -1; closeSocket(sock); @@ -289,6 +296,8 @@ bool PIEthernet::init() { } PIFlags params = parameters(); sock = ::socket(AF_INET, st, pr); + ethNonblocking(sock); + PRIVATE->event.create(); if (params[SeparateSockets]) sock_s = ::socket(AF_INET, st, pr); else @@ -301,7 +310,7 @@ bool PIEthernet::init() { if (params[PIEthernet::Broadcast]) ethSetsockoptBool(sock, SOL_SOCKET, SO_BROADCAST); applyTimeouts(); applyOptInt(IPPROTO_IP, IP_TTL, TTL()); -// piCoutObj << "inited" << path(); + //piCoutObj << "inited" << path(); return true; } @@ -355,11 +364,12 @@ PIEthernet::Address PIEthernet::getBroadcast(const PIEthernet::Address & ip, con bool PIEthernet::openDevice() { if (connected_) return true; + //piCoutObj << "open"; init(); if (sock == -1 || path().isEmpty()) return false; addr_r.set(path()); - if (type() == TCP_Client) - connecting_ = true; + //if (type() == TCP_Client) + // connecting_ = true; if (type() != UDP || mode() == PIIODevice::WriteOnly) return true; memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); @@ -392,21 +402,26 @@ bool PIEthernet::openDevice() { bool PIEthernet::closeDevice() { - //cout << "close\n"; + //piCoutObj << "close"; + bool ned = connected_; + connected_ = connecting_ = false; + server_thread_.stop(); + PRIVATE->event.interrupt(); if (server_thread_.isRunning()) { - server_thread_.stop(); - if (!server_thread_.waitForFinish(100)) + if (!server_thread_.waitForFinish(1000)) server_thread_.terminate(); } + PRIVATE->event.destroy(); if (sock_s == sock) sock_s = -1; closeSocket(sock); closeSocket(sock_s); while (!clients_.isEmpty()) delete clients_.back(); - bool ned = connected_; - connected_ = connecting_ = false; - if (ned) disconnected(false); + if (ned) { + piCoutObj << "Disconnect on close"; + disconnected(false); + } return true; } @@ -556,6 +571,7 @@ bool PIEthernet::connect(bool threaded) { connecting_ = true; return true; } + if (connected_) return false; if (sock == -1) init(); if (sock == -1) return false; memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); @@ -566,13 +582,14 @@ bool PIEthernet::connect(bool threaded) { #ifdef QNX PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif - connected_ = (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) == 0); + connecting_ = true; + connected_ = connectTCP(); + connecting_ = false; if (!connected_) { piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); } opened_ = connected_; if (connected_) { - connecting_ = false; connected(); } return connected_; @@ -618,7 +635,7 @@ bool PIEthernet::listen(bool threaded) { return false; } opened_ = server_bounded = true; - //piCoutObj << "listen on " << ip_ << ":" << port_; + piCoutObj << "listen on" << path(); server_thread_.start(server_func); return true; } @@ -675,40 +692,21 @@ bool PIEthernet::send(const PIEthernet::Address & addr, const PIByteArray & data } +void PIEthernet::interrupt() { + //piCout << "interrupt"; + PRIVATE->event.interrupt(); +} + + ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { //piCout << "read" << sock; if (sock == -1) init(); if (sock == -1 || read_to == 0) return -1; - int rs = 0, s = 0, lerr = 0; - sockaddr_in client_addr; - socklen_t slen = sizeof(client_addr); -// piCoutObj << "read from " << ip_ << ":" << port_; + int rs = 0, lerr = 0; + //piCoutObj << "read from " << path() << connecting_; switch (type()) { - case TCP_SingleTCP: - reading_now = true; - ::listen(sock, 64); - reading_now = false; - s = accept(sock, (sockaddr * )&client_addr, &slen); - if (s == -1) { - //piCoutObj << "Can`t accept new connection, " << ethErrorString(); - piMinSleep(); - return -1; - } - reading_now = true; - rs = ethRecv(s, read_to, max_size); - reading_now = false; - closeSocket(s); - return rs; case TCP_Client: if (connecting_) { -#ifdef ANDROID - /*if (sock_s == sock) - sock_s = -1; - closeSocket(sock); - closeSocket(sock_s); - init(); - qDebug() << "init() in read thread";*/ -#endif addr_r.set(path()); memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); PRIVATE->addr_.sin_port = htons(addr_r.port()); @@ -717,11 +715,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #ifdef QNX PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif - //piCout << "connect to " << path() << "..."; - reading_now = true; - connected_ = (::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)) == 0); - reading_now = false; - //piCout << "connect to " << path() << connected_; + piCoutObj << "connect to " << path() << "..."; + connected_ = connectTCP(); + piCoutObj << "connect to " << path() << connected_; if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); opened_ = connected_; @@ -734,9 +730,25 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { } if (!connected_) return -1; errorClear(); - reading_now = true; - rs = ethRecv(sock, read_to, max_size); - reading_now = false; +#ifdef WINDOWS + { + long wr = waitForEvent(PRIVATE->event, FD_READ | FD_CLOSE); + switch (wr) { + case FD_READ: + //piCout << "fd_read ..."; + rs = ethRecv(sock, read_to, max_size); + break; + case FD_CLOSE: + //piCout << "fd_close ..."; + rs = -1; + break; + default: break; + } + } +#else + if (PRIVATE->event.wait(sock)) + rs = ethRecv(sock, read_to, max_size); +#endif //piCoutObj << "readed" << rs; if (rs <= 0) { lerr = ethErrorCore(); @@ -744,30 +756,47 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #ifdef WINDOWS if ((lerr == WSAEWOULDBLOCK || lerr == WSAETIMEDOUT) && !parameters()[DisonnectOnTimeout]) { #elif defined(ANDROID) - if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout]) { + if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout] && rs < 0) { #else - if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout]) { + if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout] && rs < 0) { #endif //piCoutObj << errorString(); return -1; } - if (connected_) { + if (connected_.exchange(false)) { + opened_ = false; piCoutObj << "Disconnect on read," << ethErrorString(); - opened_ = connected_ = false; + closeSocket(sock); init(); disconnected(rs < 0); } - if (parameters()[KeepConnection]) + if (parameters()[KeepConnection]) { connect(); + } //piCoutObj << "eth" << ip_ << "disconnected"; } if (rs > 0) received(read_to, rs); return rs; - case UDP: + case UDP: { memset(&PRIVATE->raddr_, 0, sizeof(PRIVATE->raddr_)); - reading_now = true; + //piCoutObj << "read from" << path() << "..."; +#ifdef WINDOWS + long wr = waitForEvent(PRIVATE->event, FD_READ | FD_CLOSE); + switch (wr) { + case FD_READ: + //piCout << "fd_read ..."; + rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); + break; + case FD_CLOSE: + //piCout << "fd_close ..."; + rs = -1; + break; + default: break; + } +#else rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); - reading_now = false; +#endif + //piCoutObj << "read from" << path() << rs << "bytes"; if (rs > 0) { addr_lr.set(uint(PRIVATE->raddr_.sin_addr.s_addr), ntohs(PRIVATE->raddr_.sin_port)); //piCoutObj << "read from" << ip_r << ":" << port_r << rs << "bytes"; @@ -775,6 +804,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { } //else piCoutObj << "read returt" << rs << ", error" << ethErrorString(); return rs; + } default: break; } return -1; @@ -790,26 +820,6 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { //piCoutObj << "sending to " << ip_s << ":" << port_s << " " << max_size << " bytes"; int ret = 0; switch (type()) { - case TCP_SingleTCP: - memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); - PRIVATE->addr_.sin_port = htons(addr_s.port()); - PRIVATE->addr_.sin_addr.s_addr = addr_s.ip(); - PRIVATE->addr_.sin_family = AF_INET; -#ifdef QNX - PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); -#endif - //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; - if (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) != 0) { - //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); - piMinSleep(); - return -1; - } - //piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ..."; - ret = ::send(sock, (const char *)data, max_size, 0); - //piCoutObj << "ok, ret" << ret; - closeSocket(sock); - init(); - return ret; case UDP: PRIVATE->saddr_.sin_port = htons(addr_s.port()); PRIVATE->saddr_.sin_addr.s_addr = addr_s.ip(); @@ -823,7 +833,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { #endif , (sockaddr * )&PRIVATE->saddr_, sizeof(PRIVATE->saddr_)); //piCout << "[PIEth] write to" << ip_s << ":" << port_s << "ok"; - case TCP_Client: + case TCP_Client: { if (connecting_) { memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); addr_r.set(path()); @@ -834,7 +844,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif //piCoutObj << "connect to " << ip << ":" << port_; - connected_ = (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) == 0); + connected_ = connectTCP(); if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); opened_ = connected_; @@ -844,14 +854,46 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { } } if (!connected_) return -1; - ret = ::send(sock, (const char *)data, max_size, 0); - if (ret < 0) { - piCoutObj << "Disconnect on write," << ethErrorString(); - opened_ = connected_ = false; - init(); - disconnected(true); + auto disconnectFunc = [this](){ + if (connected_.exchange(false)) { + opened_ = false; + piCoutObj << "Disconnect on write," << ethErrorString(); + closeSocket(sock); + init(); + disconnected(true); + } + }; + if (!isOptionSet(BlockingWrite)) { + ret = ::send(sock, (const char *)data, max_size, 0); + if (ret < 0) { + disconnectFunc(); + return -1; + } + } else { + ssize_t remain_size = max_size; + const char * remain_data = (const char *)data; + while (remain_size > 0) { + int sr = ::send(sock, remain_data, remain_size, 0); + if (sr < 0) { + int err = ethErrorCore(); +#ifdef WINDOWS + if (err == WSAEWOULDBLOCK) { +#else + if (err == EAGAIN || err == EWOULDBLOCK) { +#endif + piMinSleep(); + //piCoutObj << "wait for write"; + continue; + } else { + disconnectFunc(); + return -1; + } + } + remain_data += sr; + remain_size -= sr; + } } - return ret; + return ret;} default: break; } return -1; @@ -862,8 +904,7 @@ PIIODevice::DeviceInfoFlags PIEthernet::deviceInfoFlags() const { switch (type()) { case UDP: return 0; case TCP_Client: - case TCP_Server: - case TCP_SingleTCP: return Sequential | Reliable; + case TCP_Server: return Sequential | Reliable; default: break; } return 0; @@ -890,7 +931,21 @@ void PIEthernet::server_func(void * eth) { } sockaddr_in client_addr; socklen_t slen = sizeof(client_addr); +#ifdef WINDOWS + long wr = ce->waitForEvent(ce->PRIVATEWB->event, FD_ACCEPT | FD_CLOSE); + if (wr != FD_ACCEPT) { + piMSleep(10); + return; + } +#else + if (!ce->PRIVATEWB->event.wait(ce->sock)) { + piMSleep(10); + return; + } +#endif + //piCout << "server" << "accept ..."; int s = accept(ce->sock, (sockaddr * )&client_addr, &slen); + //piCout << "server" << "accept done" << ethErrorString(); if (s == -1) { int lerr = ethErrorCore(); #ifdef WINDOWS @@ -930,6 +985,49 @@ void PIEthernet::setType(Type t, bool reopen) { } +bool PIEthernet::connectTCP() { + ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)); + //piCout << errorString(); +#ifdef WINDOWS + long wr = waitForEvent(PRIVATE->event, FD_CONNECT | FD_CLOSE); + switch (wr) { + case FD_CONNECT: + //piCout << "fd_connect ..."; + return ethIsWriteable(sock); + default: break; + } +#else + if (PRIVATE->event.wait(sock, PIWaitEvent::CheckWrite)) { + if (ethIsWriteable(sock)) return true; + else { + closeSocket(sock); + init(); + } + } +#endif + return false; +} + + +#ifdef WINDOWS +long PIEthernet::waitForEvent(PIWaitEvent & event, long mask) { + if (!event.isCreate() || sock < 0) return 0; + WSAEventSelect(sock, event.getEvent(), mask); + if (event.wait()) { + //DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); + //piCout << "wait result" << wr; + //if (wr == WSA_WAIT_EVENT_0) { + WSANETWORKEVENTS events; + memset(&events, 0, sizeof(events)); + WSAEnumNetworkEvents(sock, event.getEvent(), &events); + //piCout << "wait result" << events.lNetworkEvents; + return events.lNetworkEvents; + } + return 0; +} +#endif + + bool PIEthernet::configureDevice(const void * e_main, const void * e_parent) { PIConfig::Entry * em = (PIConfig::Entry * )e_main; PIConfig::Entry * ep = (PIConfig::Entry * )e_parent; @@ -1333,3 +1431,43 @@ int PIEthernet::ethSetsockoptBool(int sock, int level, int optname, bool value) so = (value ? 1 : 0); return ethSetsockopt(sock, level, optname, &so, sizeof(so)); } + + +void PIEthernet::ethNonblocking(int sock) { + if (sock < 0) return; +#ifdef WINDOWS + u_long mode = 1; + ioctlsocket(sock, FIONBIO, &mode); +#else + fcntl(sock, F_SETFL, O_NONBLOCK); +#endif +} + + +bool PIEthernet::ethIsWriteable(int sock) { +/* fd_set fd_test; + FD_ZERO(&fd_test); + FD_SET(sock, &fd_test); + int fds = 0; +#ifndef WINDOWS + fds = sock + 1; +#endif + timeval timeout; + timeout.tv_sec = timeout.tv_usec = 0; + ::select(fds, nullptr, &fd_test, nullptr, &timeout); + return FD_ISSET(sock, &fd_test);*/ +#ifdef WINDOWS + fd_set fd_test; + FD_ZERO(&fd_test); + FD_SET(sock, &fd_test); + timeval timeout; + timeout.tv_sec = timeout.tv_usec = 0; + ::select(0, nullptr, &fd_test, nullptr, &timeout); + return FD_ISSET(sock, &fd_test); +#else + int ret = 0; + socklen_t len = sizeof(ret); + getsockopt(sock, SOL_SOCKET, SO_ERROR, (char*)&ret, &len); + return ret == 0; +#endif +} diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index 95f350bc..06b2dbb9 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -49,8 +49,7 @@ public: enum Type { UDP /** UDP - User Datagram Protocol */ , TCP_Client /** TCP client - allow connection to TCP server */ , - TCP_Server /** TCP server - receive connections from TCP clients */ , - TCP_SingleTCP /** TCP client single mode - connect & send & disconnect, on each packet */ + TCP_Server /** TCP server - receive connections from TCP clients */ }; //! \brief Parameters of %PIEthernet @@ -315,6 +314,8 @@ public: bool canWrite() const override {return mode() & WriteOnly;} + void interrupt() override; + int socket() const {return sock;} EVENT1(newConnection, PIEthernet * , client); @@ -487,7 +488,7 @@ protected: PRIVATE_DECLARATION(PIP_EXPORT) int sock, sock_s; - bool connected_, connecting_, listen_threaded, server_bounded; + std::atomic_bool connected_, connecting_, listen_threaded, server_bounded; mutable Address addr_r, addr_s, addr_lr; Type eth_type; PIThread server_thread_; @@ -500,6 +501,10 @@ private: EVENT_HANDLER1(void, clientDeleted, PIObject *, o); static void server_func(void * eth); void setType(Type t, bool reopen = true); + bool connectTCP(); +#ifdef WINDOWS + long waitForEvent(PIWaitEvent & event, long mask); +#endif static int ethErrorCore(); static PIString ethErrorString(); @@ -510,6 +515,8 @@ private: static int ethSetsockopt(int sock, int level, int optname, const void * optval, int optlen); static int ethSetsockoptInt(int sock, int level, int optname, int value = 1); static int ethSetsockoptBool(int sock, int level, int optname, bool value = true); + static void ethNonblocking(int sock); + static bool ethIsWriteable(int sock); }; diff --git a/libs/main/io_devices/pifile.cpp b/libs/main/io_devices/pifile.cpp index 69bd4c44..20dae7c3 100644 --- a/libs/main/io_devices/pifile.cpp +++ b/libs/main/io_devices/pifile.cpp @@ -439,9 +439,7 @@ PIByteArray PIFile::get() { ssize_t PIFile::readDevice(void * read_to, ssize_t max_size) { if (!canRead() || PRIVATE->fd == 0) return -1; - reading_now = true; ssize_t ret = fread(read_to, 1, max_size, PRIVATE->fd); - reading_now = false; return ret; } diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index c132b702..4cc8b183 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -131,7 +131,8 @@ PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIOb PIIODevice::~PIIODevice() { - stop(); + destroying = true; + stopAndWait(); } @@ -213,11 +214,11 @@ void PIIODevice::stopThreadedRead() { #ifdef MICRO_PIP read_thread.stop(); #else - if (reading_now) { - read_thread.terminate(); - reading_now = false; + read_thread.stop(); + if (!destroying) { + interrupt(); } else { - read_thread.stop(); + piCoutObj << "Error: Device is running after destructor!"; } #endif } @@ -269,6 +270,13 @@ void PIIODevice::stop() { } +void PIIODevice::stopAndWait(int timeout_ms) { + stop(); + waitThreadedReadFinished(timeout_ms); + waitThreadedWriteFinished(timeout_ms); +} + + ssize_t PIIODevice::read(void * read_to, ssize_t max_size) { ssize_t ret = readDevice(read_to, max_size); return ret; @@ -296,7 +304,6 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) { void PIIODevice::_init() { - reading_now = false; setOptions(0); setReopenEnabled(true); setReopenTimeout(1000); @@ -630,7 +637,3 @@ void PIIODevice::configureFromVariantDevice(const PIPropertyStorage & d) { } -void PIIODevice::softStopThreadedRead() { - read_thread.stop(); -} - diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index b3af0987..d75b69c4 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -277,6 +277,14 @@ public: //! \~russian Останавливает потоковое чтение и запись. void stop(); + //! \~english Stop both threaded read and threaded write and wait for finish. + //! \~russian Останавливает потоковое чтение и запись и ожидает завершения. + void stopAndWait(int timeout_ms = -1); + + //! \~english Interrupt blocking operation. + //! \~russian Прерывает блокирующую операцию. + virtual void interrupt() {} + //! \~english Read from device maximum "max_size" bytes to "read_to" //! \~russian Читает из устройства не более "max_size" байт в "read_to" @@ -382,8 +390,6 @@ public: //! \~russian Пишет в устройство блок памяти "mb" ssize_t write(const PIMemoryBlock & mb) {return write(mb.data(), mb.size());} - void softStopThreadedRead(); - EVENT_VHANDLER(void, flush) {;} EVENT(opened); @@ -537,10 +543,6 @@ protected: bool opened_ = false; void * ret_data_ = nullptr; - //! \~english Set this flag while blocking operations - //! \~russian Устанавливайте этот флаг во время блокирующих операций - std::atomic_bool reading_now; - private: EVENT_HANDLER(void, read_func); EVENT_HANDLER(void, write_func); @@ -557,7 +559,7 @@ private: PIQueue > write_queue; ullong tri = 0; uint threaded_read_buffer_size, reopen_timeout = 1000; - bool reopen_enabled = true; + bool reopen_enabled = true, destroying = false; static PIMutex nfp_mutex; static PIMap nfp_cache; diff --git a/libs/main/io_devices/pipeer.cpp b/libs/main/io_devices/pipeer.cpp index 34d04004..92177ed5 100644 --- a/libs/main/io_devices/pipeer.cpp +++ b/libs/main/io_devices/pipeer.cpp @@ -192,7 +192,7 @@ PIPeer::~PIPeer() { if (p._data) { p._data->dt_in.stop(); p._data->dt_out.stop(); - p._data->t.stop(true); + p._data->t.stopAndWait(); } destroyEths(); piForeach (PIEthernet * i, eths_mcast) { diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index d781d52a..80bcd7b2 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -22,6 +22,7 @@ #include "piconfig.h" #include "pidir.h" #include "pipropertystorage.h" +#include "piwaitevent_p.h" #include #if defined(MICRO_PIP) @@ -169,24 +170,22 @@ REGISTER_DEVICE(PISerial) PRIVATE_DEFINITION_START(PISerial) + PIWaitEvent event; #ifdef WINDOWS + PIWaitEvent event_write; DCB desc, sdesc; - void * hCom; - DWORD readed, mask; + HANDLE hCom = nullptr; + DWORD readed = 0, mask = 0; + OVERLAPPED overlap, overlap_write; #else termios desc, sdesc; - uint readed; + uint readed = 0; #endif PRIVATE_DEFINITION_END(PISerial) -PISerial::DeviceInfo::DeviceInfo() { - vID = pID = 0; -} - - PIString PISerial::DeviceInfo::id() const { return PIString::fromNumber(vID, 16).toLowerCase().expandLeftTo(4, '0') + ":" + PIString::fromNumber(pID, 16).toLowerCase().expandLeftTo(4, '0'); @@ -209,19 +208,18 @@ PISerial::PISerial(const PIString & device_, PISerial::Speed speed_, PIFlagsevent.destroy(); +#ifdef WINDOWS + PRIVATE->event_write.destroy(); +#endif } void PISerial::construct() { -#ifdef WINDOWS - PRIVATE->hCom = 0; -#endif - fd = -1; - //setPriority(piHigh); - vtime = 10; sending = false; + //setPriority(piHigh); setParameters(0); setSpeed(S115200); setDataBitsCount(8); @@ -628,6 +626,15 @@ bool PISerial::send(const void * data, int size) { } +void PISerial::interrupt() { + //piCoutObj << "interrupt"; + PRIVATE->event.interrupt(); +#ifdef WINDOWS + PRIVATE->event_write.interrupt(); +#endif +} + + bool PISerial::openDevice() { PIString p = path(); //piCout << "ser open" << p; @@ -651,7 +658,7 @@ bool PISerial::openDevice() { if (isReadable()) {ds |= GENERIC_READ; sm |= FILE_SHARE_READ;} if (isWriteable()) {ds |= GENERIC_WRITE; sm |= FILE_SHARE_WRITE;} PIString wp = "//./" + p; - PRIVATE->hCom = CreateFileA(wp.dataAscii(), ds, sm, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM, 0); + PRIVATE->hCom = CreateFileA(wp.dataAscii(), ds, sm, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM | FILE_FLAG_OVERLAPPED, 0); if (PRIVATE->hCom == INVALID_HANDLE_VALUE) { piCoutObj << "Unable to open \"" << p << "\""; fd = -1; @@ -675,6 +682,11 @@ bool PISerial::openDevice() { //piCoutObj << "Initialized " << p; #endif applySettings(); + PRIVATE->event.create(); +#ifdef WINDOWS + PRIVATE->event_write.create(); +#endif + return true; } @@ -696,6 +708,10 @@ bool PISerial::closeDevice() { #endif fd = -1; } + PRIVATE->event.destroy(); +#ifdef WINDOWS + PRIVATE->event_write.destroy(); +#endif return true; } @@ -796,15 +812,22 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { #ifdef WINDOWS if (!canRead()) return -1; if (sending) return -1; -// piCoutObj << "com event ..."; //piCoutObj << "read ..." << PRIVATE->hCom; - reading_now = true; - ReadFile(PRIVATE->hCom, read_to, max_size, &PRIVATE->readed, 0); - reading_now = false; + memset(&(PRIVATE->overlap), 0, sizeof(PRIVATE->overlap)); + PRIVATE->overlap.hEvent = PRIVATE->event.getEvent(); + ReadFile(PRIVATE->hCom, read_to, max_size, NULL, &(PRIVATE->overlap)); + PRIVATE->readed = 0; + if (PRIVATE->event.wait()) { + GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap), &(PRIVATE->readed), FALSE); + } else + return -1; + //piCoutObj << "read done" << PRIVATE->readed; DWORD err = GetLastError(); - //piCout << err; + if (err == ERROR_TIMEOUT && PRIVATE->readed == 0) + return 0; if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) { - softStopThreadedRead(); + piCoutObj << "read error" << (PRIVATE->readed) << errorString(); + stop(); close(); return 0; } @@ -812,13 +835,12 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { return PRIVATE->readed; #else if (!canRead()) return -1; - reading_now = true; + if (!PRIVATE->event.wait(fd)) return -1; ssize_t ret = ::read(fd, read_to, max_size); - reading_now = false; if (ret < 0) { int err = errno; if (err == EBADF || err == EFAULT || err == EINVAL || err == EIO) { - softStopThreadedRead(); + stopThreadedRead(); close(); return 0; } @@ -834,12 +856,17 @@ ssize_t PISerial::writeDevice(const void * data, ssize_t max_size) { return -1; } #ifdef WINDOWS - DWORD wrote; -// piCoutObj << "send ...";// << max_size;// << ": " << PIString((char*)data, max_size); + DWORD wrote(0); + //piCoutObj << "send ..." << max_size;// << ": " << PIString((char*)data, max_size); sending = true; - WriteFile(PRIVATE->hCom, data, max_size, &wrote, 0); + memset(&(PRIVATE->overlap_write), 0, sizeof(PRIVATE->overlap_write)); + PRIVATE->overlap_write.hEvent = PRIVATE->event_write.getEvent(); + WriteFile(PRIVATE->hCom, data, max_size, NULL, &(PRIVATE->overlap_write)); + if (PRIVATE->event_write.wait()) { + GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap_write), &wrote, FALSE); + } sending = false; -// piCoutObj << "send ok";// << wrote << " bytes in " << path(); + //piCoutObj << "send ok" << wrote;// << " bytes in " << path(); #else ssize_t wrote; wrote = ::write(fd, data, max_size); @@ -1150,7 +1177,7 @@ PIVector PISerial::availableDevicesInfo(bool test) { if (test) { for (int i = 0; i < ret.size_s(); ++i) { #ifdef WINDOWS - void * hComm = CreateFileA(ret[i].path.dataAscii(), GENERIC_READ, FILE_SHARE_READ, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM, 0); + void * hComm = CreateFileA(ret[i].path.dataAscii(), GENERIC_READ, FILE_SHARE_READ, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM | FILE_FLAG_OVERLAPPED, 0); if (hComm == INVALID_HANDLE_VALUE) { #else int fd = ::open(ret[i].path.dataAscii(), O_NOCTTY | O_RDONLY); diff --git a/libs/main/io_devices/piserial.h b/libs/main/io_devices/piserial.h index 54277eb4..aa8bdc16 100644 --- a/libs/main/io_devices/piserial.h +++ b/libs/main/io_devices/piserial.h @@ -90,19 +90,17 @@ public: //! \~english Information about serial device //! \~russian Информация о последовательном устройстве struct PIP_EXPORT DeviceInfo { - DeviceInfo(); - //! \~english Returns string representation of USB ID in format "xxxx:xxxx" (vID:pID) //! \~russian Возвращает строковое представление USB ID в формате "xxxx:xxxx" (vID:pID) PIString id() const; //! \~english USB Vendor ID //! \~russian USB Vendor ID - uint vID; + uint vID = 0; //! \~english USB Product ID //! \~russian USB Product ID - uint pID; + uint pID = 0; //! \~english Path to device, e.g. "COM2" or "/dev/ttyUSB0" //! \~russian Путь к устройству, например "COM2" или "/dev/ttyUSB0" @@ -240,6 +238,8 @@ public: //! \~russian Пишет в порт байтовый массив "data". Возвращает если количество записанных байт = размер "data" bool send(const PIByteArray & data) {return send(data.data(), data.size_s());} + void interrupt() override; + //! \~english Returns all available speeds for serial devices //! \~russian Возвращает все возможные скорости для устройств static PIVector availableSpeeds(); @@ -310,8 +310,8 @@ protected: bool closeDevice() override; PRIVATE_DECLARATION(PIP_EXPORT) - int fd, vtime; - bool sending; + int fd = -1, vtime = 10; + std::atomic_bool sending; PITimeMeasurer tm_; }; diff --git a/libs/main/io_utils/piconnection.cpp b/libs/main/io_utils/piconnection.cpp index 79af1255..90f36bd5 100644 --- a/libs/main/io_utils/piconnection.cpp +++ b/libs/main/io_utils/piconnection.cpp @@ -1114,7 +1114,10 @@ PIVector PIConnection::DevicePool::boundedDevices(const PIConnect PIConnection::DevicePool::DeviceData::~DeviceData() { if (rthread) { - rthread->terminate(); + rthread->stop(); + if (dev) dev->interrupt(); + if (!rthread->waitForFinish(1000)) + rthread->terminate(); delete rthread; rthread = nullptr; } @@ -1146,8 +1149,13 @@ void __DevicePool_threadReadDP(void * ddp) { } if (dev->isClosed()) { if (!dev->open()) { - piMSleep(dev->reopenTimeout()); - return; + PITimeMeasurer tm; + int timeout = dev->reopenTimeout(); + while (tm.elapsed_m() < timeout) { + if (dd->rthread->isStopping()) + return; + piMSleep(50); + } } } PIByteArray ba; diff --git a/libs/main/piplatform.h b/libs/main/piplatform.h index 548cbe29..d5125e07 100644 --- a/libs/main/piplatform.h +++ b/libs/main/piplatform.h @@ -100,4 +100,8 @@ #endif +#if defined(LINUX) || defined(MAC_OS) || defined(ANDROID) +# define POSIX_SIGNALS +#endif + #endif // PIPLATFORM_H diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index 1f4e3d2b..ee5d388f 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -23,6 +23,9 @@ #ifndef MICRO_PIP # include "pisystemtests.h" #endif +#ifdef WINDOWS +# include +#endif #include #if defined(WINDOWS) # define __THREAD_FUNC_RET__ uint __stdcall @@ -482,7 +485,7 @@ void __PIThreadCollection::stoppedAuto() { auto_mutex.lock(); auto_threads_.removeAll(t); auto_mutex.unlock(); - delete t; + t->deleteLater(); } @@ -591,10 +594,35 @@ PIThread::~PIThread() { } -void PIThread::stop(bool wait) { +void PIThread::stopAndWait(int timeout_ms) { + stop(); + waitForFinish(timeout_ms); +} + + +#ifdef WINDOWS +NTAPI void winThreadAPC(ULONG_PTR) { + //piCout << "APC"; +} +#endif + +void PIThread::interrupt() { + if (PRIVATE->thread == 0) return; + piCout << "PIThread::interrupt"; +#ifdef WINDOWS + CancelSynchronousIo(PRIVATE->thread); + QueueUserAPC(winThreadAPC, PRIVATE->thread, 0); +#else +# ifdef POSIX_SIGNALS + pthread_kill(PRIVATE->thread, PIP_INTERRUPT_SIGNAL); +# endif +#endif +} + + +void PIThread::stop() { //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop ..." << running_ << wait; terminating = true; - if (wait) waitForFinish(); } diff --git a/libs/main/thread/pithread.h b/libs/main/thread/pithread.h index 01c9ab05..948cbefd 100644 --- a/libs/main/thread/pithread.h +++ b/libs/main/thread/pithread.h @@ -107,9 +107,14 @@ public: bool start(std::function func, int timer_delay) {ret_func = [func](void*){func();}; return start(timer_delay);} EVENT_HANDLER0(bool, startOnce); EVENT_HANDLER1(bool, startOnce, ThreadFunc, func) {ret_func = func; return startOnce();} - EVENT_HANDLER0(void, stop) {stop(false);} - EVENT_HANDLER1(void, stop, bool, wait); + EVENT_HANDLER0(void, stop); EVENT_HANDLER0(void, terminate); + + //! \~english Stop thread and wait for finish. + //! \~russian Останавливает поток и ожидает завершения. + void stopAndWait(int timeout_ms = -1); + + void interrupt(); //! \~english Set common data passed to external function //! \~russian Устанавливает данные, передаваемые в функцию потока @@ -195,7 +200,7 @@ public: //! \~english Start thread without internal loop //! \~russian Запускает поток без внутреннего цикла - //! \fn void stop(bool wait = false) + //! \fn void stop() //! \brief //! \~english Stop thread //! \~russian Останавливает поток @@ -203,7 +208,7 @@ public: //! \fn void terminate() //! \brief //! \~english Strongly stop thread - //! \~russian Жестко останавливает поток + //! \~russian Жёстко прерывает поток //! \fn bool waitForStart(int timeout_msecs = -1) //! \brief diff --git a/libs/main/thread/pithreadpoolloop.cpp b/libs/main/thread/pithreadpoolloop.cpp index 47b9cbb2..18548c98 100644 --- a/libs/main/thread/pithreadpoolloop.cpp +++ b/libs/main/thread/pithreadpoolloop.cpp @@ -114,7 +114,7 @@ PIThreadPoolLoop::PIThreadPoolLoop(int thread_cnt) { PIThreadPoolLoop::~PIThreadPoolLoop() { for (auto * t: threads) { - t->stop(false); + t->stop(); if (!t->waitForFinish(100)) t->terminate(); delete t; @@ -135,7 +135,7 @@ void PIThreadPoolLoop::start(int index_start, int index_count) { while (1) { int cc = counter.fetch_add(1); if (cc >= end) { - t->stop(false); + t->stop(); return; } func(cc); diff --git a/libs/main/thread/pitimer.cpp b/libs/main/thread/pitimer.cpp index f629bb1e..58cd12ee 100644 --- a/libs/main/thread/pitimer.cpp +++ b/libs/main/thread/pitimer.cpp @@ -19,6 +19,7 @@ #include "pitimer.h" #include "piincludes_p.h" +#include "piconditionvar.h" #ifdef PIP_TIMER_RT # include #endif @@ -127,14 +128,14 @@ void _PITimerBase::setInterval(double i) { interval_ = i; if (isRunning()) { //piCout << "change interval runtime"; - stop(true); + stop(); start(); } } bool _PITimerBase::start(double interval_ms) { - if (isRunning()) stop(true); + if (isRunning()) stop(); deferred_ = false; setInterval(interval_ms); //piCout << "_PITimerBase::startTimer"<threadFunc();} void adjustTimes(); + bool smallWait(int ms); PIThread thread_; PISystemTime st_time, st_inc, st_wait, st_odt; + PIConditionVariable event; }; @@ -201,8 +204,8 @@ public: virtual ~_PITimerImp_RT(); protected: private: - virtual bool startTimer(double interval_ms); - virtual bool stopTimer(bool wait); + bool startTimer(double interval_ms) override; + bool stopTimer() override; int ti; _PITimerImp_RT_Private_ * priv; @@ -213,7 +216,7 @@ private: class _PITimerImp_Pool: public _PITimerImp_Thread { public: _PITimerImp_Pool(); - virtual ~_PITimerImp_Pool() {stop(true);} + virtual ~_PITimerImp_Pool() {stop();} private: class Pool: public PIThread { public: @@ -226,8 +229,8 @@ private: explicit Pool(); virtual ~Pool(); }; - virtual bool startTimer(double interval_ms); - virtual bool stopTimer(bool wait); + bool startTimer(double interval_ms) override; + bool stopTimer() override; }; @@ -235,19 +238,14 @@ private: _PITimerImp_Thread::_PITimerImp_Thread() { thread_.setName("__S__PITimerImp_Thread::thread"); - wait_dt = 100; - wait_dd = 200; - wait_tick = 10; + wait_dt = 1000; + wait_dd = 2000; + wait_tick = 1000; //piCout << "_PITimerImp_Thread" << this << ", thread& =" << &thread_; //piCout << "new _PITimerImp_Thread"; } -_PITimerImp_Thread::~_PITimerImp_Thread() { - stop(true); -} - - void _PITimerImp_Thread::prepareStart(double interval_ms) { if (interval_ms <= 0.) { piCout << "Achtung! Start PITimer with interval <= 0!"; @@ -273,16 +271,14 @@ bool _PITimerImp_Thread::startTimer(double interval_ms) { } -bool _PITimerImp_Thread::stopTimer(bool wait) { -#ifndef FREERTOS - thread_.stop(wait); -#else +bool _PITimerImp_Thread::stopTimer() { thread_.stop(); - if (wait) - if (!thread_.waitForFinish(10)) - if (thread_.isRunning()) - thread_.terminate(); -#endif + event.notifyAll(); + thread_.waitForFinish(); +// if (wait) +// if (!thread_.waitForFinish(10)) +// if (thread_.isRunning()) +// thread_.terminate(); return true; } @@ -299,10 +295,10 @@ bool _PITimerImp_Thread::threadFunc() { dwt = st_time - PISystemTime::current(true); if (wth > 0) { if (dwt.toMilliseconds() > wth + 1.) { - piMSleep(wth); + smallWait(wth); return false; } else { - dwt.sleep(); + if (!smallWait(dwt.toMilliseconds())) return false; deferred_ = false; st_time = PISystemTime::current(true); } @@ -321,11 +317,11 @@ bool _PITimerImp_Thread::threadFunc() { } if (wait_tick > 0) { if (st_wait.toMilliseconds() > wait_tick + 1.) { - piMSleep(wait_tick); + smallWait(wait_tick); return false; } else { //piCout << &thread_ << "sleep for" << st_wait; - st_wait.sleep(); + if (!smallWait(st_wait.toMilliseconds())) return false; } } else { if (st_wait.toMilliseconds() > 0.1) @@ -366,6 +362,17 @@ void _PITimerImp_Thread::adjustTimes() { } +bool _PITimerImp_Thread::smallWait(int ms) { + if (thread_.isStopping()) return false; + if (ms > 0) { + thread_.mutex().lock(); + event.waitFor(thread_.mutex(), ms); + thread_.mutex().unlock(); + } + return !thread_.isStopping(); +} + + #ifdef PIP_TIMER_RT @@ -392,7 +399,7 @@ _PITimerImp_RT::_PITimerImp_RT() { _PITimerImp_RT::~_PITimerImp_RT() { - stop(true); + stop(); delete priv; } @@ -425,7 +432,7 @@ bool _PITimerImp_RT::startTimer(double interval_ms) { } -bool _PITimerImp_RT::stopTimer(bool wait) { +bool _PITimerImp_RT::stopTimer() { if (ti < 0) return true; timer_delete(priv->tt); ti = -1; @@ -509,7 +516,7 @@ bool _PITimerImp_Pool::startTimer(double interval_ms) { } -bool _PITimerImp_Pool::stopTimer(bool wait) { +bool _PITimerImp_Pool::stopTimer() { Pool::instance()->remove(this); return true; } @@ -621,7 +628,7 @@ void PITimer::init() const { void PITimer::destroy() { if (!imp) return; //piCout << this << "destroy" << imp; - imp->stop(false); ///BUG: WTF FreeRTOS segfault on this! + imp->stop(); delete imp; imp = 0; } @@ -711,31 +718,14 @@ void PITimer::startDeferred(double interval_ms, PIDateTime start_datetime) { bool PITimer::restart() { init(); - imp->stop(true); + imp->stop(); return imp->start(); } bool PITimer::stop() { - return stop(true); -} - - -bool PITimer::stop(bool wait) { init(); //piCout << this << "stop" << imp << wait; - return imp->stop(wait); + return imp->stop(); } - -bool PITimer::waitForFinish(int timeout_msecs) { - if (timeout_msecs < 0) { - while (isRunning()) - piMinSleep(); - return true; - } - PITimeMeasurer tm; - while (isRunning() && tm.elapsed_m() < timeout_msecs) - piMinSleep(); - return tm.elapsed_m() < timeout_msecs; -} diff --git a/libs/main/thread/pitimer.h b/libs/main/thread/pitimer.h index 6a330005..73dd5ac6 100644 --- a/libs/main/thread/pitimer.h +++ b/libs/main/thread/pitimer.h @@ -53,7 +53,7 @@ public: void startDeferred(PIDateTime start_datetime) {startDeferred(interval_, start_datetime);} void startDeferred(double interval_ms, PIDateTime start_datetime); - bool stop(bool wait); + bool stop(); typedef void(*TickFunc)(PITimer*); TickFunc tfunc; @@ -62,7 +62,7 @@ public: protected: virtual bool startTimer(double interval_ms) = 0; - virtual bool stopTimer(bool wait) = 0; + virtual bool stopTimer() = 0; double interval_, deferred_delay; bool deferred_, deferred_mode; // mode: true - date, false - delay @@ -161,10 +161,7 @@ public: void startDeferred(double interval_ms, PIDateTime start_datetime); EVENT_HANDLER0(bool, stop); - EVENT_HANDLER1(bool, stop, bool, wait); - bool waitForFinish() {return waitForFinish(-1);} - bool waitForFinish(int timeout_msecs); - + //! \~english Set custom data //! \~russian Установить данные, передаваемые в метод таймера void setData(void * data_) {data_t = data_;} diff --git a/main.cpp b/main.cpp index b5e9a8d1..2b673b70 100644 --- a/main.cpp +++ b/main.cpp @@ -3,6 +3,7 @@ #include "pibytearray.h" #include "pimathbase.h" #include "pijson.h" +#include "piwaitevent_p.h" using namespace PICoutManipulators; @@ -12,43 +13,217 @@ typedef PIVector PIVariantVector; REGISTER_VARIANT(PIVariantMap); REGISTER_VARIANT(PIVariantVector); -const char J[] = - "[ \n" - "{ \n" - " \"idFligth\":\"456123\", \n" - " \"FligthPath\": \"d:/orders/nicirt/BSK-52(BBR)/219031.001/EBN-NM-BBR.IMG\",\n" - " \"FligthDataPath\": \"\", \n" - " \"Passport\": \n" - " { \n" - " \"id\": \"\", \n" - " \"TypePlane\": \"\", \n" - " \"FA_PLANEBORT\": \"Ka52-10\" \n" - " } \n" - " }, [1.1,2,3,4,{\"a\":null},{\"bool\":true,\"bool2\":false}] \n" - "] \n" -; +#ifdef WINDOWS +# include +# include +# include +# include +typedef HANDLE pipe_type; +#else +# include +typedef int pipe_type; +#endif +struct Pipe { + pipe_type fd_read = 0; + pipe_type fd_write = 0; + void create() { +#ifdef WINDOWS + CreatePipe(&fd_read, &fd_write, NULL, 0); +#else + pipe((int*)this); +#endif + } + void destoy() { +#ifdef WINDOWS + CloseHandle(fd_read); + CloseHandle(fd_write); +#else + close(fd_read); + close(fd_write); +#endif + } + int read(void * d, int s) { +#ifdef WINDOWS + DWORD ret(0); + ReadFile(fd_read, d, s, &ret, NULL); + return ret; +#else + return ::read(fd_read, d, s); +#endif + } + int write(void * d, int s) { +#ifdef WINDOWS + DWORD ret(0); + WriteFile(fd_write, d, s, &ret, NULL); + return ret; +#else + return ::write(fd_write, d, s); +#endif + } +}; + +constexpr int count = 4; +Pipe pipes[count]; + +class T: public PIThread { +public: + T(int index): PIThread() {ind = index; pipe = pipes[index];} + void run() { + PIByteArray data(1024); + piCout << "[T"< threads; + piCout << "main start"; + for (int i = 0; i < count; ++i) { + T * t = new T(i); + threads << t; + t->startOnce(); } - piCout << json; + piMSleep(100); + for (int i = 0; i < count; ++i) { + //pipes[i].write((void*)"string", 7); + piMSleep(500); + } + piCout << "main wait"; + for (int i = 0; i < count; ++i) { + threads[i]->interrupt(); + threads[i]->waitForFinish(); + piCout << "main T" << i << "done"; + } + piCout << "main end"; + for (int i = 0; i < count; ++i) { + pipes[i].destoy(); + delete threads[i]; + }*/ + + //PIEthernet eth(PIEthernet::UDP), seth(PIEthernet::UDP); + //eth.setReadAddress("127.0.0.1", 50000); + //eth.startThreadedRead(); + //piCout << eth.open(); + + PIByteArray req = PIByteArray::fromHex("205e011000000000ef"); + PISerial ser; + ser.setSpeed(PISerial::S9600); + ser.setOption(PIIODevice::BlockingRead, false); + ser.setVTime(200); + ser.open("COM3"); + CONNECTL(&ser, threadedReadEvent, ([](const uchar * data, ssize_t size){ + piCout << "*ser readed" << size; + })); + PIThread thread; + thread.start([&](void*){ + piCout << "[T] start"; + PIByteArray data = ((PIIODevice*)&ser)->read(1024); + piCout << "[T] readed" << data.size();// << errorString(); + piCout << "[T] end"; + }, 200); + //ser.startThreadedRead(); + + piSleep(1); + ser.write(req); + phase("Send"); + + piSleep(2); + phase("End"); + + /* + PIEthernet eth(PIEthernet::TCP_Client), seth(PIEthernet::TCP_Server), * server_client = nullptr; + + seth.listen("127.0.0.1", 50000, true); + //seth.startThreadedRead(); + CONNECTL(&seth, newConnection, ([&server_client](PIEthernet * e){ + server_client = e; + e->setName("TCP SC"); + piCout << "newConn" << e; + CONNECTL(e, threadedReadEvent, ([](const uchar * data, ssize_t size){ + piCout << "*TCP SC* readed" << size; + })); + CONNECTL(e, disconnected, ([](bool error){ + piCout << "*TCP SC* disconnected" << error; + })); + e->startThreadedRead(); + })); + + eth.setName("TCP CC"); + //eth.setParameter(PIEthernet::KeepConnection, false); + CONNECTL(ð, connected, ([ð](){ + piCout << "*TCP CC* connected"; + eth.send("byte", 5); + })); + CONNECTL(ð, disconnected, ([](bool error){ + piCout << "*TCP CC* disconnected" << error; + })); + + piMSleep(500); + phase("Connect"); + eth.connect("127.0.0.1", 50000); + eth.startThreadedRead(); + + piMSleep(500); + phase("Send 5"); + piCout << "c-ing" << eth.isConnecting(); + piCout << "c- ed" << eth.isConnected(); + eth.send("byte", 5); + + piMSleep(500); + phase("Send 6"); + eth.send("bytes", 6); + + piMSleep(500); + phase("Disconnect"); + if (server_client) + server_client->close(); + //eth.close(); + + piMSleep(500); + phase("END"); + */ return 0; } diff --git a/main_picloud_test.cpp b/main_picloud_test.cpp index 5afd928b..54171b88 100644 --- a/main_picloud_test.cpp +++ b/main_picloud_test.cpp @@ -11,15 +11,15 @@ int main(int argc, char * argv[]) { PICloudClient c("127.0.0.1:10101"); // c.setReopenEnabled(true); PICloudServer s("127.0.0.1:10101"); - PIVector clients; + auto clients = new PIVector(); CONNECTL(&tm, tickEvent, ([&](void *, int){ if (c.isConnected()) { PIString str = "ping"; piCout << "[Client] send:" << str; c.write(str.toByteArray()); } - if (s.isRunning()) { - for (auto cl : clients) { + if (s.isThreadedRead()) { + for (auto cl : *clients) { if (cl->isOpened()) { PIString str = "ping_S"; piCout << "[Server] send to" << cl << ":" << str; @@ -40,7 +40,7 @@ int main(int argc, char * argv[]) { CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ piCout << "[Server] new client:" << cl; - clients << cl; + clients->append(cl); CONNECTL(cl, threadedReadEvent, ([cl, &rnd](const uchar * readed, ssize_t size){ PIByteArray ba(readed, size); PIString str = PIString(ba); @@ -51,9 +51,10 @@ int main(int argc, char * argv[]) { } })); CONNECTL(cl, closed, ([&clients, cl](){ + piCout << "[Server] client closed ..." << cl; cl->stop(); - clients.removeAll(cl); - cl->deleteLater(); + clients->removeAll(cl); + piCout << "[Server] client closed ok" << cl; })); cl->startThreadedRead(); })); diff --git a/utils/cloud_dispatcher/dispatcherclient.cpp b/utils/cloud_dispatcher/dispatcherclient.cpp index b2d53fd4..df6cd480 100644 --- a/utils/cloud_dispatcher/dispatcherclient.cpp +++ b/utils/cloud_dispatcher/dispatcherclient.cpp @@ -3,7 +3,6 @@ DispatcherClient::DispatcherClient(PIEthernet * eth_, int id) : authorised(false), eth(eth_), streampacker(eth_), tcp(&streampacker), client_id(id) { - CONNECTU(&disconnect_tm, tickEvent, eth, close); CONNECTU(&streampacker, packetReceiveEvent, this, readed); CONNECTU(eth, disconnected, this, disconnected); piCoutObj << "client connected" << client_id << eth->sendAddress(); @@ -12,7 +11,6 @@ DispatcherClient::DispatcherClient(PIEthernet * eth_, int id) : authorised(false void DispatcherClient::start() { eth->startThreadedRead(); - //disconnect_tm.start(10000); } @@ -27,18 +25,18 @@ PIString DispatcherClient::address() { void DispatcherClient::close() { - eth->softStopThreadedRead(); - eth->close(); + eth->stopAndWait(); } void DispatcherClient::sendConnected(uint client_id) { - //piCoutObj << "sendConnected"; + piCoutObj << "sendConnected" << client_id; tcp.sendConnected(client_id); } void DispatcherClient::sendDisconnected(uint client_id) { + piCoutObj << "sendDisconnected" << client_id; tcp.sendDisconnected(client_id); } @@ -61,7 +59,7 @@ void DispatcherClient::authorise(bool ok) { void DispatcherClient::disconnected(bool withError) { - //piCoutObj << "client disconnected" << eth->sendAddress(); + piCoutObj << "client disconnected" << withError << eth->sendAddress(); disconnectEvent(this); } @@ -70,16 +68,18 @@ void DispatcherClient::readed(PIByteArray & ba) { PIPair hdr = tcp.parseHeader(ba); // piCoutObj << "readed" << hdr.first << hdr.second; if (hdr.first == PICloud::TCP::InvalidType) { - disconnected(true); piCoutObj << "invalid message"; + disconnected(true); return; } if (authorised) { if (hdr.second == tcp.role()) { switch (hdr.first) { case PICloud::TCP::Connect: + piCoutObj << "PICloud::TCP::Connect"; return; case PICloud::TCP::Disconnect: + piCoutObj << "PICloud::TCP::Disconnect"; disconnected(false); return; case PICloud::TCP::Data: @@ -113,9 +113,11 @@ void DispatcherClient::readed(PIByteArray & ba) { return; } case PICloud::TCP::Disconnect: + piCoutObj << "unauthorised PICloud::TCP::Disconnect"; disconnected(false); return; default: + piCoutObj << "authorised invalid message"; disconnected(true); return; } diff --git a/utils/cloud_dispatcher/dispatcherclient.h b/utils/cloud_dispatcher/dispatcherclient.h index 92497e4d..9284a7eb 100644 --- a/utils/cloud_dispatcher/dispatcherclient.h +++ b/utils/cloud_dispatcher/dispatcherclient.h @@ -33,7 +33,6 @@ private: EVENT_HANDLER1(void, readed, PIByteArray &, data); EVENT_HANDLER1(void, disconnected, bool, withError); - PITimer disconnect_tm; std::atomic_bool authorised; PIEthernet * eth; PIStreamPacker streampacker; diff --git a/utils/cloud_dispatcher/dispatcherserver.cpp b/utils/cloud_dispatcher/dispatcherserver.cpp index 29363905..9a9b84bb 100644 --- a/utils/cloud_dispatcher/dispatcherserver.cpp +++ b/utils/cloud_dispatcher/dispatcherserver.cpp @@ -21,7 +21,7 @@ DispatcherServer::~DispatcherServer() { void DispatcherServer::start() { eth.listen(true); - timeout_timer.start(5000); + timeout_timer.start(2000); piCoutObj << "server started" << eth.readAddress(); } @@ -181,7 +181,7 @@ void DispatcherServer::disconnectClient(DispatcherClient *client) { //piCoutObj << "INVALID client" << client; return; } - piCoutObj << "remove" << client->clientId(); + piCoutObj << "remove ..." << client->clientId(); map_mutex.lock(); clients.removeAll(client); rm_clients.removeAll(client); @@ -206,9 +206,10 @@ void DispatcherServer::disconnectClient(DispatcherClient *client) { cc->removeClient(client); index_c_clients.remove(client); } - client->close(); + //client->close(); rmrf_clients << client; map_mutex.unlock(); + piCoutObj << "remove done" << client->clientId(); } diff --git a/utils/system_daemon/daemon.cpp b/utils/system_daemon/daemon.cpp index 8d827d24..ec51c764 100644 --- a/utils/system_daemon/daemon.cpp +++ b/utils/system_daemon/daemon.cpp @@ -24,7 +24,7 @@ Daemon::Remote::Remote(const PIString & n): PIThread() { Daemon::Remote::~Remote() { shellClose(); ft.stop(); - stop(true); + stopAndWait(); } @@ -41,7 +41,6 @@ void Daemon::Remote::shellClose() { if (!term) return; piCoutObj << "shell close"; term_timer.stop(); - term_timer.waitForFinish(1000); term->destroy(); delete term; term = 0; @@ -331,6 +330,7 @@ Daemon::~Daemon() { delete r; } remotes.clear(); + delete _self; }