diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b13bc72..079ae9e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,7 +92,7 @@ set(PIP_UTILS_LIST) set(PIP_TESTS_LIST) set(PIP_EXPORTS) -set(PIP_SRC_MODULES "console;crypt;compress;usb;fftw;opencl;io_utils;cloud;lua") +set(PIP_SRC_MODULES "console;crypt;compress;usb;fftw;opencl;io_utils;client_server;cloud;lua") foreach(_m ${PIP_SRC_MODULES}) set(PIP_MSG_${_m} "no") endforeach() @@ -392,6 +392,7 @@ if (NOT CROSSTOOLS) pip_find_lib(sodium) if(sodium_FOUND) pip_module(crypt "sodium" "PIP crypt support" "" "" "") + pip_module(client_server "pip_io_utils" "PIP client-server helper" "" "" "") pip_module(cloud "pip_io_utils" "PIP cloud support" "" "" "") endif() @@ -480,7 +481,7 @@ if (NOT CROSSTOOLS) #target_link_libraries(pip_plugin pip) add_executable(pip_test "main.cpp") - target_link_libraries(pip_test pip pip_io_utils) + target_link_libraries(pip_test pip pip_io_utils pip_client_server) if(sodium_FOUND) add_executable(pip_cloud_test "main_picloud_test.cpp") target_link_libraries(pip_cloud_test pip_cloud) @@ -585,7 +586,7 @@ if ((NOT PIP_FREERTOS) AND (NOT CROSSTOOLS)) find_package(Doxygen) if(DOXYGEN_FOUND) set(DOXY_DEFINES "${PIP_EXPORTS}") - foreach (_m "console" "usb" "compress" "crypt" "cloud" "fftw" "opencl" "io_utils" "lua") + foreach (_m "console" "usb" "compress" "crypt" "client_server" "cloud" "fftw" "opencl" "io_utils" "lua") string(TOUPPER "${_m}" _mdef) list(APPEND DOXY_DEFINES "PIP_${_mdef}_EXPORT") endforeach() diff --git a/cmake/FindPIP.cmake b/cmake/FindPIP.cmake index 8d7a295b..002c92c7 100644 --- a/cmake/FindPIP.cmake +++ b/cmake/FindPIP.cmake @@ -9,6 +9,7 @@ Create imported targets: * PIP::FFTW * PIP::OpenCL * PIP::IOUtils + * PIP::ClientServer * PIP::Cloud * PIP::Lua @@ -22,7 +23,7 @@ include(SHSTKMacros) shstk_set_find_dirs(PIP) -set(__libs "usb;crypt;console;fftw;compress;io_utils;opencl;cloud;lua") +set(__libs "usb;crypt;console;fftw;compress;opencl;io_utils;client_server;cloud;lua") if (BUILDING_PIP) #set(_libs "pip;pip_usb;pip_console;pip_crypt;pip_fftw;pip_compress;pip_opencl;pip_io_utils;pip_cloud;pip_lua") @@ -83,15 +84,16 @@ if(PIP_FIND_VERSION VERSION_GREATER PIP_VERSION) message(FATAL_ERROR "PIP version ${PIP_VERSION} is available, but ${PIP_FIND_VERSION} requested!") endif() -set(__module_usb USB ) -set(__module_console Console ) -set(__module_crypt Crypt ) -set(__module_fftw FFTW ) -set(__module_compress Compress ) -set(__module_opencl OpenCL ) -set(__module_io_utils IOUtils ) -set(__module_cloud Cloud ) -set(__module_lua Lua ) +set(__module_usb USB ) +set(__module_console Console ) +set(__module_crypt Crypt ) +set(__module_fftw FFTW ) +set(__module_compress Compress ) +set(__module_opencl OpenCL ) +set(__module_io_utils IOUtils ) +set(__module_client_server ClientServer) +set(__module_cloud Cloud ) +set(__module_lua Lua ) foreach (_l ${__libs}) set( __inc_${_l} "") @@ -99,8 +101,9 @@ foreach (_l ${__libs}) set(__libs_${_l} "") endforeach() -set(__deps_io_utils "PIP::Crypt") -set(__deps_cloud "PIP::IOUtils") +set(__deps_io_utils "PIP::Crypt" ) +set(__deps_client_server "PIP::IOUtils") +set(__deps_cloud "PIP::IOUtils") if (BUILDING_PIP) diff --git a/libs/client_server/piclientserver_client.cpp b/libs/client_server/piclientserver_client.cpp new file mode 100644 index 00000000..7c81d86b --- /dev/null +++ b/libs/client_server/piclientserver_client.cpp @@ -0,0 +1,54 @@ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piclientserver_client.h" + +#include "piclientserver_server.h" +#include "piethernet.h" + + +void PIClientServer::ServerClient::createForServer(Server * parent, PIEthernet * tcp_) { + tcp = tcp_; + tcp->setParameter(PIEthernet::KeepConnection, false); + init(); + CONNECTL(tcp, disconnected, ([this, parent](bool) { parent->clientDisconnected(this); })); +} + + +PIClientServer::Client::Client() { + tcp = new PIEthernet(PIEthernet::TCP_Client); + tcp->setParameter(PIEthernet::KeepConnection, true); + own_tcp = true; + init(); +} + + +PIClientServer::Client::~Client() { + if (tcp) tcp->setDebug(false); + close(); +} + + +void PIClientServer::Client::connect(PINetworkAddress addr) { + if (!tcp || !own_tcp) return; + close(); + config.apply(this); + tcp->connect(addr, true); + tcp->startThreadedRead(); + piCout << "Connect to" << addr.toString(); +} diff --git a/libs/client_server/piclientserver_client_base.cpp b/libs/client_server/piclientserver_client_base.cpp new file mode 100644 index 00000000..faa99b97 --- /dev/null +++ b/libs/client_server/piclientserver_client_base.cpp @@ -0,0 +1,87 @@ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piclientserver_client_base.h" + +#include "piethernet.h" +#include "piliterals_time.h" + + +PIClientServer::ClientBase::ClientBase() {} + + +PIClientServer::ClientBase::~ClientBase() { + close(); + if (own_tcp) piDeleteSafety(tcp); +} + + +void PIClientServer::ClientBase::close() { + if (!tcp) return; + can_write = false; + tcp->interrupt(); + tcp->stopAndWait(10_s); + if (tcp->isThreadedRead()) tcp->terminateThreadedRead(); + tcp->close(); + stream.clear(); +} + + +int PIClientServer::ClientBase::write(const void * d, const size_t s) { + if (!tcp) return -1; + if (!can_write) return 0; + PIMutexLocker guard(write_mutex); + // piCout << "... send ..."; + stream.send(PIByteArray(d, s)); + // piCout << "... send ok"; + return s; +} + + +void PIClientServer::ClientBase::init() { + if (!tcp) return; + CONNECTL(&stream, sendRequest, [this](const PIByteArray & ba) { + if (!can_write) return; + tcp->send(ba); + // piMSleep(1); + }); + CONNECTL(&stream, packetReceiveEvent, [this](PIByteArray & ba) { readed(ba); }); + CONNECTL(tcp, threadedReadEvent, [this](const uchar * readed, ssize_t size) { + if (!can_write) return; + stream.received(readed, size); + }); + CONNECTL(tcp, connected, [this]() { + can_write = true; + // piCout << "Connected"; + connected(); + }); + CONNECTL(tcp, disconnected, [this](bool) { + can_write = false; + stream.clear(); + // piCout << "Disconnected"; + disconnected(); + }); +} + + +void PIClientServer::ClientBase::destroy() { + can_write = false; + write_mutex.lock(); + piDeleteSafety(tcp); + // piCout << "Destroyed"; +} diff --git a/libs/client_server/piclientserver_config.cpp b/libs/client_server/piclientserver_config.cpp new file mode 100644 index 00000000..a8d190a0 --- /dev/null +++ b/libs/client_server/piclientserver_config.cpp @@ -0,0 +1,50 @@ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piclientserver_config.h" + +#include "piclientserver_client_base.h" + + +void PIClientServer::Config::setPacketSign(ushort sign) { + packet_sign = sign; +} + + +void PIClientServer::Config::setPacketSize(int bytes) { + packet_size = bytes; +} + + +void PIClientServer::Config::enableSymmetricEncryption(const PIByteArray & key) { + crypt_key = key; +} + + +void PIClientServer::Config::apply(ClientBase * client) { + auto & s(client->stream); + + s.setPacketSign(packet_sign); + s.setMaxPacketSize(packet_size); + + if (crypt_key.isNotEmpty()) { + s.setCryptEnabled(true); + s.setCryptKey(crypt_key); + } else + s.setCryptEnabled(false); +} diff --git a/libs/client_server/piclientserver_server.cpp b/libs/client_server/piclientserver_server.cpp new file mode 100644 index 00000000..164f5dfa --- /dev/null +++ b/libs/client_server/piclientserver_server.cpp @@ -0,0 +1,132 @@ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piclientserver_server.h" + +#include "piclientserver_client.h" +#include "piethernet.h" +#include "piliterals_time.h" + + +PIClientServer::Server::Server() { + tcp_server = new PIEthernet(PIEthernet::TCP_Server); + clean_thread = new PIThread(); + client_factory = [] { return new ServerClient(); }; + CONNECTL(tcp_server, newConnection, [this](PIEthernet * c) { + PIMutexLocker guard(clients_mutex); + if (clients.size_s() >= max_clients) { + piCout << "Server::newConnection overflow clients count"; + delete c; + return; + } + auto sc = client_factory(); + if (!sc) { + piCout << "ClientFactory returns nullptr!"; + return; + } + sc->createForServer(this, c); + newClient(sc); + }); + + clean_thread->start([this]() { + clean_notifier.wait(); + PIVector to_delete; + clients_mutex.lock(); + for (auto c: clients) { + const PIEthernet * eth = c->getTCP(); + if (!eth) continue; + if (eth->isConnected()) continue; + c->can_write = false; + to_delete << c; + } + for (auto c: to_delete) + clients.removeOne(c); + clients_mutex.unlock(); + for (auto c: to_delete) { + c->aboutDelete(); + c->destroy(); + delete c; + } + }); +} + + +PIClientServer::Server::~Server() { + clean_thread->stop(); + clean_notifier.notify(); + clean_thread->waitForFinish(); + piDeleteSafety(clean_thread); + stopServer(); + for (auto c: clients) { + c->aboutDelete(); + c->destroy(); + delete c; + } + piDeleteSafety(tcp_server); +} + + +void PIClientServer::Server::listen(PINetworkAddress addr) { + if (!tcp_server) return; + stopServer(); + is_closing = false; + tcp_server->listen(addr, true); + // piCout << "Listen on" << addr.toString(); +} + + +void PIClientServer::Server::setMaxClients(int new_max_clients) { + max_clients = new_max_clients; +} + + +int PIClientServer::Server::clientsCount() const { + PIMutexLocker guard(clients_mutex); + return clients.size_s(); +} + + +void PIClientServer::Server::forEachClient(std::function func) { + PIMutexLocker guard(clients_mutex); + for (auto * c: clients) { + func(c); + if (is_closing) break; + } +} + + +void PIClientServer::Server::stopServer() { + if (!tcp_server) return; + is_closing = true; + tcp_server->stopThreadedListen(); + tcp_server->stopAndWait(); +} + + +void PIClientServer::Server::newClient(ServerClient * c) { + clients << c; + config.apply(c); + c->tcp->startThreadedRead(); + c->connected(); + piCout << "New client"; +} + + +void PIClientServer::Server::clientDisconnected(ServerClient * c) { + clean_notifier.notify(); +} diff --git a/libs/main/client_server/piclientserver_client.h b/libs/main/client_server/piclientserver_client.h new file mode 100644 index 00000000..556f3a6c --- /dev/null +++ b/libs/main/client_server/piclientserver_client.h @@ -0,0 +1,69 @@ +/*! \file piclientserver_client.h + * \ingroup ClientServer + * \~\brief + * \~english + * \~russian + */ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef piclientserver_client_H +#define piclientserver_client_H + +#include "piclientserver_client_base.h" + + +namespace PIClientServer { + + +// ServerClient + +class PIP_CLIENT_SERVER_EXPORT ServerClient: public ClientBase { + friend class Server; + NO_COPY_CLASS(ServerClient); + +public: + ServerClient() {} + +protected: + virtual void aboutDelete() {} + +private: + void createForServer(Server * parent, PIEthernet * tcp_); +}; + + +// Client + +class PIP_CLIENT_SERVER_EXPORT Client: public ClientBase { + NO_COPY_CLASS(Client); + +public: + Client(); + ~Client(); + + void connect(PINetworkAddress addr); + +protected: + +private: +}; + +} // namespace PIClientServer + +#endif diff --git a/libs/main/client_server/piclientserver_client_base.h b/libs/main/client_server/piclientserver_client_base.h new file mode 100644 index 00000000..3c1105ed --- /dev/null +++ b/libs/main/client_server/piclientserver_client_base.h @@ -0,0 +1,77 @@ +/*! \file piclientserver_client_base.h + * \ingroup ClientServer + * \~\brief + * \~english + * \~russian + */ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef piclientserver_client_base_H +#define piclientserver_client_base_H + +#include "piclientserver_config.h" +#include "pip_client_server_export.h" +#include "pistreampacker.h" + +class PIEthernet; + +namespace PIClientServer { + +class Server; + +class PIP_CLIENT_SERVER_EXPORT ClientBase { + friend class Config; + friend class Server; + NO_COPY_CLASS(ClientBase); + +public: + ClientBase(); + virtual ~ClientBase(); + + const PIEthernet * getTCP() const { return tcp; } + + void close(); + + int write(const void * d, const size_t s); + int write(const PIByteArray & ba) { return write(ba.data(), ba.size()); } + + Config & configuration() { return config; } + +protected: + virtual void readed(PIByteArray data) {} + virtual void connected() {} + virtual void disconnected() {} + + void init(); + + bool own_tcp = false; + std::atomic_bool can_write = {true}; + PIEthernet * tcp = nullptr; + Config config; + +private: + void destroy(); + + PIStreamPacker stream; + mutable PIMutex write_mutex; +}; + +} // namespace PIClientServer + +#endif diff --git a/libs/main/client_server/piclientserver_config.h b/libs/main/client_server/piclientserver_config.h new file mode 100644 index 00000000..6ed6b5f4 --- /dev/null +++ b/libs/main/client_server/piclientserver_config.h @@ -0,0 +1,60 @@ +/*! \file piclientserver_config.h + * \ingroup ClientServer + * \~\brief + * \~english + * \~russian + */ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef piclientserver_config_H +#define piclientserver_config_H + +#include "pibytearray.h" +#include "pip_client_server_export.h" + + +namespace PIClientServer { + +class Server; +class Client; +class ClientBase; + +class PIP_CLIENT_SERVER_EXPORT Config { + friend class Server; + friend class Client; + +public: + void setPacketSign(ushort sign); + void setPacketSize(int bytes); + + void enableSymmetricEncryption(const PIByteArray & key); + +protected: + void apply(ClientBase * client); + + PIByteArray crypt_key; + ushort packet_sign = 0xAFBE; + int packet_size = 1400; + +private: +}; + +} // namespace PIClientServer + +#endif diff --git a/libs/main/client_server/piclientserver_server.h b/libs/main/client_server/piclientserver_server.h new file mode 100644 index 00000000..37a380d1 --- /dev/null +++ b/libs/main/client_server/piclientserver_server.h @@ -0,0 +1,80 @@ +/*! \file piclientserver_server.h + * \ingroup ClientServer + * \~\brief + * \~english + * \~russian + */ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef piclientserver_server_H +#define piclientserver_server_H + +#include "piclientserver_config.h" +#include "pimutex.h" +#include "pinetworkaddress.h" +#include "pip_client_server_export.h" +#include "pithreadnotifier.h" + +class PIEthernet; +class PIThread; + +namespace PIClientServer { + +class ServerClient; + +class PIP_CLIENT_SERVER_EXPORT Server { + friend class ServerClient; + NO_COPY_CLASS(Server); + +public: + Server(); + virtual ~Server(); + + void listen(PINetworkAddress addr); + void listenAll(ushort port) { listen({0, port}); } + + int getMaxClients() const { return max_clients; } + void setMaxClients(int new_max_clients); + int clientsCount() const; + void forEachClient(std::function func); + + void setClientFactory(std::function f) { client_factory = f; } + + Config & configuration() { return config; } + +private: + void stopServer(); + void newClient(ServerClient * c); + void clientDisconnected(ServerClient * c); + + std::function client_factory; + std::atomic_bool is_closing = {false}; + PIEthernet * tcp_server = nullptr; + PIThread * clean_thread = nullptr; + PIThreadNotifier clean_notifier; + Config config; + PIVector clients; + mutable PIMutex clients_mutex; + + int max_clients = 1000; +}; + +} // namespace PIClientServer + +#endif diff --git a/libs/main/console/pikbdlistener.h b/libs/main/console/pikbdlistener.h index 165ed9ad..27080497 100644 --- a/libs/main/console/pikbdlistener.h +++ b/libs/main/console/pikbdlistener.h @@ -29,9 +29,10 @@ #include "pithread.h" #include "pitime.h" -#define WAIT_FOR_EXIT \ - while (!PIKbdListener::exiting) \ - piMSleep(PIP_MIN_MSLEEP * 5); // TODO: rewrite with condvar +#define WAIT_FOR_EXIT \ + while (!PIKbdListener::exiting) \ + piMSleep(PIP_MIN_MSLEEP * 5); \ + if (PIKbdListener::instance()) PIKbdListener::instance()->stopAndWait(); class PIP_EXPORT PIKbdListener: public PIThread { diff --git a/libs/main/core/pibase_macros.h b/libs/main/core/pibase_macros.h index aa95ed99..3f597634 100644 --- a/libs/main/core/pibase_macros.h +++ b/libs/main/core/pibase_macros.h @@ -328,7 +328,7 @@ typedef long long ssize_t; __PrivateInitializer__(const __PrivateInitializer__ & o); \ ~__PrivateInitializer__(); \ __PrivateInitializer__ & operator=(const __PrivateInitializer__ & o); \ - __Private__ * p; \ + __Private__ * p = nullptr; \ }; \ __PrivateInitializer__ __privateinitializer__; @@ -343,11 +343,10 @@ typedef long long ssize_t; p = new c::__Private__(); \ } \ c::__PrivateInitializer__::~__PrivateInitializer__() { \ - delete p; \ - p = 0; \ + piDeleteSafety(p); \ } \ c::__PrivateInitializer__ & c::__PrivateInitializer__::operator=(const c::__PrivateInitializer__ &) { \ - if (p) delete p; \ + piDeleteSafety(p); \ p = new c::__Private__(); \ return *this; \ } diff --git a/libs/main/introspection/piintrospection_server.cpp b/libs/main/introspection/piintrospection_server.cpp index 519ca06d..5e71e820 100644 --- a/libs/main/introspection/piintrospection_server.cpp +++ b/libs/main/introspection/piintrospection_server.cpp @@ -33,15 +33,11 @@ PRIVATE_DEFINITION_END(PIIntrospectionServer) PIIntrospectionServer::PIIntrospectionServer(): PIPeer(genName()) { PRIVATE->process_info = PIIntrospection::getInfo(); - sysmon = 0; } PIIntrospectionServer::~PIIntrospectionServer() { - PIPeer::stop(); - if (sysmon) - if (sysmon->property("__iserver__").toBool()) delete sysmon; - sysmon = 0; + // stop(); } @@ -69,6 +65,15 @@ void PIIntrospectionServer::start(const PIString & server_name) { } +void PIIntrospectionServer::stop() { + PIPeer::stopAndWait(); + PIPeer::destroy(); + if (sysmon) + if (sysmon->property("__iserver__").toBool()) delete sysmon; + sysmon = nullptr; +} + + PIString PIIntrospectionServer::genName() { randomize(); return "__introspection__server_" + PIString::fromNumber(randomi() % 1000); @@ -102,7 +107,7 @@ void PIIntrospectionServer::dataReceived(const PIString & from, const PIByteArra void PIIntrospectionServer::sysmonDeleted() { PIMutexLocker _ml(sysmon_mutex); - sysmon = 0; + sysmon = nullptr; } #endif // PIP_INTROSPECTION diff --git a/libs/main/introspection/piintrospection_server.h b/libs/main/introspection/piintrospection_server.h index 8833350a..45e1acd8 100644 --- a/libs/main/introspection/piintrospection_server.h +++ b/libs/main/introspection/piintrospection_server.h @@ -33,6 +33,11 @@ //! \~russian Запускает сервер интроспекции с именем "name" # define PIINTROSPECTION_START(name) +//! \ingroup Introspection +//! \~english Stop introspection server +//! \~russian Останавливает сервер интроспекции +# define PIINTROSPECTION_STOP + #else # if defined(PIP_INTROSPECTION) && !defined(PIP_FORCE_NO_PIINTROSPECTION) @@ -44,6 +49,7 @@ class PISystemMonitor; # define PIINTROSPECTION_SERVER (PIIntrospectionServer::instance()) # define PIINTROSPECTION_START(name) PIINTROSPECTION_SERVER->start(#name); +# define PIINTROSPECTION_STOP PIINTROSPECTION_SERVER->stop(); class PIP_EXPORT PIIntrospectionServer: public PIPeer { PIOBJECT_SUBCLASS(PIIntrospectionServer, PIPeer); @@ -52,6 +58,7 @@ public: static PIIntrospectionServer * instance(); void start(const PIString & server_name); + void stop(); private: PIIntrospectionServer(); @@ -59,17 +66,17 @@ private: NO_COPY_CLASS(PIIntrospectionServer); PIString genName(); - virtual void dataReceived(const PIString & from, const PIByteArray & data); + void dataReceived(const PIString & from, const PIByteArray & data) override; EVENT_HANDLER(void, sysmonDeleted); PRIVATE_DECLARATION(PIP_EXPORT) - PITimer itimer; - PISystemMonitor * sysmon; + PISystemMonitor * sysmon = nullptr; PIMutex sysmon_mutex; }; # else # define PIINTROSPECTION_START(name) +# define PIINTROSPECTION_STOP # endif #endif // DOXYGEN diff --git a/libs/main/introspection/piintrospection_server_p.cpp b/libs/main/introspection/piintrospection_server_p.cpp index d9ef1359..613b3e28 100644 --- a/libs/main/introspection/piintrospection_server_p.cpp +++ b/libs/main/introspection/piintrospection_server_p.cpp @@ -139,8 +139,10 @@ PIByteArray PIIntrospection::packThreads() { PIMap & tm(p->threads); auto it = tm.makeIterator(); while (it.next()) { - it.value().classname = PIStringAscii(it.key()->className()); - it.value().name = it.key()->name(); + if (it.key()->isPIObject()) { + it.value().classname = PIStringAscii(it.key()->className()); + it.value().name = it.key()->name(); + } } ret << tm.values(); p->mutex.unlock(); diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 4aa86b93..d4d7d04e 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -303,18 +303,12 @@ bool PIEthernet::closeDevice() { // piCoutObj << "close"; bool ned = connected_; connected_ = connecting_ = false; - server_thread_.stop(); - PRIVATE->event.interrupt(); - if (server_thread_.isRunning()) { - if (!server_thread_.waitForFinish(1_s)) server_thread_.terminate(); - } - PRIVATE->event.destroy(); - if (sock_s == sock) sock_s = -1; - closeSocket(sock); - closeSocket(sock_s); + stopThreadedListen(); clients_mutex.lock(); - piDeleteAllAndClear(clients_); + auto cl = clients_; + clients_.clear(); clients_mutex.unlock(); + piDeleteAll(cl); if (ned) { // piCoutObj << "Disconnect on close"; disconnected(false); @@ -528,16 +522,32 @@ bool PIEthernet::listen(const PINetworkAddress & addr, bool threaded) { return listen(threaded); } + +void PIEthernet::stopThreadedListen() { + server_thread_.stop(); + PRIVATE->event.interrupt(); + if (server_thread_.isRunning()) { + if (!server_thread_.waitForFinish(1_s)) server_thread_.terminate(); + } + PRIVATE->event.destroy(); + if (sock_s == sock) sock_s = -1; + closeSocket(sock); + closeSocket(sock_s); +} + + PIEthernet * PIEthernet::client(int index) { PIMutexLocker locker(clients_mutex); return clients_[index]; } + int PIEthernet::clientsCount() const { PIMutexLocker locker(clients_mutex); return clients_.size_s(); } + PIVector PIEthernet::clients() const { PIMutexLocker locker(clients_mutex); return clients_; @@ -777,6 +787,9 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { closeSocket(sock); init(); disconnected(true); + if (params[KeepConnection]) { + connect(); + } } }; if (!isOptionSet(BlockingWrite)) { diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index a1c10c3f..e2a2e5e9 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -251,6 +251,8 @@ public: //! Start listen for incoming TCP connections on address "addr". Use only for TCP_Server bool listen(const PINetworkAddress & addr, bool threaded = false); + void stopThreadedListen(); + PIEthernet * client(int index); int clientsCount() const; PIVector clients() const; diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index db7225d7..9b4f92d8 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -243,6 +243,10 @@ public: //! \~russian Возвращает запущен ли поток чтения bool isThreadedRead() const; + //! \~english Returns if threaded read is stopping + //! \~russian Возвращает останавливается ли поток чтения + bool isThreadedReadStopping() const { return read_thread.isStopping(); } + //! \~english Start threaded read //! \~russian Запускает потоковое чтение void startThreadedRead(); @@ -565,8 +569,6 @@ protected: static PIIODevice * newDeviceByPrefix(const char * prefix); - bool isThreadedReadStopping() const { return read_thread.isStopping(); } - DeviceMode mode_ = ReadOnly; DeviceOptions options_; ReadRetFunc func_read = nullptr; diff --git a/libs/main/io_devices/pipeer.cpp b/libs/main/io_devices/pipeer.cpp index 180ad28e..bbbce3fe 100644 --- a/libs/main/io_devices/pipeer.cpp +++ b/libs/main/io_devices/pipeer.cpp @@ -201,33 +201,7 @@ PIPeer::~PIPeer() { stop(); if (destroyed) return; destroyed = true; - sync_timer.stopAndWait(); - diag_s.stopAndWait(); - diag_d.stopAndWait(); - PIMutexLocker ml(peers_mutex); - piForeach(PeerInfo & p, peers) - if (p._data) { - p._data->dt_in.stop(); - p._data->dt_out.stop(); - p._data->t.stopAndWait(); - } - destroyEths(); - piForeach(PIEthernet * i, eths_mcast) { - if (!i) continue; - i->stopAndWait(); - } - piForeach(PIEthernet * i, eths_bcast) { - if (!i) continue; - i->stopAndWait(); - } - eth_lo.stopAndWait(); - eth_tcp_srv.stopAndWait(); - eth_tcp_cli.stopAndWait(); - sendSelfRemove(); - destroyMBcasts(); - eth_send.close(); - piForeach(PeerInfo & p, peers) - p.destroy(); + destroy(); } @@ -363,6 +337,42 @@ void PIPeer::initMBcasts(PIStringList al) { } +void PIPeer::destroy() { + sync_timer.stopAndWait(); + diag_s.stopAndWait(); + diag_d.stopAndWait(); + PIMutexLocker ml(peers_mutex); + for (auto & p: peers) + if (p._data) { + p._data->dt_in.stop(); + p._data->dt_out.stop(); + p._data->t.stopAndWait(); + } + destroyEths(); + for (auto * i: eths_mcast) { + if (!i) continue; + i->stopAndWait(); + } + for (auto * i: eths_bcast) { + if (!i) continue; + i->stopAndWait(); + } + eth_lo.stopAndWait(); + eth_tcp_srv.stopAndWait(); + eth_tcp_cli.stopAndWait(); + sendSelfRemove(); + eth_lo.close(); + eth_tcp_srv.close(); + eth_tcp_cli.close(); + destroyMBcasts(); + eth_send.close(); + for (auto & p: peers) + p.destroy(); + peers.clear(); + destroyed = true; +} + + void PIPeer::destroyEths() { for (auto * i: eths_traffic) { if (!i) continue; @@ -930,6 +940,7 @@ ssize_t PIPeer::bytesAvailable() const { ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) { + iterrupted = false; read_buffer_mutex.lock(); bool empty = read_buffer.isEmpty(); read_buffer_mutex.unlock(); @@ -937,6 +948,9 @@ ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) { read_buffer_mutex.lock(); empty = read_buffer.isEmpty(); read_buffer_mutex.unlock(); + if (iterrupted) { + return 0; + } piMSleep(10); } read_buffer_mutex.lock(); @@ -945,6 +959,9 @@ ssize_t PIPeer::readDevice(void * read_to, ssize_t max_size) { read_buffer_mutex.unlock(); ssize_t sz = piMini(ba.size_s(), max_size); memcpy(read_to, ba.data(), sz); + if (iterrupted) { + return 0; + } return sz; } read_buffer_mutex.unlock(); @@ -964,6 +981,11 @@ ssize_t PIPeer::writeDevice(const void * data, ssize_t size) { } +void PIPeer::interrupt() { + iterrupted = true; +} + + void PIPeer::newTcpClient(PIEthernet * client) { client->setName("__S__PIPeer_eth_TCP_ServerClient" + client->path()); piCoutObj << "client" << client->path(); diff --git a/libs/main/io_devices/pipeer.h b/libs/main/io_devices/pipeer.h index d69f4da1..09fae55d 100644 --- a/libs/main/io_devices/pipeer.h +++ b/libs/main/io_devices/pipeer.h @@ -171,6 +171,8 @@ protected: EVENT_HANDLER2(bool, dataRead, const uchar *, readed, ssize_t, size); EVENT_HANDLER2(bool, mbcastRead, const uchar *, readed, ssize_t, size); + void destroy(); + private: EVENT_HANDLER1(void, timerEvent, int, delim); EVENT_HANDLER2(bool, sendInternal, const PIString &, to, const PIByteArray &, data); @@ -212,6 +214,7 @@ private: void configureFromVariantDevice(const PIPropertyStorage & d) override; ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t size) override; + void interrupt() override; DeviceInfoFlags deviceInfoFlags() const override { return PIIODevice::Reliable; } PeerInfo * quickestPeer(const PIString & to); @@ -243,6 +246,7 @@ private: mutable PIMutex read_buffer_mutex; PIQueue read_buffer; int read_buffer_size; + std::atomic_bool iterrupted = {false}; PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex, send_mc_mutex; }; diff --git a/libs/main/system/pisystemmonitor.cpp b/libs/main/system/pisystemmonitor.cpp index c8e61f11..960caea5 100644 --- a/libs/main/system/pisystemmonitor.cpp +++ b/libs/main/system/pisystemmonitor.cpp @@ -150,7 +150,7 @@ void PISystemMonitor::setStatistic(const PISystemMonitor::ProcessStats & s) { void PISystemMonitor::stop() { - PIThread::stop(); + PIThread::stopAndWait(); #ifdef WINDOWS if (PRIVATE->hProc != 0) { CloseHandle(PRIVATE->hProc); diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index c75776a3..5dd1f287 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -568,6 +568,7 @@ PIThread::PIThread(bool startNow, PISystemTime loop_delay): PIObject() { PIThread::~PIThread() { PIINTROSPECTION_THREAD_DELETE(this); if (!running_ || PRIVATE->thread == 0) return; + piCout << "[PIThread \"" << name() << "\"] Warning, terminate on destructor!"; #ifdef FREERTOS // void * ret(0); // PICout(PICoutManipulators::DefaultControls) << "~PIThread" << PRIVATE->thread; @@ -587,6 +588,8 @@ PIThread::~PIThread() { CloseHandle(PRIVATE->thread); # endif #endif + UNREGISTER_THREAD(this); + PIINTROSPECTION_THREAD_STOP(this); terminating = running_ = false; } diff --git a/libs/main/thread/pithreadnotifier.cpp b/libs/main/thread/pithreadnotifier.cpp index 2e54d954..b0a4a79e 100644 --- a/libs/main/thread/pithreadnotifier.cpp +++ b/libs/main/thread/pithreadnotifier.cpp @@ -72,9 +72,9 @@ //! w->startOnce(); //! //! piMSleep(500); -//! notifier.notifyOnce(); // notify one of them after 500 ms +//! notifier.notify(); // notify one of them after 500 ms //! piMSleep(500); -//! notifier.notifyOnce(); // notify one of them after 1000 ms +//! notifier.notify(); // notify one of them after 1000 ms //! //! for (auto * w: workers) //! w->waitForFinish(); @@ -93,17 +93,17 @@ PIThreadNotifier::PIThreadNotifier(): cnt(0) {} //! \~\details //! \~english -//! If \a notifyOnce() has been called before, then returns immediately.\n -//! If \a notifyOnce() has been called "n" times, then returns immediately "n" times, +//! If \a notify() has been called before, then returns immediately.\n +//! If \a notify() has been called "n" times, then returns immediately "n" times, //! but only if wait in one thread.\n -//! If many threads waiting, then if \a notifyOnce() has been called "n" times, +//! If many threads waiting, then if \a notify() has been called "n" times, //! all threads total returns "n" times in undefined sequence. //! //! \~russian -//! Если ранее был вызван \a notifyOnce(), то возвращает управление немедленно.\n -//! Если ранее был вызван \a notifyOnce() "n" раз, то возвращает управление немедленно "n" раз, +//! Если ранее был вызван \a notify(), то возвращает управление немедленно.\n +//! Если ранее был вызван \a notify() "n" раз, то возвращает управление немедленно "n" раз, //! но только если ожидать одним потоком.\n -//! Если ожидают несколько потоков, и \a notifyOnce() был вызван "n" раз, +//! Если ожидают несколько потоков, и \a notify() был вызван "n" раз, //! то все потоки суммарно вернут управление "n" раз в неопределенной последовательности. //! void PIThreadNotifier::wait() { diff --git a/libs/main/thread/pithreadnotifier.h b/libs/main/thread/pithreadnotifier.h index 2cb0571e..e7657113 100644 --- a/libs/main/thread/pithreadnotifier.h +++ b/libs/main/thread/pithreadnotifier.h @@ -33,8 +33,8 @@ class PIP_EXPORT PIThreadNotifier { public: PIThreadNotifier(); - //! \~english Start waiting, return if other thread call \a notifyOnce() - //! \~russian Начать ожидание, продолжает когда другой поток вызовет \a notifyOnce() + //! \~english Start waiting, return if other thread call \a notify() + //! \~russian Начать ожидание, продолжает когда другой поток вызовет \a notify() void wait(); //! \~english Notify one waiting thread, which waiting on \a wait() function diff --git a/main.cpp b/main.cpp index c748f73f..a9c31875 100644 --- a/main.cpp +++ b/main.cpp @@ -1,5 +1,8 @@ #include "pibytearray.h" +#include "piclientserver_client.h" +#include "piclientserver_server.h" #include "picodeparser.h" +#include "piintrospection_server.h" #include "piiostream.h" #include "pijson.h" #include "pimathbase.h" @@ -16,22 +19,122 @@ enum MyEvent { meIntString, }; -PITimeMeasurer tm; -std::atomic_int cnt = {0}; -void tfunc(int delim) { - // piCout << "tick with delimiter" << delim; - ++cnt; +PIKbdListener kbd; + + +class MyServerClient: public PIClientServer::ServerClient { +public: + ~MyServerClient() { send_thread.stopAndWait(); } + +protected: + void readed(PIByteArray data) override { piCout << "readed" << (data.size()); } + // void aboutDelete() override { piCout << "aboutDelete"; } + // void disconnected() override { piCout << "disconnected"; } + void connected() override { + // piCout << "connected"; + send_thread.start( + [this] { + // write((PIString::fromNumber(++counter)).toUTF8()); + PIByteArray ba(64_KiB); + write(ba); + }, + 2_Hz); + } + PIThread send_thread; + int counter = 0; }; -void tfunc4(int delim) { - piCout << "tick4 with delimiter" << delim; + + +class MyClient: public PIClientServer::Client { +public: + ~MyClient() { send_thread.stopAndWait(); } + +protected: + void readed(PIByteArray data) override { piCout << "readed" << (data.size()); } + void disconnected() override { piCout << "disconnected"; } + void connected() override { + piCout << "connected"; + send_thread.start( + [this] { + // write((PIString::fromNumber(++counter)).toUTF8()); + write(PIByteArray(64_KiB)); + }, + 2_Hz); + } + PIThread send_thread; + int counter = 0; }; int main(int argc, char * argv[]) { - uint v = 0xaabbccdd; - piCout << Hex << v << piChangedEndian(v); - piChangeEndianBinary(&v, sizeof(v)); - piCout << Hex << v << piChangedEndian(v); + /*PIPeer p("123"); + + piCout << "start ..."; + p.start(); + piCout << "start ok"; + + piSleep(1.); + + piCout << "stop ..."; + p.stopAndWait(); + piCout << "stop ok"; + + piSleep(1.); + + piCout << "exit"; + return 0;*/ + + if (argc > 1) { + PIINTROSPECTION_START(server); + } else { + PIINTROSPECTION_START(client); + } + + kbd.enableExitCapture(); + + + PIClientServer::Server * s = nullptr; + PIThread * s_thread = new PIThread(); + PIVector cv; + + if (argc > 1) { + piCout << "Server"; + s = new PIClientServer::Server(); + s->setClientFactory([] { return new MyServerClient(); }); + s->configuration().enableSymmetricEncryption("1122334455667788"_hex); + s->listenAll(12345); + s_thread->start( + [s] { + piCout << "*** clients" << s->clientsCount(); + int i = 0; + s->forEachClient([&i](PIClientServer::ServerClient * c) { + // piCout << "client" << ++i << c; + c->write(PIByteArray(1_KiB)); + // c->close(); + // piMSleep(200); + }); + }, + 1._Hz); + } else { + piCout << "Client"; + piForTimes(2) { + piMSleep(25); + auto c = new MyClient(); + c->configuration().enableSymmetricEncryption("1122334455667788"_hex); + c->connect(PINetworkAddress::resolve("127.0.0.1", 12345)); + cv << c; + } + } + + WAIT_FOR_EXIT; + + s_thread->stopAndWait(); + + piDeleteSafety(s); + piDeleteAllAndClear(cv); + + PIINTROSPECTION_STOP + return 0; /*PIPackedTCP * tcp_s = PIIODevice::createFromFullPath("ptcp://s::8000")->cast(); // new PIPackedTCP(PIPackedTCP::Server, {"0.0.0.0:8000"});