From da4b09be9e253bdc052a45b53cc00969a539b8d1 Mon Sep 17 00:00:00 2001 From: peri4 Date: Wed, 11 Sep 2024 21:41:55 +0300 Subject: [PATCH] PIEthernet fix tcp-server close (properly delete all clients) PIEthernet::stopThreadedListen() method decompose client to 2 implementations - server-side and client-side --- libs/client_server/piclientserver_client.cpp | 50 ++++++++++++++ ...ent.cpp => piclientserver_client_base.cpp} | 58 ++++------------ ...r_server.cpp => piclientserver_server.cpp} | 31 +++++++-- .../client_server/piclientserver_client.h | 69 +++++++++++++++++++ ..._client.h => piclientserver_client_base.h} | 27 +++----- ...erver_server.h => piclientserver_server.h} | 28 ++++---- libs/main/io_devices/piethernet.cpp | 30 +++++--- libs/main/io_devices/piethernet.h | 2 + libs/main/io_devices/piiodevice.h | 6 +- main.cpp | 57 ++++++++++++--- 10 files changed, 257 insertions(+), 101 deletions(-) create mode 100644 libs/client_server/piclientserver_client.cpp rename libs/client_server/{piclient_server_client.cpp => piclientserver_client_base.cpp} (62%) rename libs/client_server/{piclient_server_server.cpp => piclientserver_server.cpp} (79%) create mode 100644 libs/main/client_server/piclientserver_client.h rename libs/main/client_server/{piclient_server_client.h => piclientserver_client_base.h} (79%) rename libs/main/client_server/{piclient_server_server.h => piclientserver_server.h} (71%) diff --git a/libs/client_server/piclientserver_client.cpp b/libs/client_server/piclientserver_client.cpp new file mode 100644 index 00000000..040ea744 --- /dev/null +++ b/libs/client_server/piclientserver_client.cpp @@ -0,0 +1,50 @@ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "piclientserver_client.h" + +#include "piethernet.h" + + +void PIClientServer::ServerClient::createForServer(PIEthernet * tcp_) { + tcp = tcp_; + tcp->setParameter(PIEthernet::KeepConnection, false); + init(); +} + + +PIClientServer::Client::Client() { + tcp = new PIEthernet(PIEthernet::TCP_Client); + tcp->setParameter(PIEthernet::KeepConnection, true); + own_tcp = true; + init(); +} + + +PIClientServer::Client::~Client() { + stop(); +} + + +void PIClientServer::Client::connect(PINetworkAddress addr) { + if (!tcp || !own_tcp) return; + stop(); + tcp->connect(addr, true); + tcp->startThreadedRead(); + piCout << "Connect to" << addr.toString(); +} diff --git a/libs/client_server/piclient_server_client.cpp b/libs/client_server/piclientserver_client_base.cpp similarity index 62% rename from libs/client_server/piclient_server_client.cpp rename to libs/client_server/piclientserver_client_base.cpp index d5b51ee2..d01bc7ec 100644 --- a/libs/client_server/piclient_server_client.cpp +++ b/libs/client_server/piclientserver_client_base.cpp @@ -16,41 +16,22 @@ along with this program. If not, see . */ -#include "piclient_server_client.h" +#include "piclientserver_client_base.h" #include "piethernet.h" #include "piliterals_time.h" -#include "pitime.h" -PIClientServer::Client::Client() {} +PIClientServer::ClientBase::ClientBase() {} -PIClientServer::Client::~Client() { +PIClientServer::ClientBase::~ClientBase() { stop(); if (own_tcp) piDeleteSafety(tcp); } -void PIClientServer::Client::createNew() { - if (tcp) return; - tcp = new PIEthernet(PIEthernet::TCP_Client); - tcp->setParameter(PIEthernet::KeepConnection, true); - own_tcp = true; - init(); -} - - -void PIClientServer::Client::connect(PINetworkAddress addr) { - if (!tcp || !own_tcp) return; - stop(); - tcp->connect(addr, true); - tcp->startThreadedRead(); - piCout << "Connect to" << addr.toString(); -} - - -void PIClientServer::Client::stop() { +void PIClientServer::ClientBase::stop() { if (!tcp) return; can_write = false; tcp->interrupt(); @@ -60,18 +41,18 @@ void PIClientServer::Client::stop() { } -int PIClientServer::Client::write(const void * d, const size_t s) { +int PIClientServer::ClientBase::write(const void * d, const size_t s) { if (!tcp) return -1; if (!can_write) return 0; PIMutexLocker guard(write_mutex); - piCout << "... send ..."; + // piCout << "... send ..."; stream.send(PIByteArray(d, s)); - piCout << "... send ok"; + // piCout << "... send ok"; return s; } -void PIClientServer::Client::enableSymmetricEncryption(const PIByteArray & key) { +void PIClientServer::ClientBase::enableSymmetricEncryption(const PIByteArray & key) { if (key.isNotEmpty()) { stream.setCryptEnabled(true); stream.setCryptKey(key); @@ -80,46 +61,35 @@ void PIClientServer::Client::enableSymmetricEncryption(const PIByteArray & key) } -void PIClientServer::Client::createForServer(PIEthernet * tcp_) { - tcp = tcp_; - tcp->setParameter(PIEthernet::KeepConnection, false); - init(); -} - - -void PIClientServer::Client::init() { +void PIClientServer::ClientBase::init() { if (!tcp) return; CONNECTL(&stream, sendRequest, [this](const PIByteArray & ba) { if (!can_write) return; tcp->send(ba); // piMSleep(1); }); - CONNECTL(&stream, packetReceiveEvent, [this](PIByteArray & ba) { - if (readed_func) readed_func(ba); - readed(ba); - }); + CONNECTL(&stream, packetReceiveEvent, [this](PIByteArray & ba) { readed(ba); }); CONNECTL(tcp, threadedReadEvent, [this](const uchar * readed, ssize_t size) { if (!can_write) return; stream.received(readed, size); }); CONNECTL(tcp, connected, [this]() { can_write = true; - piCout << "Connected"; + // piCout << "Connected"; connected(); }); CONNECTL(tcp, disconnected, [this](bool) { can_write = false; stream.clear(); - piCout << "Disconnected"; + // piCout << "Disconnected"; disconnected(); }); } -void PIClientServer::Client::destroy() { +void PIClientServer::ClientBase::destroy() { can_write = false; write_mutex.lock(); piDeleteSafety(tcp); - aboutDelete(); - piCout << "Destroyed"; + // piCout << "Destroyed"; } diff --git a/libs/client_server/piclient_server_server.cpp b/libs/client_server/piclientserver_server.cpp similarity index 79% rename from libs/client_server/piclient_server_server.cpp rename to libs/client_server/piclientserver_server.cpp index 09260ebd..439eac1e 100644 --- a/libs/client_server/piclient_server_server.cpp +++ b/libs/client_server/piclientserver_server.cpp @@ -16,9 +16,9 @@ along with this program. If not, see . */ -#include "piclient_server_server.h" +#include "piclientserver_server.h" -#include "piclient_server_client.h" +#include "piclientserver_client.h" #include "piethernet.h" #include "piliterals_time.h" @@ -26,7 +26,7 @@ PIClientServer::Server::Server() { tcp_server = new PIEthernet(PIEthernet::TCP_Server); clean_thread = new PIThread(); - client_factory = [] { return new Client(); }; + client_factory = [] { return new ServerClient(); }; CONNECTL(tcp_server, newConnection, [this](PIEthernet * c) { PIMutexLocker guard(clients_mutex); if (clients.size_s() >= max_clients) { @@ -45,7 +45,7 @@ PIClientServer::Server::Server() { clean_thread->start( [this]() { - PIVector to_delete; + PIVector to_delete; clients_mutex.lock(); for (auto c: clients) { const PIEthernet * eth = c->getTCP(); @@ -58,6 +58,7 @@ PIClientServer::Server::Server() { clients.removeOne(c); clients_mutex.unlock(); for (auto c: to_delete) { + c->aboutDelete(); c->destroy(); delete c; } @@ -71,6 +72,7 @@ PIClientServer::Server::~Server() { piDeleteSafety(clean_thread); stopServer(); for (auto c: clients) { + c->aboutDelete(); c->destroy(); delete c; } @@ -81,6 +83,7 @@ PIClientServer::Server::~Server() { void PIClientServer::Server::listen(PINetworkAddress addr) { if (!tcp_server) return; stopServer(); + is_closing = false; tcp_server->listen(addr, true); // piCout << "Listen on" << addr.toString(); } @@ -91,6 +94,21 @@ void PIClientServer::Server::setMaxClients(int new_max_clients) { } +int PIClientServer::Server::clientsCount() const { + PIMutexLocker guard(clients_mutex); + return clients.size_s(); +} + + +void PIClientServer::Server::forEachClient(std::function func) { + PIMutexLocker guard(clients_mutex); + for (auto * c: clients) { + func(c); + if (is_closing) break; + } +} + + void PIClientServer::Server::enableSymmetricEncryption(const PIByteArray & key) { crypt_key = key; } @@ -98,14 +116,15 @@ void PIClientServer::Server::enableSymmetricEncryption(const PIByteArray & key) void PIClientServer::Server::stopServer() { if (!tcp_server) return; + is_closing = true; + tcp_server->stopThreadedListen(); tcp_server->stopAndWait(); } -void PIClientServer::Server::newClient(Client * c) { +void PIClientServer::Server::newClient(ServerClient * c) { clients << c; c->enableSymmetricEncryption(crypt_key); - c->readed_func = [this, c](PIByteArray ba) { readed(c, ba); }; c->tcp->startThreadedRead(); c->connected(); piCout << "New client"; diff --git a/libs/main/client_server/piclientserver_client.h b/libs/main/client_server/piclientserver_client.h new file mode 100644 index 00000000..6d160a5b --- /dev/null +++ b/libs/main/client_server/piclientserver_client.h @@ -0,0 +1,69 @@ +/*! \file piclientserver_client.h + * \ingroup ClientServer + * \~\brief + * \~english + * \~russian + */ +/* + PIP - Platform Independent Primitives + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef piclientserver_client_H +#define piclientserver_client_H + +#include "piclientserver_client_base.h" + + +namespace PIClientServer { + + +// ServerClient + +class PIP_CLIENT_SERVER_EXPORT ServerClient: public ClientBase { + friend class Server; + NO_COPY_CLASS(ServerClient); + +public: + ServerClient() {} + +protected: + virtual void aboutDelete() {} + +private: + void createForServer(PIEthernet * tcp_); +}; + + +// Client + +class PIP_CLIENT_SERVER_EXPORT Client: public ClientBase { + NO_COPY_CLASS(Client); + +public: + Client(); + ~Client(); + + void connect(PINetworkAddress addr); + +protected: + +private: +}; + +} // namespace PIClientServer + +#endif diff --git a/libs/main/client_server/piclient_server_client.h b/libs/main/client_server/piclientserver_client_base.h similarity index 79% rename from libs/main/client_server/piclient_server_client.h rename to libs/main/client_server/piclientserver_client_base.h index 3181523f..e587c095 100644 --- a/libs/main/client_server/piclient_server_client.h +++ b/libs/main/client_server/piclientserver_client_base.h @@ -1,4 +1,4 @@ -/*! \file piclient_server_client.h +/*! \file piclientserver_client_base.h * \ingroup ClientServer * \~\brief * \~english @@ -22,8 +22,8 @@ along with this program. If not, see . */ -#ifndef piclient_server_client_H -#define piclient_server_client_H +#ifndef piclientserver_client_base_H +#define piclientserver_client_base_H #include "pinetworkaddress.h" #include "pip_client_server_export.h" @@ -35,18 +35,14 @@ namespace PIClientServer { class Server; -class PIP_CLIENT_SERVER_EXPORT Client: public PIObject { +class PIP_CLIENT_SERVER_EXPORT ClientBase { friend class Server; - NO_COPY_CLASS(Client); - PIOBJECT(Client); + NO_COPY_CLASS(ClientBase); public: - Client(); - virtual ~Client(); + ClientBase(); + virtual ~ClientBase(); - void createNew(); - - void connect(PINetworkAddress addr); const PIEthernet * getTCP() const { return tcp; } void stop(); @@ -60,19 +56,18 @@ protected: virtual void readed(PIByteArray data) {} virtual void connected() {} virtual void disconnected() {} - virtual void aboutDelete() {} -private: - void createForServer(PIEthernet * tcp_); void init(); - void destroy(); bool own_tcp = false; std::atomic_bool can_write = {true}; PIEthernet * tcp = nullptr; + +private: + void destroy(); + PIStreamPacker stream; mutable PIMutex write_mutex; - std::function readed_func = nullptr; }; } // namespace PIClientServer diff --git a/libs/main/client_server/piclient_server_server.h b/libs/main/client_server/piclientserver_server.h similarity index 71% rename from libs/main/client_server/piclient_server_server.h rename to libs/main/client_server/piclientserver_server.h index 7143db71..07681ec2 100644 --- a/libs/main/client_server/piclient_server_server.h +++ b/libs/main/client_server/piclientserver_server.h @@ -1,4 +1,4 @@ -/*! \file piclient_server_server.h +/*! \file piclientserver_server.h * \ingroup ClientServer * \~\brief * \~english @@ -22,8 +22,8 @@ along with this program. If not, see . */ -#ifndef piclient_server_server_H -#define piclient_server_server_H +#ifndef piclientserver_server_H +#define piclientserver_server_H #include "pimutex.h" #include "pinetworkaddress.h" @@ -34,7 +34,7 @@ class PIThread; namespace PIClientServer { -class Client; +class ServerClient; class PIP_CLIENT_SERVER_EXPORT Server { public: @@ -46,24 +46,24 @@ public: int getMaxClients() const { return max_clients; } void setMaxClients(int new_max_clients); + int clientsCount() const; + void forEachClient(std::function func); - void setClientFactory(std::function f) { client_factory = f; } + void setClientFactory(std::function f) { client_factory = f; } void enableSymmetricEncryption(const PIByteArray & key); -protected: - virtual void readed(Client * c, PIByteArray data) {} - private: void stopServer(); - void newClient(Client * c); + void newClient(ServerClient * c); - std::function client_factory; - PIEthernet * tcp_server = nullptr; - PIThread * clean_thread = nullptr; + std::function client_factory; + std::atomic_bool is_closing = {false}; + PIEthernet * tcp_server = nullptr; + PIThread * clean_thread = nullptr; PIByteArray crypt_key; - PIVector clients; - PIMutex clients_mutex; + PIVector clients; + mutable PIMutex clients_mutex; int max_clients = 1000; }; diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 796d67d0..d4d7d04e 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -303,18 +303,12 @@ bool PIEthernet::closeDevice() { // piCoutObj << "close"; bool ned = connected_; connected_ = connecting_ = false; - server_thread_.stop(); - PRIVATE->event.interrupt(); - if (server_thread_.isRunning()) { - if (!server_thread_.waitForFinish(1_s)) server_thread_.terminate(); - } - PRIVATE->event.destroy(); - if (sock_s == sock) sock_s = -1; - closeSocket(sock); - closeSocket(sock_s); + stopThreadedListen(); clients_mutex.lock(); - piDeleteAllAndClear(clients_); + auto cl = clients_; + clients_.clear(); clients_mutex.unlock(); + piDeleteAll(cl); if (ned) { // piCoutObj << "Disconnect on close"; disconnected(false); @@ -528,16 +522,32 @@ bool PIEthernet::listen(const PINetworkAddress & addr, bool threaded) { return listen(threaded); } + +void PIEthernet::stopThreadedListen() { + server_thread_.stop(); + PRIVATE->event.interrupt(); + if (server_thread_.isRunning()) { + if (!server_thread_.waitForFinish(1_s)) server_thread_.terminate(); + } + PRIVATE->event.destroy(); + if (sock_s == sock) sock_s = -1; + closeSocket(sock); + closeSocket(sock_s); +} + + PIEthernet * PIEthernet::client(int index) { PIMutexLocker locker(clients_mutex); return clients_[index]; } + int PIEthernet::clientsCount() const { PIMutexLocker locker(clients_mutex); return clients_.size_s(); } + PIVector PIEthernet::clients() const { PIMutexLocker locker(clients_mutex); return clients_; diff --git a/libs/main/io_devices/piethernet.h b/libs/main/io_devices/piethernet.h index a1c10c3f..e2a2e5e9 100644 --- a/libs/main/io_devices/piethernet.h +++ b/libs/main/io_devices/piethernet.h @@ -251,6 +251,8 @@ public: //! Start listen for incoming TCP connections on address "addr". Use only for TCP_Server bool listen(const PINetworkAddress & addr, bool threaded = false); + void stopThreadedListen(); + PIEthernet * client(int index); int clientsCount() const; PIVector clients() const; diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index db7225d7..9b4f92d8 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -243,6 +243,10 @@ public: //! \~russian Возвращает запущен ли поток чтения bool isThreadedRead() const; + //! \~english Returns if threaded read is stopping + //! \~russian Возвращает останавливается ли поток чтения + bool isThreadedReadStopping() const { return read_thread.isStopping(); } + //! \~english Start threaded read //! \~russian Запускает потоковое чтение void startThreadedRead(); @@ -565,8 +569,6 @@ protected: static PIIODevice * newDeviceByPrefix(const char * prefix); - bool isThreadedReadStopping() const { return read_thread.isStopping(); } - DeviceMode mode_ = ReadOnly; DeviceOptions options_; ReadRetFunc func_read = nullptr; diff --git a/main.cpp b/main.cpp index dd59b5b6..cdcb0bb0 100644 --- a/main.cpp +++ b/main.cpp @@ -1,6 +1,6 @@ #include "pibytearray.h" -#include "piclient_server_client.h" -#include "piclient_server_server.h" +#include "piclientserver_client.h" +#include "piclientserver_server.h" #include "picodeparser.h" #include "piiostream.h" #include "pijson.h" @@ -21,10 +21,32 @@ enum MyEvent { PIKbdListener kbd; +class MyServerClient: public PIClientServer::ServerClient { +protected: + void readed(PIByteArray data) override { piCout << "readed" << (data.size()); } + void aboutDelete() override { piCout << "aboutDelete"; } + void disconnected() override { piCout << "disconnected"; } + void connected() override { + piCout << "connected"; + send_thread.start( + [this] { + // write((PIString::fromNumber(++counter)).toUTF8()); + PIByteArray ba(64_KiB); + write(ba); + }, + 2_Hz); + } + PIThread send_thread; + int counter = 0; +}; + + class MyClient: public PIClientServer::Client { protected: void readed(PIByteArray data) override { piCout << "readed" << (data.size()); } + void disconnected() override { piCout << "disconnected"; } void connected() override { + piCout << "connected"; send_thread.start( [this] { // write((PIString::fromNumber(++counter)).toUTF8()); @@ -43,26 +65,43 @@ int main(int argc, char * argv[]) { PIClientServer::Server * s = nullptr; - PIClientServer::Client * c = nullptr; + PIThread s_thread; + PIVector cv; if (argc > 1) { piCout << "Server"; s = new PIClientServer::Server(); - s->setClientFactory([] { return new MyClient(); }); + s->setClientFactory([] { return new MyServerClient(); }); s->enableSymmetricEncryption("1122334455667788"_hex); s->listenAll(12345); + s_thread.start( + [s] { + piCout << "*** clients" << s->clientsCount(); + int i = 0; + s->forEachClient([&i](PIClientServer::ServerClient * c) { + piCout << "client" << ++i << c; + c->write(PIByteArray(16_KiB)); + piMSleep(200); + }); + }, + 0.5_Hz); } else { piCout << "Client"; - c = new MyClient(); - c->createNew(); - c->enableSymmetricEncryption("1122334455667788"_hex); - c->connect(PINetworkAddress::resolve("127.0.0.1", 12345)); + piForTimes(5) { + piMSleep(50); + auto c = new MyClient(); + c->enableSymmetricEncryption("1122334455667788"_hex); + c->connect(PINetworkAddress::resolve("127.0.0.1", 12345)); + cv << c; + } } WAIT_FOR_EXIT; + s_thread.stopAndWait(); + piDeleteSafety(s); - piDeleteSafety(c); + piDeleteAllAndClear(cv); return 0; /*PIPackedTCP * tcp_s =