From cb59017ebb41aaec1276a1e1c34b578e3cf55080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Fri, 11 Nov 2022 16:18:05 +0300 Subject: [PATCH] PICloud many important fixes --- libs/cloud/picloudclient.cpp | 2 +- libs/cloud/picloudserver.cpp | 32 +++++++++++++++------ libs/main/cloud/picloudserver.h | 2 ++ libs/main/io_devices/piethernet.cpp | 6 ++-- libs/main/io_devices/piiodevice.cpp | 1 + libs/main/io_devices/piiodevice.h | 2 +- utils/cloud_dispatcher/cloudserver.cpp | 17 +++++++---- utils/cloud_dispatcher/cloudserver.h | 1 + utils/cloud_dispatcher/dispatcherserver.cpp | 13 ++++++++- 9 files changed, 57 insertions(+), 19 deletions(-) diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index b259e025..9a3adc1c 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -23,6 +23,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), PICloudBase() { tcp.setRole(PICloud::TCP::Client); + setThreadedReadBufferSize(eth.threadedReadBufferSize()); setName("cloud_client"); is_connected = false; is_deleted = false; @@ -182,6 +183,5 @@ void PICloudClient::_readed(PIByteArray & ba) { } //piCoutObj << "readed" << ba.toHex(); } - if (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad //piCoutObj << "_readed done"; } diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index c6d460b2..7aa12f76 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -27,27 +27,33 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) setName("cloud_server__" + server_name); is_deleted = false; eth.setReopenEnabled(false); + setThreadedReadBufferSize(eth.threadedReadBufferSize()); CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed); CONNECTL(ð, connected, [this](){ + open_mutex.lock(); opened_ = true; - //piCoutObj << "connected" << ð + cvar.notifyOne(); + open_mutex.unlock(); + //piCoutObj << "connected"; tcp.sendStart(); }); CONNECTL(ð, disconnected, [this](bool){ if (is_deleted) return; - //piCoutObj << "disconnected" << ð + //piCoutObj << "disconnected"; clients_mutex.lock(); - removed_clients_.append(clients_); for (auto c : clients_) { c->is_connected = false; c->close(); } + removed_clients_.append(clients_); clients_.clear(); index_clients.clear(); clients_mutex.unlock(); + open_mutex.lock(); opened_ = false; + cvar.notifyOne(); + open_mutex.unlock(); ping_timer.stop(); - piMSleep(100); }); CONNECTL(&ping_timer, tickEvent, [this] (void *, int){ if (eth.isConnected()) tcp.sendPing(); @@ -62,10 +68,10 @@ PICloudServer::~PICloudServer() { close(); waitThreadedReadFinished(); //piCout << "wait"; - for (auto c : removed_clients_) { + while(removed_clients_.isNotEmpty()) { + Client * c = removed_clients_.take_back(); delete c; } - removed_clients_.clear(); //piCoutObj << "~PICloudServer done" << this; } @@ -83,7 +89,7 @@ PIVector PICloudServer::clients() const { bool PICloudServer::openDevice() { - //piCout << "PICloudServer open device" << path(); + piCout << "PICloudServer open device" << path(); bool op = eth.connect(PIEthernet::Address::resolve(path()), false); if (op) { eth.startThreadedRead(); @@ -102,7 +108,12 @@ bool PICloudServer::closeDevice() { eth.stopAndWait(); ping_timer.stop(); eth.close(); + cvar.notifyOne(); clients_mutex.lock(); + for (auto c : clients_) { + c->is_connected = false; + c->close(); + } removed_clients_.append(clients_); clients_.clear(); index_clients.clear(); @@ -114,7 +125,10 @@ bool PICloudServer::closeDevice() { ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) { if (is_deleted) return -1; //piCoutObj << "readDevice"; - if (!opened_) openDevice(); + open_mutex.lock(); + if (isOpened()) cvar.wait(open_mutex); + open_mutex.unlock(); + //piCoutObj << "opened_ = " << opened_; //else piMSleep(eth.readTimeout()); return -1; } @@ -128,6 +142,7 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) { void PICloudServer::interrupt() { eth.interrupt(); + cvar.notifyOne(); } @@ -145,6 +160,7 @@ int PICloudServer::sendData(const PIByteArray & data, uint client_id) { PICloudServer::Client::Client(PICloudServer * srv, uint id) : server(srv), client_id(id) { setMode(PIIODevice::ReadWrite); setReopenEnabled(false); + setThreadedReadBufferSize(server->threadedReadBufferSize()); is_connected = true; } diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index 23062329..3a637b35 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -86,6 +86,8 @@ private: PIVector removed_clients_; PIMap index_clients; PITimer ping_timer; + PIConditionVariable cvar; + PIMutex open_mutex; mutable PIMutex clients_mutex; std::atomic_bool is_deleted; }; diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index f38b4efc..2ae8f8fd 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -588,7 +588,7 @@ bool PIEthernet::connect(bool threaded) { if (!connected_) { piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); } - opened_ = connected_; + opened_.exchange(connected_); if (connected_) { connected(); } @@ -720,7 +720,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) { piCoutObj << "connect to " << path() << connected_; if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); - opened_ = connected_; + opened_.exchange(connected_); if (connected_) { connecting_ = false; connected(); @@ -847,7 +847,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) { connected_ = connectTCP(); if (!connected_) piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); - opened_ = connected_; + opened_.exchange(connected_); if (connected_) { connecting_ = false; connected(); diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 4cc8b183..e91e55f7 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -304,6 +304,7 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) { void PIIODevice::_init() { + opened_ = false; setOptions(0); setReopenEnabled(true); setReopenTimeout(1000); diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index d75b69c4..1703f3cb 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -540,7 +540,7 @@ protected: DeviceMode mode_; DeviceOptions options_; ReadRetFunc func_read = nullptr; - bool opened_ = false; + std::atomic_bool opened_; void * ret_data_ = nullptr; private: diff --git a/utils/cloud_dispatcher/cloudserver.cpp b/utils/cloud_dispatcher/cloudserver.cpp index eed4f2d6..74d6836f 100644 --- a/utils/cloud_dispatcher/cloudserver.cpp +++ b/utils/cloud_dispatcher/cloudserver.cpp @@ -5,7 +5,9 @@ CloudServer::CloudServer(DispatcherClient * c, const PIByteArray & sname) : serv server_uuid = sname; CONNECTL(c, dataReadedServer, ([this](uint id, PIByteArray & ba){ last_ping.reset(); + mutex_clients.lock(); DispatcherClient * cl = index_clients.value(id, nullptr); + mutex_clients.unlock(); if (cl) cl->sendData(ba); })); CONNECTL(c, pingReceived, [this]() {last_ping.reset();}); @@ -27,9 +29,11 @@ PIByteArray CloudServer::serverUUID() const { void CloudServer::addClient(DispatcherClient * c) { last_ping.reset(); + mutex_clients.lock(); clients << c; uint cid = c->clientId(); index_clients.insert(cid, c); + mutex_clients.unlock(); c->sendConnected(1); server->sendConnected(cid); CONNECTL(c, dataReaded, ([this, cid](PIByteArray & ba){ @@ -41,14 +45,19 @@ void CloudServer::addClient(DispatcherClient * c) { void CloudServer::removeClient(DispatcherClient * c) { last_ping.reset(); + mutex_clients.lock(); clients.removeOne(c); index_clients.remove(c->clientId()); + mutex_clients.unlock(); server->sendDisconnected(c->clientId()); } PIVector CloudServer::getClients() { - return clients; + mutex_clients.lock(); + PIVector cl = clients; + mutex_clients.unlock(); + return cl; } @@ -58,14 +67,12 @@ double CloudServer::lastPing() { void CloudServer::printStatus() { + mutex_clients.lock(); piCout << " " << "Clients for" << server->address() << server_uuid.toHex() << ":"; for (auto c: clients) { piCout << " " << c->address() << c->clientId(); } -// for (auto c: clients) { -// c->sendData(PIByteArray::fromHex("000000")); -// server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId()); -// } + mutex_clients.unlock(); } diff --git a/utils/cloud_dispatcher/cloudserver.h b/utils/cloud_dispatcher/cloudserver.h index bd6fd5dd..7517fd49 100644 --- a/utils/cloud_dispatcher/cloudserver.h +++ b/utils/cloud_dispatcher/cloudserver.h @@ -23,6 +23,7 @@ private: PIMap index_clients; PIByteArray server_uuid; PITimeMeasurer last_ping; + PIMutex mutex_clients; }; #endif // CLOUDSERVER_H diff --git a/utils/cloud_dispatcher/dispatcherserver.cpp b/utils/cloud_dispatcher/dispatcherserver.cpp index 9a9b84bb..51ec1ae7 100644 --- a/utils/cloud_dispatcher/dispatcherserver.cpp +++ b/utils/cloud_dispatcher/dispatcherserver.cpp @@ -58,7 +58,18 @@ void DispatcherServer::cleanClients() { for (auto c: ss) { if (c->lastPing() > 15.0) { piCout << "remove Server by ping timeout" << c->getConnection()->clientId(); + PIVector cscv = c->getClients(); + for(auto csc : cscv) { + clients.removeAll(csc); + index_c_clients.remove(csc); + c->removeClient(csc); + csc->close(); + rmrf_clients << csc; + } + c_servers.remove(c->serverUUID()); + index_c_servers.remove(c->getConnection()); rmrf_clients << const_cast(c->getConnection()); + delete c; } } for (auto c: rm_clients) { @@ -250,8 +261,8 @@ void DispatcherServer::newConnection(PIEthernet *cl) { map_mutex.unlock(); }); //piCoutObj << "add client" << client; - client->start(); map_mutex.lock(); clients.push_back(client); map_mutex.unlock(); + client->start(); }