diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index 5ebf9d70..df32160d 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -45,10 +45,11 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) PICloudClient::~PICloudClient() { //piCoutObj << "~PICloudClient()"; - softStopThreadedRead(); + //softStopThreadedRead(); //eth.close(); //if (is_connected) disconnected(); - close(); + //close(); + stopAndWait(); //piCoutObj << "~PICloudClient() closed"; internalDisconnect(); // stop(false); @@ -69,6 +70,11 @@ void PICloudClient::setKeepConnection(bool on) { } +void PICloudClient::interrupt() { + eth.interrupt(); +} + + bool PICloudClient::openDevice() { //piCoutObj << "open";// << path(); bool op = eth.connect(PIEthernet::Address::resolve(path()), false); diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index a5fbe927..931038bc 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -41,7 +41,7 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) PICloudServer::~PICloudServer() { - stop(); + stopAndWait(); close(); } @@ -107,6 +107,11 @@ void PICloudServer::stopThreadedReadDevice() { } +void PICloudServer::interrupt() { + eth.interrupt(); +} + + void PICloudServer::clientDisconnect(uint client_id) { tcp.sendDisconnected(client_id); } @@ -170,6 +175,11 @@ ssize_t PICloudServer::Client::writeDevice(const void * data, ssize_t size) { } +void PICloudServer::Client::interrupt() { + cond_buff.notifyOne(); +} + + void PICloudServer::Client::pushBuffer(const PIByteArray & ba) { if (!is_connected) return; mutex_buff.lock(); diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index a4c55e6f..4fb58d9b 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -43,6 +43,7 @@ public: void setKeepConnection(bool on); bool isConnected() const {return is_connected;} ssize_t bytesAvailable() const override {return buff.size();} + void interrupt() override; EVENT(connected); EVENT(disconnected); diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index b253b40e..4f8d110d 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -51,6 +51,7 @@ public: ssize_t writeDevice(const void * data, ssize_t size) override; DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} ssize_t bytesAvailable() const override {return buff.size();} + void interrupt() override; private: void pushBuffer(const PIByteArray & ba); @@ -74,6 +75,7 @@ protected: ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t max_size) override; void stopThreadedReadDevice() override; + void interrupt() override; private: EVENT_HANDLER1(void, _readed, PIByteArray &, ba); diff --git a/libs/main/core/piincludes.h b/libs/main/core/piincludes.h index e867a756..c348f2ea 100644 --- a/libs/main/core/piincludes.h +++ b/libs/main/core/piincludes.h @@ -44,6 +44,7 @@ class PIInit; #endif class PIChar; class PICout; +class PIWaitEvent; struct lconv; diff --git a/libs/main/core/piwaitevent_p.cpp b/libs/main/core/piwaitevent_p.cpp new file mode 100644 index 00000000..f4a832dc --- /dev/null +++ b/libs/main/core/piwaitevent_p.cpp @@ -0,0 +1,126 @@ +/* + PIP - Platform Independent Primitives + Private PIP wait object + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piwaitevent_p.h" +#ifdef WINDOWS +//# ifdef _WIN32_WINNT +//# undef _WIN32_WINNT +//# define _WIN32_WINNT 0x0600 +//# endif +# include +#else +# include +# include +#endif +#include "piincludes_p.h" +#include "pistring.h" + + +PIWaitEvent::~PIWaitEvent() { + destroy(); +} + + +void PIWaitEvent::create() { + destroy(); +#ifdef WINDOWS + event = CreateEventA(NULL, TRUE, FALSE, NULL); + if (!event) { + piCout << "Error with CreateEventA:" << errorString(); + } +#else + for (int i = 0; i < 3; ++i) memset(&(fds[i]), 0, sizeof(fds[i])); + if (::pipe(pipe_fd) < 0) { + piCout << "Error with pipe:" << errorString(); + } else { + fcntl(pipe_fd[ReadEnd], F_SETFL, O_NONBLOCK); + } +#endif +} + + +void PIWaitEvent::destroy() { +#ifdef WINDOWS + if (event) { + CloseHandle(event); + event = NULL; + } +#else + for (int i = 0; i < 2; ++i) { + if (pipe_fd[i] != 0) { + ::close(pipe_fd[i]); + pipe_fd[i] = 0; + } + } +#endif +} + + +#ifdef WINDOWS +bool PIWaitEvent::wait() { +#else +bool PIWaitEvent::wait(int fd, PIWaitEvent::SelectRole role) { +#endif + if (!isCreate()) return false; +#ifdef WINDOWS + DWORD ret = WaitForSingleObjectEx(event, INFINITE, TRUE); + ResetEvent(event); + if (ret == WAIT_IO_COMPLETION || ret == WAIT_FAILED) return false; +#else + int nfds = piMaxi(pipe_fd[ReadEnd], fd) + 1; + int fd_index = 0; + switch (role) { + case CheckRead: + fd_index = 0; + break; + case CheckWrite: + fd_index = 1; + break; + } + for (int i = 0; i < 3; ++i) FD_ZERO(&(fds[i])); + FD_SET(pipe_fd[ReadEnd], &(fds[0])); + FD_SET(fd, &(fds[2])); + FD_SET(fd, &(fds[fd_index])); + ::select(nfds, &(fds[0]), &(fds[1]), &(fds[2]), nullptr); + int buf = 0; + while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0); + if (FD_ISSET(fd, &(fds[2]))) return false; + return FD_ISSET(fd, &(fds[fd_index])); +#endif + return true; +} + + +void PIWaitEvent::interrupt() { + if (!isCreate()) return; +#ifdef WINDOWS + SetEvent(event); +#else + ::write(pipe_fd[WriteEnd], "", 1); +#endif +} + + +bool PIWaitEvent::isCreate() const { +#ifdef WINDOWS + return event; +#else + return pipe_fd[ReadEnd] != 0; +#endif +} diff --git a/libs/main/core/piwaitevent_p.h b/libs/main/core/piwaitevent_p.h new file mode 100644 index 00000000..e5c7ee73 --- /dev/null +++ b/libs/main/core/piwaitevent_p.h @@ -0,0 +1,67 @@ +/* + PIP - Platform Independent Primitives + Private PIP wait object + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PIWAITEVENT_P_H +#define PIWAITEVENT_P_H + +#include "pibase.h" +#ifdef WINDOWS +# include +# include +# include +#else +# include +# include +#endif + + +class PIWaitEvent { +public: + ~PIWaitEvent(); + + void create(); + void destroy(); +#ifdef WINDOWS + bool wait(); +#else + enum SelectRole { + CheckRead, + CheckWrite + }; + bool wait(int fd, SelectRole role = CheckRead); +#endif + void interrupt(); + bool isCreate() const; + +#ifdef WINDOWS + HANDLE event = NULL; +#else +private: + int pipe_fd[2] = {0, 0}; + fd_set fds[3]; + enum { + ReadEnd = 0, + WriteEnd = 1 + }; +#endif + +}; + + +#endif // PIWAITEVENT_P_H diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 59013625..18d0f4e2 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -67,6 +67,7 @@ # endif # endif #endif +#include "piwaitevent_p.h" #include @@ -212,18 +213,7 @@ PRIVATE_DEFINITION_START(PIEthernet) sockaddr_in addr_; sockaddr_in saddr_; sockaddr_in raddr_; -#ifdef WINDOWS - WSAEVENT read_event = nullptr; - void createEvent() { - closeEvent(); - read_event = WSACreateEvent(); - } - void closeEvent() { - if (!read_event) return; - WSACloseEvent(read_event); - read_event = nullptr; - } -#endif + PIWaitEvent event; PRIVATE_DEFINITION_END(PIEthernet) @@ -252,23 +242,18 @@ PIEthernet::PIEthernet(int sock_, PIString ip_port): PIIODevice("", ReadWrite) { setParameters(PIEthernet::ReuseAddress | PIEthernet::MulticastLoop); setType(TCP_Client, false); setPath(ip_port); -#ifdef WINDOWS - u_long mode = 1; - ioctlsocket(sock, FIONBIO, &mode); - PRIVATE->createEvent(); -#endif + ethNonblocking(sock); + PRIVATE->event.create(); //piCoutObj << "new tcp client" << sock_; } PIEthernet::~PIEthernet() { - //piCout << "~PIEthernet" << uint(this); - stop(); + //piCout << "~PIEthernet"; + stopAndWait(); close(); -#ifdef WINDOWS - PRIVATE->closeEvent(); -#endif - //piCoutObj << "~PIEthernet done"; + PRIVATE->event.destroy(); + //piCout << "~PIEthernet done"; } @@ -294,12 +279,11 @@ void PIEthernet::construct() { bool PIEthernet::init() { if (isOpened()) return true; - //cout << "init " << type_ << endl; + if (sock != -1) return true; + //piCout << "init " << type(); + PRIVATE->event.destroy(); if (sock_s == sock) sock_s = -1; -#ifdef WINDOWS - PRIVATE->closeEvent(); -#endif closeSocket(sock); closeSocket(sock_s); int st = 0, pr = 0; @@ -312,14 +296,8 @@ bool PIEthernet::init() { } PIFlags params = parameters(); sock = ::socket(AF_INET, st, pr); -#ifdef WINDOWS - if (type() != TCP_Server) { - // non-blocking socket - u_long mode = 1; - ioctlsocket(sock, FIONBIO, &mode); - } - PRIVATE->createEvent(); -#endif + ethNonblocking(sock); + PRIVATE->event.create(); if (params[SeparateSockets]) sock_s = ::socket(AF_INET, st, pr); else @@ -332,7 +310,7 @@ bool PIEthernet::init() { if (params[PIEthernet::Broadcast]) ethSetsockoptBool(sock, SOL_SOCKET, SO_BROADCAST); applyTimeouts(); applyOptInt(IPPROTO_IP, IP_TTL, TTL()); -// piCoutObj << "inited" << path(); + //piCoutObj << "inited" << path(); return true; } @@ -386,11 +364,12 @@ PIEthernet::Address PIEthernet::getBroadcast(const PIEthernet::Address & ip, con bool PIEthernet::openDevice() { if (connected_) return true; + //piCoutObj << "open"; init(); if (sock == -1 || path().isEmpty()) return false; addr_r.set(path()); - if (type() == TCP_Client) - connecting_ = true; + //if (type() == TCP_Client) + // connecting_ = true; if (type() != UDP || mode() == PIIODevice::WriteOnly) return true; memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); @@ -423,24 +402,22 @@ bool PIEthernet::openDevice() { bool PIEthernet::closeDevice() { - //cout << "close\n"; + //piCoutObj << "close"; + bool ned = connected_; + connected_ = connecting_ = false; + server_thread_.stop(); + PRIVATE->event.interrupt(); if (server_thread_.isRunning()) { - server_thread_.stop(); - server_thread_.interrupt(); if (!server_thread_.waitForFinish(1000)) server_thread_.terminate(); } + PRIVATE->event.destroy(); if (sock_s == sock) sock_s = -1; closeSocket(sock); closeSocket(sock_s); -#ifdef WINDOWS - if (PRIVATE->read_event) WSASetEvent(PRIVATE->read_event); -#endif while (!clients_.isEmpty()) delete clients_.back(); - bool ned = connected_; - connected_ = connecting_ = false; if (ned) disconnected(false); return true; } @@ -591,6 +568,7 @@ bool PIEthernet::connect(bool threaded) { connecting_ = true; return true; } + if (connected_) return false; if (sock == -1) init(); if (sock == -1) return false; memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); @@ -710,40 +688,21 @@ bool PIEthernet::send(const PIEthernet::Address & addr, const PIByteArray & data } +void PIEthernet::interrupt() { + //piCout << "interrupt"; + PRIVATE->event.interrupt(); +} + + ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { //piCout << "read" << sock; if (sock == -1) init(); if (sock == -1 || read_to == 0) return -1; - int rs = 0, s = 0, lerr = 0; - sockaddr_in client_addr; - socklen_t slen = sizeof(client_addr); -// piCoutObj << "read from " << ip_ << ":" << port_; + int rs = 0, lerr = 0; + //piCoutObj << "read from " << path() << connecting_; switch (type()) { - case TCP_SingleTCP: - reading_now = true; - ::listen(sock, 64); - reading_now = false; - s = accept(sock, (sockaddr * )&client_addr, &slen); - if (s == -1) { - //piCoutObj << "Can`t accept new connection, " << ethErrorString(); - piMinSleep(); - return -1; - } - reading_now = true; - rs = ethRecv(s, read_to, max_size); - reading_now = false; - closeSocket(s); - return rs; case TCP_Client: if (connecting_) { -#ifdef ANDROID - /*if (sock_s == sock) - sock_s = -1; - closeSocket(sock); - closeSocket(sock_s); - init(); - qDebug() << "init() in read thread";*/ -#endif addr_r.set(path()); memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); PRIVATE->addr_.sin_port = htons(addr_r.port()); @@ -752,9 +711,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #ifdef QNX PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif - piCout << "connect to " << path() << "..."; + piCoutObj << "connect to " << path() << "..."; connected_ = connectTCP(); - piCout << "connect to " << path() << connected_; + piCoutObj << "connect to " << path() << connected_; if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); opened_ = connected_; @@ -772,18 +731,19 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { long wr = waitForEvent(FD_READ | FD_CLOSE); switch (wr) { case FD_READ: - piCout << "fd_read ..."; + //piCout << "fd_read ..."; rs = ethRecv(sock, read_to, max_size); break; case FD_CLOSE: - piCout << "fd_close ..."; + //piCout << "fd_close ..."; rs = -1; break; default: break; } } #else - rs = ethRecv(sock, read_to, max_size); + if (PRIVATE->event.wait(sock)) + rs = ethRecv(sock, read_to, max_size); #endif //piCoutObj << "readed" << rs; if (rs <= 0) { @@ -792,37 +752,39 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #ifdef WINDOWS if ((lerr == WSAEWOULDBLOCK || lerr == WSAETIMEDOUT) && !parameters()[DisonnectOnTimeout]) { #elif defined(ANDROID) - if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout]) { + if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout] && rs < 0) { #else - if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout]) { + if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout] && rs < 0) { #endif //piCoutObj << errorString(); return -1; } if (connected_) { - piCoutObj << "Disconnect on read," << ethErrorString(); + //piCoutObj << "Disconnect on read," << ethErrorString(); opened_ = connected_ = false; + closeSocket(sock); init(); disconnected(rs < 0); } - if (parameters()[KeepConnection]) + if (parameters()[KeepConnection]) { connect(); + } //piCoutObj << "eth" << ip_ << "disconnected"; } if (rs > 0) received(read_to, rs); return rs; case UDP: { memset(&PRIVATE->raddr_, 0, sizeof(PRIVATE->raddr_)); - piCoutObj << "read from" << path() << "..."; + //piCoutObj << "read from" << path() << "..."; #ifdef WINDOWS long wr = waitForEvent(FD_READ | FD_CLOSE); switch (wr) { case FD_READ: - piCout << "fd_read ..."; + //piCout << "fd_read ..."; rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); break; case FD_CLOSE: - piCout << "fd_close ..."; + //piCout << "fd_close ..."; rs = -1; break; default: break; @@ -830,7 +792,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { #else rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); #endif - piCoutObj << "read from" << path() << rs << "bytes"; + //piCoutObj << "read from" << path() << rs << "bytes"; if (rs > 0) { addr_lr.set(uint(PRIVATE->raddr_.sin_addr.s_addr), ntohs(PRIVATE->raddr_.sin_port)); //piCoutObj << "read from" << ip_r << ":" << port_r << rs << "bytes"; @@ -854,26 +816,6 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { //piCoutObj << "sending to " << ip_s << ":" << port_s << " " << max_size << " bytes"; int ret = 0; switch (type()) { - case TCP_SingleTCP: - memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_)); - PRIVATE->addr_.sin_port = htons(addr_s.port()); - PRIVATE->addr_.sin_addr.s_addr = addr_s.ip(); - PRIVATE->addr_.sin_family = AF_INET; -#ifdef QNX - PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); -#endif - //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; - if (!connectTCP()) { - //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); - piMinSleep(); - return -1; - } - //piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ..."; - ret = ::send(sock, (const char *)data, max_size, 0); - //piCoutObj << "ok, ret" << ret; - closeSocket(sock); - init(); - return ret; case UDP: PRIVATE->saddr_.sin_port = htons(addr_s.port()); PRIVATE->saddr_.sin_addr.s_addr = addr_s.ip(); @@ -912,6 +854,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { if (ret < 0) { piCoutObj << "Disconnect on write," << ethErrorString(); opened_ = connected_ = false; + closeSocket(sock); init(); disconnected(true); } @@ -926,8 +869,7 @@ PIIODevice::DeviceInfoFlags PIEthernet::deviceInfoFlags() const { switch (type()) { case UDP: return 0; case TCP_Client: - case TCP_Server: - case TCP_SingleTCP: return Sequential | Reliable; + case TCP_Server: return Sequential | Reliable; default: break; } return 0; @@ -960,9 +902,15 @@ void PIEthernet::server_func(void * eth) { piMSleep(10); return; } +#else + if (!ce->PRIVATEWB->event.wait(ce->sock)) { + piMSleep(10); + return; + } #endif + //piCout << "server" << "accept ..."; int s = accept(ce->sock, (sockaddr * )&client_addr, &slen); - piCout << ethErrorString(); + //piCout << "server" << "accept done" << ethErrorString(); if (s == -1) { int lerr = ethErrorCore(); #ifdef WINDOWS @@ -1003,44 +951,43 @@ void PIEthernet::setType(Type t, bool reopen) { bool PIEthernet::connectTCP() { -#ifdef WINDOWS ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)); + //piCout << errorString(); +#ifdef WINDOWS long wr = waitForEvent(FD_CONNECT | FD_CLOSE); switch (wr) { - case FD_CONNECT: { + case FD_CONNECT: //piCout << "fd_connect ..."; - timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 0; - fd_set fd_test; - FD_ZERO(&fd_test); - FD_SET(sock, &fd_test); - ::select(0, nullptr, &fd_test, nullptr, &timeout); - return FD_ISSET(sock, &fd_test); - } break; + return ethIsWriteable(sock); default: break; } - return false; #else - return ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)) == 0; + if (PRIVATE->event.wait(sock, PIWaitEvent::CheckWrite)) { + if (ethIsWriteable(sock)) return true; + else { + closeSocket(sock); + init(); + } + } #endif + return false; } #ifdef WINDOWS long PIEthernet::waitForEvent(long mask) { - if (!PRIVATE->read_event || sock < 0) return 0; - WSAEventSelect(sock, PRIVATE->read_event, mask); - DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); - piCout << "wait result" << wr; - if (wr == WSA_WAIT_EVENT_0) { + if (!PRIVATE->event.isCreate() || sock < 0) return 0; + WSAEventSelect(sock, PRIVATE->event.event, mask); + if (PRIVATE->event.wait()) { + //DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); + //piCout << "wait result" << wr; + //if (wr == WSA_WAIT_EVENT_0) { WSANETWORKEVENTS events; memset(&events, 0, sizeof(events)); - WSAEnumNetworkEvents(sock, PRIVATE->read_event, &events); - WSAResetEvent(PRIVATE->read_event); + WSAEnumNetworkEvents(sock, PRIVATE->event.event, &events); + //piCout << "wait result" << events.lNetworkEvents; return events.lNetworkEvents; } - WSAResetEvent(PRIVATE->read_event); return 0; } #endif @@ -1449,3 +1396,43 @@ int PIEthernet::ethSetsockoptBool(int sock, int level, int optname, bool value) so = (value ? 1 : 0); return ethSetsockopt(sock, level, optname, &so, sizeof(so)); } + + +void PIEthernet::ethNonblocking(int sock) { + if (sock < 0) return; +#ifdef WINDOWS + u_long mode = 1; + ioctlsocket(sock, FIONBIO, &mode); +#else + fcntl(sock, F_SETFL, O_NONBLOCK); +#endif +} + + +bool PIEthernet::ethIsWriteable(int sock) { +/* fd_set fd_test; + FD_ZERO(&fd_test); + FD_SET(sock, &fd_test); + int fds = 0; +#ifndef WINDOWS + fds = sock + 1; +#endif + timeval timeout; + timeout.tv_sec = timeout.tv_usec = 0; + ::select(fds, nullptr, &fd_test, nullptr, &timeout); + return FD_ISSET(sock, &fd_test);*/ +#ifdef WINDOWS + fd_set fd_test; + FD_ZERO(&fd_test); + FD_SET(sock, &fd_test); + timeval timeout; + timeout.tv_sec = timeout.tv_usec = 0; + ::select(0, nullptr, &fd_test, nullptr, &timeout); + return FD_ISSET(sock, &fd_test); +#else + int ret = 0; + socklen_t len = sizeof(ret); + getsockopt(sock, SOL_SOCKET, SO_ERROR, (char*)&ret, &len); + return ret == 0; +#endif +} diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index de8c5987..e4648874 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -49,8 +49,7 @@ public: enum Type { UDP /** UDP - User Datagram Protocol */ , TCP_Client /** TCP client - allow connection to TCP server */ , - TCP_Server /** TCP server - receive connections from TCP clients */ , - TCP_SingleTCP /** TCP client single mode - connect & send & disconnect, on each packet */ + TCP_Server /** TCP server - receive connections from TCP clients */ }; //! \brief Parameters of %PIEthernet @@ -315,6 +314,8 @@ public: bool canWrite() const override {return mode() & WriteOnly;} + void interrupt() override; + int socket() const {return sock;} EVENT1(newConnection, PIEthernet * , client); @@ -487,7 +488,7 @@ protected: PRIVATE_DECLARATION(PIP_EXPORT) int sock, sock_s; - bool connected_, connecting_, listen_threaded, server_bounded; + std::atomic_bool connected_, connecting_, listen_threaded, server_bounded; mutable Address addr_r, addr_s, addr_lr; Type eth_type; PIThread server_thread_; @@ -514,6 +515,8 @@ private: static int ethSetsockopt(int sock, int level, int optname, const void * optval, int optlen); static int ethSetsockoptInt(int sock, int level, int optname, int value = 1); static int ethSetsockoptBool(int sock, int level, int optname, bool value = true); + static void ethNonblocking(int sock); + static bool ethIsWriteable(int sock); }; diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 155674a6..90c4d95b 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -131,8 +131,8 @@ PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIOb PIIODevice::~PIIODevice() { - stop(); - waitThreadedReadFinished(); + destroying = true; + stopAndWait(); } @@ -215,8 +215,10 @@ void PIIODevice::stopThreadedRead() { read_thread.stop(); #else read_thread.stop(); - read_thread.interrupt(); - stopThreadedReadDevice(); + if (!destroying) { + interrupt(); + stopThreadedReadDevice(); + } if (reading_now) { read_thread.terminate(); reading_now = false; @@ -271,6 +273,13 @@ void PIIODevice::stop() { } +void PIIODevice::stopAndWait(int timeout_ms) { + stop(); + waitThreadedReadFinished(timeout_ms); + waitThreadedWriteFinished(timeout_ms); +} + + ssize_t PIIODevice::read(void * read_to, ssize_t max_size) { ssize_t ret = readDevice(read_to, max_size); return ret; diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index c5f7936d..3bd8f3b9 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -277,6 +277,14 @@ public: //! \~russian Останавливает потоковое чтение и запись. void stop(); + //! \~english Stop both threaded read and threaded write and wait for finish. + //! \~russian Останавливает потоковое чтение и запись и ожидает завершения. + void stopAndWait(int timeout_ms = -1); + + //! \~english Interrupt blocking operation. + //! \~russian Прерывает блокирующую операцию. + virtual void interrupt() {} + //! \~english Read from device maximum "max_size" bytes to "read_to" //! \~russian Читает из устройства не более "max_size" байт в "read_to" @@ -561,7 +569,7 @@ private: PIQueue > write_queue; ullong tri = 0; uint threaded_read_buffer_size, reopen_timeout = 1000; - bool reopen_enabled = true; + bool reopen_enabled = true, destroying = false; static PIMutex nfp_mutex; static PIMap nfp_cache; diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index d3c97c44..a9437a73 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -22,6 +22,7 @@ #include "piconfig.h" #include "pidir.h" #include "pipropertystorage.h" +#include "piwaitevent_p.h" #include #if defined(MICRO_PIP) @@ -169,24 +170,22 @@ REGISTER_DEVICE(PISerial) PRIVATE_DEFINITION_START(PISerial) + PIWaitEvent event; #ifdef WINDOWS + PIWaitEvent event_write; DCB desc, sdesc; - void * hCom; - DWORD readed, mask; + HANDLE hCom = nullptr; + DWORD readed = 0, mask = 0; + OVERLAPPED overlap, overlap_write; #else termios desc, sdesc; - uint readed; + uint readed = 0; #endif PRIVATE_DEFINITION_END(PISerial) -PISerial::DeviceInfo::DeviceInfo() { - vID = pID = 0; -} - - PIString PISerial::DeviceInfo::id() const { return PIString::fromNumber(vID, 16).toLowerCase().expandLeftTo(4, '0') + ":" + PIString::fromNumber(pID, 16).toLowerCase().expandLeftTo(4, '0'); @@ -209,19 +208,18 @@ PISerial::PISerial(const PIString & device_, PISerial::Speed speed_, PIFlagsevent.destroy(); +#ifdef WINDOWS + PRIVATE->event_write.destroy(); +#endif } void PISerial::construct() { -#ifdef WINDOWS - PRIVATE->hCom = 0; -#endif - fd = -1; - //setPriority(piHigh); - vtime = 10; sending = false; + //setPriority(piHigh); setParameters(0); setSpeed(S115200); setDataBitsCount(8); @@ -628,6 +626,15 @@ bool PISerial::send(const void * data, int size) { } +void PISerial::interrupt() { + //piCoutObj << "interrupt"; + PRIVATE->event.interrupt(); +#ifdef WINDOWS + PRIVATE->event_write.interrupt(); +#endif +} + + bool PISerial::openDevice() { PIString p = path(); //piCout << "ser open" << p; @@ -651,7 +658,7 @@ bool PISerial::openDevice() { if (isReadable()) {ds |= GENERIC_READ; sm |= FILE_SHARE_READ;} if (isWriteable()) {ds |= GENERIC_WRITE; sm |= FILE_SHARE_WRITE;} PIString wp = "//./" + p; - PRIVATE->hCom = CreateFileA(wp.dataAscii(), ds, sm, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM, 0); + PRIVATE->hCom = CreateFileA(wp.dataAscii(), ds, sm, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM | FILE_FLAG_OVERLAPPED, 0); if (PRIVATE->hCom == INVALID_HANDLE_VALUE) { piCoutObj << "Unable to open \"" << p << "\""; fd = -1; @@ -675,6 +682,11 @@ bool PISerial::openDevice() { //piCoutObj << "Initialized " << p; #endif applySettings(); + PRIVATE->event.create(); +#ifdef WINDOWS + PRIVATE->event_write.create(); +#endif + return true; } @@ -696,6 +708,10 @@ bool PISerial::closeDevice() { #endif fd = -1; } + PRIVATE->event.destroy(); +#ifdef WINDOWS + PRIVATE->event_write.destroy(); +#endif return true; } @@ -796,12 +812,21 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { #ifdef WINDOWS if (!canRead()) return -1; if (sending) return -1; -// piCoutObj << "com event ..."; //piCoutObj << "read ..." << PRIVATE->hCom; - ReadFile(PRIVATE->hCom, read_to, max_size, &PRIVATE->readed, 0); + memset(&(PRIVATE->overlap), 0, sizeof(PRIVATE->overlap)); + PRIVATE->overlap.hEvent = PRIVATE->event.event; + ReadFile(PRIVATE->hCom, read_to, max_size, NULL, &(PRIVATE->overlap)); + PRIVATE->readed = 0; + if (PRIVATE->event.wait()) { + GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap), &(PRIVATE->readed), FALSE); + } else + return -1; + //piCoutObj << "read done" << PRIVATE->readed; DWORD err = GetLastError(); - //piCout << err; + if (err == ERROR_TIMEOUT && PRIVATE->readed == 0) + return 0; if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) { + piCoutObj << "read error" << (PRIVATE->readed) << errorString(); softStopThreadedRead(); close(); return 0; @@ -810,9 +835,8 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { return PRIVATE->readed; #else if (!canRead()) return -1; - reading_now = true; + if (!PRIVATE->event.wait(fd)) return -1; ssize_t ret = ::read(fd, read_to, max_size); - reading_now = false; if (ret < 0) { int err = errno; if (err == EBADF || err == EFAULT || err == EINVAL || err == EIO) { @@ -832,12 +856,17 @@ ssize_t PISerial::writeDevice(const void * data, ssize_t max_size) { return -1; } #ifdef WINDOWS - DWORD wrote; -// piCoutObj << "send ...";// << max_size;// << ": " << PIString((char*)data, max_size); + DWORD wrote(0); + //piCoutObj << "send ..." << max_size;// << ": " << PIString((char*)data, max_size); sending = true; - WriteFile(PRIVATE->hCom, data, max_size, &wrote, 0); + memset(&(PRIVATE->overlap_write), 0, sizeof(PRIVATE->overlap_write)); + PRIVATE->overlap_write.hEvent = PRIVATE->event_write.event; + WriteFile(PRIVATE->hCom, data, max_size, NULL, &(PRIVATE->overlap_write)); + if (PRIVATE->event_write.wait()) { + GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap_write), &wrote, FALSE); + } sending = false; -// piCoutObj << "send ok";// << wrote << " bytes in " << path(); + //piCoutObj << "send ok" << wrote;// << " bytes in " << path(); #else ssize_t wrote; wrote = ::write(fd, data, max_size); @@ -1148,7 +1177,7 @@ PIVector PISerial::availableDevicesInfo(bool test) { if (test) { for (int i = 0; i < ret.size_s(); ++i) { #ifdef WINDOWS - void * hComm = CreateFileA(ret[i].path.dataAscii(), GENERIC_READ, FILE_SHARE_READ, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM, 0); + void * hComm = CreateFileA(ret[i].path.dataAscii(), GENERIC_READ, FILE_SHARE_READ, 0, OPEN_EXISTING, FILE_ATTRIBUTE_SYSTEM | FILE_FLAG_OVERLAPPED, 0); if (hComm == INVALID_HANDLE_VALUE) { #else int fd = ::open(ret[i].path.dataAscii(), O_NOCTTY | O_RDONLY); diff --git a/libs/main/io_devices/piserial.h b/libs/main/io_devices/piserial.h index 54277eb4..aa8bdc16 100644 --- a/libs/main/io_devices/piserial.h +++ b/libs/main/io_devices/piserial.h @@ -90,19 +90,17 @@ public: //! \~english Information about serial device //! \~russian Информация о последовательном устройстве struct PIP_EXPORT DeviceInfo { - DeviceInfo(); - //! \~english Returns string representation of USB ID in format "xxxx:xxxx" (vID:pID) //! \~russian Возвращает строковое представление USB ID в формате "xxxx:xxxx" (vID:pID) PIString id() const; //! \~english USB Vendor ID //! \~russian USB Vendor ID - uint vID; + uint vID = 0; //! \~english USB Product ID //! \~russian USB Product ID - uint pID; + uint pID = 0; //! \~english Path to device, e.g. "COM2" or "/dev/ttyUSB0" //! \~russian Путь к устройству, например "COM2" или "/dev/ttyUSB0" @@ -240,6 +238,8 @@ public: //! \~russian Пишет в порт байтовый массив "data". Возвращает если количество записанных байт = размер "data" bool send(const PIByteArray & data) {return send(data.data(), data.size_s());} + void interrupt() override; + //! \~english Returns all available speeds for serial devices //! \~russian Возвращает все возможные скорости для устройств static PIVector availableSpeeds(); @@ -310,8 +310,8 @@ protected: bool closeDevice() override; PRIVATE_DECLARATION(PIP_EXPORT) - int fd, vtime; - bool sending; + int fd = -1, vtime = 10; + std::atomic_bool sending; PITimeMeasurer tm_; }; diff --git a/libs/main/io_utils/piconnection.cpp b/libs/main/io_utils/piconnection.cpp index 4c32ad80..90f36bd5 100644 --- a/libs/main/io_utils/piconnection.cpp +++ b/libs/main/io_utils/piconnection.cpp @@ -1115,7 +1115,7 @@ PIVector PIConnection::DevicePool::boundedDevices(const PIConnect PIConnection::DevicePool::DeviceData::~DeviceData() { if (rthread) { rthread->stop(); - rthread->interrupt(); + if (dev) dev->interrupt(); if (!rthread->waitForFinish(1000)) rthread->terminate(); delete rthread; diff --git a/main.cpp b/main.cpp index 46f90d43..73e4ad68 100644 --- a/main.cpp +++ b/main.cpp @@ -79,6 +79,12 @@ public: Pipe pipe; }; +PITimeMeasurer tm; +void phase(const char * msg) { + piCout << ""; + piCout << piRound(tm.elapsed_s() * 10) / 10. << "s" << msg; +} + int main(int argc, char * argv[]) { piCout << "main"; @@ -109,53 +115,87 @@ int main(int argc, char * argv[]) { delete threads[i]; }*/ - PIEthernet eth(PIEthernet::UDP), seth(PIEthernet::TCP_Client); - eth.connect("127.0.0.1", 50000); - eth.startThreadedRead(); + //PIEthernet eth(PIEthernet::UDP), seth(PIEthernet::UDP); + //eth.setReadAddress("127.0.0.1", 50000); + //eth.startThreadedRead(); //piCout << eth.open(); - /* + PIByteArray req = PIByteArray::fromHex("205e011000000000ef"); PISerial ser; ser.setSpeed(PISerial::S9600); - ser.setOption(PIIODevice::BlockingRead); - piCout << ser.open("COM3"); - */ - - /* + ser.setOption(PIIODevice::BlockingRead, false); + ser.setVTime(200); + ser.open("COM3"); + CONNECTL(&ser, threadedReadEvent, ([](const uchar * data, ssize_t size){ + piCout << "*ser readed" << size; + })); PIThread thread; thread.start([&](void*){ - piCout << "[T] start" << GetCurrentThreadId(); - //PIByteArray data = ((PIIODevice*)&ser)->read(1024); - eth.connect("192.168.1.13", 23, false); - piCout << "[T] connected" << eth.isConnected() << errorString(); - //piCout << "[T] readed" << data.size() << errorString(); + piCout << "[T] start"; + PIByteArray data = ((PIIODevice*)&ser)->read(1024); + piCout << "[T] readed" << data.size();// << errorString(); piCout << "[T] end"; - }); - piMSleep(500); - //eth.close(); - //piMSleep(500); - thread.stop(); - thread.interrupt(); - //seth.send("127.0.0.1", 50000, "string", 7); - //thread.interrupt(); - thread.waitForFinish(); - */ + }, 200); + //ser.startThreadedRead(); + + piSleep(1); + ser.write(req); + phase("Send"); + + piSleep(2); + phase("End"); /* - eth.listen("127.0.0.1", 50000); - seth.connect("127.0.0.1", 50001, false); + PIEthernet eth(PIEthernet::TCP_Client), seth(PIEthernet::TCP_Server), * server_client = nullptr; + + seth.listen("127.0.0.1", 50000, true); + //seth.startThreadedRead(); + CONNECTL(&seth, newConnection, ([&server_client](PIEthernet * e){ + server_client = e; + e->setName("TCP SC"); + piCout << "newConn" << e; + CONNECTL(e, threadedReadEvent, ([](const uchar * data, ssize_t size){ + piCout << "*TCP SC* readed" << size; + })); + CONNECTL(e, disconnected, ([](bool error){ + piCout << "*TCP SC* disconnected" << error; + })); + e->startThreadedRead(); + })); + + eth.setName("TCP CC"); + //eth.setParameter(PIEthernet::KeepConnection, false); + CONNECTL(ð, connected, ([ð](){ + piCout << "*TCP CC* connected"; + eth.send("byte", 5); + })); + CONNECTL(ð, disconnected, ([](bool error){ + piCout << "*TCP CC* disconnected" << error; + })); + piMSleep(500); - piCout << "connected" << seth.isConnected(); - */ - piMSleep(1000); - piCout << "main stop ..."; - eth.stopThreadedRead(); - piCout << "main wait ..." << eth.isThreadedRead(); - eth.waitThreadedReadFinished(); + phase("Connect"); + eth.connect("127.0.0.1", 50000); + eth.startThreadedRead(); + piMSleep(500); + phase("Send 5"); + piCout << "c-ing" << eth.isConnecting(); + piCout << "c- ed" << eth.isConnected(); + eth.send("byte", 5); + + piMSleep(500); + phase("Send 6"); + eth.send("bytes", 6); + + piMSleep(500); + phase("Disconnect"); + if (server_client) + server_client->close(); //eth.close(); - piCout << "main end" << eth.isThreadedRead(); - //ser.close(); + piMSleep(500); + phase("END"); + */ return 0; }