From 6e81a419fba6d8c6bb3ff1ee838eb3b5663b6813 Mon Sep 17 00:00:00 2001 From: peri4 Date: Tue, 1 Nov 2022 00:02:44 +0300 Subject: [PATCH] start move to interruption of blocking calls, PIThread and PIEthernet --- libs/main/core/piincludes_p.h | 4 + libs/main/core/piinit.cpp | 26 +++-- libs/main/io_devices/piethernet.cpp | 142 +++++++++++++++++++++--- libs/main/io_devices/piethernet.h | 4 + libs/main/piplatform.h | 4 + libs/main/thread/pithread.cpp | 23 ++++ libs/main/thread/pithread.h | 2 + main.cpp | 163 ++++++++++++++++++++++------ 8 files changed, 311 insertions(+), 57 deletions(-) diff --git a/libs/main/core/piincludes_p.h b/libs/main/core/piincludes_p.h index e3c622ca..d35baa6e 100644 --- a/libs/main/core/piincludes_p.h +++ b/libs/main/core/piincludes_p.h @@ -22,6 +22,10 @@ #include "picout.h" #ifdef WINDOWS +# ifdef _WIN32_WINNT +# undef _WIN32_WINNT +# define _WIN32_WINNT 0x0600 +# endif # include # include # include diff --git a/libs/main/core/piinit.cpp b/libs/main/core/piinit.cpp index c8354cfb..6d29486f 100644 --- a/libs/main/core/piinit.cpp +++ b/libs/main/core/piinit.cpp @@ -92,9 +92,19 @@ void __sighandler__(PISignals::Signal s) { } -#ifdef ANDROID -void android_thread_exit_handler(int sig) { - pthread_exit(0); +#ifdef POSIX_SIGNALS +void pipThreadSignalHandler(int sig) { +//# ifdef ANDROID +// pthread_exit(0); +//# endif +} +void pipInitThreadSignals() { + struct sigaction actions; + memset(&actions, 0, sizeof(actions)); + sigemptyset(&actions.sa_mask); + actions.sa_flags = 0; + actions.sa_handler = pipThreadSignalHandler; + sigaction(SIGUSR2, &actions, 0); } #endif @@ -166,14 +176,10 @@ PIInit::PIInit() { setlocale(LC_ALL, ""); setlocale(LC_NUMERIC, "C"); # endif //HAS_LOCALE -#else //ANDROID - struct sigaction actions; - memset(&actions, 0, sizeof(actions)); - sigemptyset(&actions.sa_mask); - actions.sa_flags = 0; - actions.sa_handler = android_thread_exit_handler; - sigaction(SIGTERM, &actions, 0); #endif //ANDROID +#ifdef POSIX_SIGNALS + pipInitThreadSignals(); +#endif PRIVATE->delete_locs = false; __syslocname__ = __sysoemname__ = 0; __utf8name__ = const_cast("UTF-8"); diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 3ad67450..7f459ef9 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -212,6 +212,18 @@ PRIVATE_DEFINITION_START(PIEthernet) sockaddr_in addr_; sockaddr_in saddr_; sockaddr_in raddr_; +#ifdef WINDOWS + WSAEVENT read_event = nullptr; + void createEvent() { + closeEvent(); + read_event = WSACreateEvent(); + } + void closeEvent() { + if (!read_event) return; + WSACloseEvent(read_event); + read_event = nullptr; + } +#endif PRIVATE_DEFINITION_END(PIEthernet) @@ -240,6 +252,11 @@ PIEthernet::PIEthernet(int sock_, PIString ip_port): PIIODevice("", ReadWrite) { setParameters(PIEthernet::ReuseAddress | PIEthernet::MulticastLoop); setType(TCP_Client, false); setPath(ip_port); +#ifdef WINDOWS + u_long mode = 1; + ioctlsocket(sock, FIONBIO, &mode); + PRIVATE->createEvent(); +#endif //piCoutObj << "new tcp client" << sock_; } @@ -248,6 +265,9 @@ PIEthernet::~PIEthernet() { //piCout << "~PIEthernet" << uint(this); stop(); close(); +#ifdef WINDOWS + PRIVATE->closeEvent(); +#endif //piCoutObj << "~PIEthernet done"; } @@ -277,6 +297,9 @@ bool PIEthernet::init() { //cout << "init " << type_ << endl; if (sock_s == sock) sock_s = -1; +#ifdef WINDOWS + PRIVATE->closeEvent(); +#endif closeSocket(sock); closeSocket(sock_s); int st = 0, pr = 0; @@ -289,6 +312,14 @@ bool PIEthernet::init() { } PIFlags params = parameters(); sock = ::socket(AF_INET, st, pr); +#ifdef WINDOWS + if (type() != TCP_Server) { + // non-blocking socket + u_long mode = 1; + ioctlsocket(sock, FIONBIO, &mode); + } + PRIVATE->createEvent(); +#endif if (params[SeparateSockets]) sock_s = ::socket(AF_INET, st, pr); else @@ -395,13 +426,17 @@ bool PIEthernet::closeDevice() { //cout << "close\n"; if (server_thread_.isRunning()) { server_thread_.stop(); - if (!server_thread_.waitForFinish(100)) + server_thread_.interrupt(); + if (!server_thread_.waitForFinish(1000)) server_thread_.terminate(); } if (sock_s == sock) sock_s = -1; closeSocket(sock); closeSocket(sock_s); +#ifdef WINDOWS + if (PRIVATE->read_event) WSASetEvent(PRIVATE->read_event); +#endif while (!clients_.isEmpty()) delete clients_.back(); bool ned = connected_; @@ -566,7 +601,7 @@ bool PIEthernet::connect(bool threaded) { #ifdef QNX PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif - connected_ = (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) == 0); + connected_ = connectTCP(); if (!connected_) { piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); } @@ -618,7 +653,7 @@ bool PIEthernet::listen(bool threaded) { return false; } opened_ = server_bounded = true; - //piCoutObj << "listen on " << ip_ << ":" << port_; + piCoutObj << "listen on" << path(); server_thread_.start(server_func); return true; } @@ -718,9 +753,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif //piCout << "connect to " << path() << "..."; - reading_now = true; - connected_ = (::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)) == 0); - reading_now = false; + connected_ = connectTCP(); //piCout << "connect to " << path() << connected_; if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); @@ -734,9 +767,24 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { } if (!connected_) return -1; errorClear(); - reading_now = true; +#ifdef WINDOWS + { + long wr = waitForEvent(FD_READ | FD_CLOSE); + switch (wr) { + case FD_READ: + piCout << "fd_read ..."; + rs = ethRecv(sock, read_to, max_size); + break; + case FD_CLOSE: + piCout << "fd_close ..."; + rs = -1; + break; + default: break; + } + } +#else rs = ethRecv(sock, read_to, max_size); - reading_now = false; +#endif //piCoutObj << "readed" << rs; if (rs <= 0) { lerr = ethErrorCore(); @@ -763,11 +811,26 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { } if (rs > 0) received(read_to, rs); return rs; - case UDP: + case UDP: { memset(&PRIVATE->raddr_, 0, sizeof(PRIVATE->raddr_)); - reading_now = true; + piCoutObj << "read from" << path() << "..."; +#ifdef WINDOWS + long wr = waitForEvent(FD_READ | FD_CLOSE); + switch (wr) { + case FD_READ: + piCout << "fd_read ..."; + rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); + break; + case FD_CLOSE: + piCout << "fd_close ..."; + rs = -1; + break; + default: break; + } +#else rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_); - reading_now = false; +#endif + piCoutObj << "read from" << path() << rs << "bytes"; if (rs > 0) { addr_lr.set(uint(PRIVATE->raddr_.sin_addr.s_addr), ntohs(PRIVATE->raddr_.sin_port)); //piCoutObj << "read from" << ip_r << ":" << port_r << rs << "bytes"; @@ -775,6 +838,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { } //else piCoutObj << "read returt" << rs << ", error" << ethErrorString(); return rs; + } default: break; } return -1; @@ -799,7 +863,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; - if (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) != 0) { + if (!connectTCP()) { //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); piMinSleep(); return -1; @@ -834,7 +898,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_); #endif //piCoutObj << "connect to " << ip << ":" << port_; - connected_ = (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) == 0); + connected_ = connectTCP(); if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); opened_ = connected_; @@ -890,7 +954,15 @@ void PIEthernet::server_func(void * eth) { } sockaddr_in client_addr; socklen_t slen = sizeof(client_addr); +#ifdef WINDOWS + long wr = ce->waitForEvent(FD_ACCEPT | FD_CLOSE); + if (wr != FD_ACCEPT) { + piMSleep(10); + return; + } +#endif int s = accept(ce->sock, (sockaddr * )&client_addr, &slen); + piCout << ethErrorString(); if (s == -1) { int lerr = ethErrorCore(); #ifdef WINDOWS @@ -930,6 +1002,50 @@ void PIEthernet::setType(Type t, bool reopen) { } +bool PIEthernet::connectTCP() { +#ifdef WINDOWS + ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)); + long wr = waitForEvent(FD_CONNECT | FD_CLOSE); + switch (wr) { + case FD_CONNECT: { + //piCout << "fd_connect ..."; + timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 0; + fd_set fd_test; + FD_ZERO(&fd_test); + FD_SET(sock, &fd_test); + ::select(0, nullptr, &fd_test, nullptr, &timeout); + return FD_ISSET(sock, &fd_test); + } break; + default: break; + } + return false; +#else + return ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)) == 0; +#endif +} + + +#ifdef WINDOWS +long PIEthernet::waitForEvent(long mask) { + if (!PRIVATE->read_event || sock < 0) return 0; + WSAEventSelect(sock, PRIVATE->read_event, mask); + DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); + piCout << "wait result" << wr; + if (wr == WSA_WAIT_EVENT_0) { + WSANETWORKEVENTS events; + memset(&events, 0, sizeof(events)); + WSAEnumNetworkEvents(sock, PRIVATE->read_event, &events); + WSAResetEvent(PRIVATE->read_event); + return events.lNetworkEvents; + } + WSAResetEvent(PRIVATE->read_event); + return 0; +} +#endif + + bool PIEthernet::configureDevice(const void * e_main, const void * e_parent) { PIConfig::Entry * em = (PIConfig::Entry * )e_main; PIConfig::Entry * ep = (PIConfig::Entry * )e_parent; diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index 95f350bc..de8c5987 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -500,6 +500,10 @@ private: EVENT_HANDLER1(void, clientDeleted, PIObject *, o); static void server_func(void * eth); void setType(Type t, bool reopen = true); + bool connectTCP(); +#ifdef WINDOWS + long waitForEvent(long mask); +#endif static int ethErrorCore(); static PIString ethErrorString(); diff --git a/libs/main/piplatform.h b/libs/main/piplatform.h index 548cbe29..d5125e07 100644 --- a/libs/main/piplatform.h +++ b/libs/main/piplatform.h @@ -100,4 +100,8 @@ #endif +#if defined(LINUX) || defined(MAC_OS) || defined(ANDROID) +# define POSIX_SIGNALS +#endif + #endif // PIPLATFORM_H diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index 1f4e3d2b..dccba139 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -23,6 +23,9 @@ #ifndef MICRO_PIP # include "pisystemtests.h" #endif +#ifdef WINDOWS +# include +#endif #include #if defined(WINDOWS) # define __THREAD_FUNC_RET__ uint __stdcall @@ -591,6 +594,26 @@ PIThread::~PIThread() { } +#ifdef WINDOWS +NTAPI void winThreadAPC(ULONG_PTR) { + //piCout << "APC"; +} +#endif + +void PIThread::interrupt() { + if (PRIVATE->thread == 0) return; + piCout << "PIThread::interrupt"; +#ifdef WINDOWS + CancelSynchronousIo(PRIVATE->thread); + QueueUserAPC(winThreadAPC, PRIVATE->thread, 0); +#else +# ifdef POSIX_SIGNALS + pthread_kill(PRIVATE->thread, SIGUSR2); +# endif +#endif +} + + void PIThread::stop(bool wait) { //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop ..." << running_ << wait; terminating = true; diff --git a/libs/main/thread/pithread.h b/libs/main/thread/pithread.h index 01c9ab05..61b90e59 100644 --- a/libs/main/thread/pithread.h +++ b/libs/main/thread/pithread.h @@ -110,6 +110,8 @@ public: EVENT_HANDLER0(void, stop) {stop(false);} EVENT_HANDLER1(void, stop, bool, wait); EVENT_HANDLER0(void, terminate); + + void interrupt(); //! \~english Set common data passed to external function //! \~russian Устанавливает данные, передаваемые в функцию потока diff --git a/main.cpp b/main.cpp index b5e9a8d1..78b276d7 100644 --- a/main.cpp +++ b/main.cpp @@ -12,43 +12,138 @@ typedef PIVector PIVariantVector; REGISTER_VARIANT(PIVariantMap); REGISTER_VARIANT(PIVariantVector); -const char J[] = - "[ \n" - "{ \n" - " \"idFligth\":\"456123\", \n" - " \"FligthPath\": \"d:/orders/nicirt/BSK-52(BBR)/219031.001/EBN-NM-BBR.IMG\",\n" - " \"FligthDataPath\": \"\", \n" - " \"Passport\": \n" - " { \n" - " \"id\": \"\", \n" - " \"TypePlane\": \"\", \n" - " \"FA_PLANEBORT\": \"Ka52-10\" \n" - " } \n" - " }, [1.1,2,3,4,{\"a\":null},{\"bool\":true,\"bool2\":false}] \n" - "] \n" -; +#ifdef WINDOWS +# include +# include +# include +# include +typedef HANDLE pipe_type; +#else +# include +typedef int pipe_type; +#endif +struct Pipe { + pipe_type fd_read = 0; + pipe_type fd_write = 0; + void create() { +#ifdef WINDOWS + CreatePipe(&fd_read, &fd_write, NULL, 0); +#else + pipe((int*)this); +#endif + } + void destoy() { +#ifdef WINDOWS + CloseHandle(fd_read); + CloseHandle(fd_write); +#else + close(fd_read); + close(fd_write); +#endif + } + int read(void * d, int s) { +#ifdef WINDOWS + DWORD ret(0); + ReadFile(fd_read, d, s, &ret, NULL); + return ret; +#else + return ::read(fd_read, d, s); +#endif + } + int write(void * d, int s) { +#ifdef WINDOWS + DWORD ret(0); + WriteFile(fd_write, d, s, &ret, NULL); + return ret; +#else + return ::write(fd_write, d, s); +#endif + } +}; + +constexpr int count = 4; +Pipe pipes[count]; + +class T: public PIThread { +public: + T(int index): PIThread() {ind = index; pipe = pipes[index];} + void run() { + PIByteArray data(1024); + piCout << "[T"< threads; + piCout << "main start"; + for (int i = 0; i < count; ++i) { + T * t = new T(i); + threads << t; + t->startOnce(); } - piCout << json; + piMSleep(100); + for (int i = 0; i < count; ++i) { + //pipes[i].write((void*)"string", 7); + piMSleep(500); + } + piCout << "main wait"; + for (int i = 0; i < count; ++i) { + threads[i]->interrupt(); + threads[i]->waitForFinish(); + piCout << "main T" << i << "done"; + } + piCout << "main end"; + for (int i = 0; i < count; ++i) { + pipes[i].destoy(); + delete threads[i]; + }*/ + + PIEthernet eth(PIEthernet::TCP_Server), seth(PIEthernet::TCP_Client); + //eth.setReadAddress("127.0.0.1", 50000); + //piCout << eth.open(); + + //PISerial ser; + //ser.setSpeed(PISerial::S9600); + //ser.setOption(PIIODevice::BlockingRead); + //piCout << ser.open("COM3"); + + /* + PIThread thread; + thread.start([&](void*){ + piCout << "[T] start" << GetCurrentThreadId(); + //PIByteArray data = ((PIIODevice*)ð)->read(1024); + eth.connect("192.168.1.13", 23, false); + piCout << "[T] connected" << eth.isConnected() << errorString(); + //piCout << "[T] readed" << data.size() << errorString(); + piCout << "[T] end"; + }); + piMSleep(500); + eth.close(); + //piMSleep(500); + //thread.stop(); + //thread.interrupt(); + //seth.send("127.0.0.1", 50000, "string", 7); + //thread.interrupt(); + thread.waitForFinish();*/ + eth.listen("127.0.0.1", 50000); + piMSleep(500); + seth.connect("127.0.0.1", 50001, false); + piMSleep(500); + piCout << "connected" << seth.isConnected(); + //eth.close(); + piCout << "main end"; + + eth.close(); return 0; }