From 897f03f3d06bd21e54ef85d8852fef6bf091d98a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Tue, 8 Nov 2022 14:43:52 +0300 Subject: [PATCH] some fixes for picloud, but still not working correctly --- CMakeLists.txt | 2 + libs/cloud/picloudclient.cpp | 35 ++++++++--------- libs/cloud/picloudserver.cpp | 42 +++++++++++---------- libs/main/io_devices/piiodevice.cpp | 4 -- libs/main/io_devices/piiodevice.h | 2 - libs/main/io_devices/piserial.cpp | 2 +- main_picloud_test.cpp | 6 ++- utils/cloud_dispatcher/dispatcherclient.cpp | 16 ++++---- utils/cloud_dispatcher/dispatcherclient.h | 1 - utils/cloud_dispatcher/dispatcherserver.cpp | 5 ++- 10 files changed, 60 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 697dcaf5..f6e9daec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -492,6 +492,8 @@ if (NOT CROSSTOOLS) #target_link_libraries(pip_plugin pip) add_executable(pip_test "main.cpp") target_link_libraries(pip_test pip) + add_executable(pip_cloud_test "main_picloud_test.cpp") + target_link_libraries(pip_cloud_test pip_cloud) endif() else() diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index 4a88b807..a1e4cf6b 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -33,22 +33,22 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) if (is_deleted) return; bool need_disconn = is_connected; //piCoutObj << "eth disconnected"; - eth.softStopThreadedRead(); + eth.stop(); opened_ = false; internalDisconnect(); - if (need_disconn) - disconnected(); + if (need_disconn) disconnected(); //piCoutObj << "eth disconnected done"; }); } PICloudClient::~PICloudClient() { - //piCoutObj << "~PICloudClient()"; + piCoutObj << "~PICloudClient() ..." << this; + is_deleted = true; stopAndWait(); close(); - is_deleted = true; internalDisconnect(); + piCoutObj << "~PICloudClient() done" << this; } @@ -70,14 +70,14 @@ void PICloudClient::interrupt() { bool PICloudClient::openDevice() { - //piCoutObj << "open";// << path(); + piCoutObj << "open";// << path(); bool op = eth.connect(PIEthernet::Address::resolve(path()), false); if (op) { mutex_connect.lock(); eth.startThreadedRead(); - //piCoutObj << "connecting..."; + piCoutObj << "connecting..."; bool conn_ok = cond_connect.waitFor(mutex_connect, (int)eth.readTimeout()); - //piCoutObj << "conn_ok" << conn_ok << is_connected; + piCoutObj << "conn_ok" << conn_ok << is_connected; mutex_connect.unlock(); if (!conn_ok) { mutex_connect.lock(); @@ -105,36 +105,37 @@ bool PICloudClient::closeDevice() { ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) { - if (is_deleted) return -1; - //piCoutObj << "readDevice"; + if (is_deleted || max_size <= 0) return -1; + piCoutObj << "readDevice ..."; if (!is_connected && eth.isClosed()) openDevice(); ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff); if (is_connected) { if (buff.isEmpty()) { sz = 0; } else { - sz = piMini(max_size, buff.size()); + sz = piMin(max_size, buff.size_s()); memcpy(read_to, buff.data(), sz); buff.remove(0, sz); } + if (sz == 0) cond_buff.wait(mutex_buff); } mutex_buff.unlock(); if (!is_connected) opened_ = false; - //piCoutObj << "readDevice done" << sz; + piCoutObj << "readDevice done" << sz; return sz; } ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) { if (is_deleted) return -1; -// piCoutObj << "writeDevice"; + piCoutObj << "writeDevice" << size; return tcp.sendData(PIByteArray(data, size)); } void PICloudClient::internalDisconnect() { + piCoutObj << "internalDisconnect"; is_connected = false; cond_buff.notifyOne(); cond_connect.notifyOne(); @@ -146,7 +147,7 @@ void PICloudClient::internalDisconnect() { void PICloudClient::_readed(PIByteArray & ba) { if (is_deleted) return; PIPair hdr = tcp.parseHeader(ba); - //piCoutObj << "_readed" << ba.size() << hdr.first << hdr.second; + piCoutObj << "_readed" << ba.size() << hdr.first << hdr.second; if (hdr.second == tcp.role()) { switch (hdr.first) { case PICloud::TCP::Connect: @@ -159,7 +160,7 @@ void PICloudClient::_readed(PIByteArray & ba) { } break; case PICloud::TCP::Disconnect: - eth.softStopThreadedRead(); + eth.stop(); opened_ = false; eth.close(); break; @@ -177,5 +178,5 @@ void PICloudClient::_readed(PIByteArray & ba) { //piCoutObj << "readed" << ba.toHex(); } while (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad - //piCoutObj << "_readed done"; + piCoutObj << "_readed done"; } diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index 98704297..2e0949eb 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -26,11 +26,10 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) tcp.setServerName(server_name); setName("cloud_server__" + server_name); CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed); - CONNECTL(ð, connected, [this](){opened_ = true; piCoutObj << "connected"; tcp.sendStart();}); + CONNECTL(ð, connected, [this](){opened_ = true; piCoutObj << "connected" << ð tcp.sendStart();}); CONNECTL(ð, disconnected, [this](bool){ - piCoutObj << "disconnected"; - eth.softStopThreadedRead(); - eth.interrupt(); + piCoutObj << "disconnected" << ð + eth.stop(); opened_ = false; ping_timer.stop(false); piMSleep(100); @@ -42,8 +41,10 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) PICloudServer::~PICloudServer() { + piCoutObj << "~PICloudServer ..." << this; stopAndWait(); close(); + piCoutObj << "~PICloudServer done" << this; } @@ -60,7 +61,7 @@ PIVector PICloudServer::clients() const { bool PICloudServer::openDevice() { - //piCout << "PICloudServer open device" << path(); + piCout << "PICloudServer open device" << path(); bool op = eth.connect(PIEthernet::Address::resolve(path()), false); if (op) { eth.startThreadedRead(); @@ -74,17 +75,19 @@ bool PICloudServer::openDevice() { bool PICloudServer::closeDevice() { - eth.stop(); + piCoutObj << "closeDevice" << this; + eth.stopAndWait(); ping_timer.stop(false); clients_mutex.lock(); for (auto c : clients_) { + c->stopAndWait(); c->close(); - c->stop(); } clients_mutex.unlock(); eth.close(); - for (auto c : clients_) + for (auto c : clients_) { delete c; + } return true; } @@ -126,12 +129,10 @@ PICloudServer::Client::Client(PICloudServer * srv, uint id) : server(srv), clien PICloudServer::Client::~Client() { - if (is_connected) { - is_connected = false; - cond_buff.notifyOne(); - } - stopAndWait(); + piCoutObj << "~PICloudServer::Client..." << this; close(); + stopAndWait(); + piCoutObj << "~PICloudServer::Client done" << this; } @@ -141,6 +142,7 @@ bool PICloudServer::Client::openDevice() { bool PICloudServer::Client::closeDevice() { + piCoutObj << "closeDevice" << this; if (is_connected) { server->clientDisconnect(client_id); is_connected = false; @@ -154,7 +156,6 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) { if (!is_connected) return -1; ssize_t sz = -1; mutex_buff.lock(); - cond_buff.wait(mutex_buff); if (is_connected) { if (buff.isEmpty()) { sz = 0; @@ -163,6 +164,7 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) { memcpy(read_to, buff.data(), sz); buff.remove(0, sz); } + if (sz == 0) cond_buff.wait(mutex_buff); } mutex_buff.unlock(); return sz; @@ -201,8 +203,8 @@ void PICloudServer::_readed(PIByteArray & ba) { if (oc) { tcp.sendDisconnected(id); } else { - //piCoutObj << "new Client" << id; Client * c = new Client(this, id); + piCoutObj << "new Client" << id << c; CONNECT1(void, PIObject *, c, deleted, this, clientDeleted); clients_mutex.lock(); clients_ << c; @@ -213,13 +215,14 @@ void PICloudServer::_readed(PIByteArray & ba) { } break; case PICloud::TCP::Disconnect: { uint id = tcp.parseDisconnect(ba); - //piCoutObj << "remove Client" << id; + piCoutObj << "remove Client" << id; clients_mutex.lock(); Client * oc = index_clients.value(id, nullptr); clients_mutex.unlock(); if (oc) { - oc->is_connected = false; - oc->close(); + //oc->is_connected = false; + //oc->close(); + delete oc; } } break; case PICloud::TCP::Data: { @@ -227,7 +230,7 @@ void PICloudServer::_readed(PIByteArray & ba) { clients_mutex.lock(); Client * oc = index_clients.value(d.first, nullptr); clients_mutex.unlock(); - //piCoutObj << "data for" << d.first << d.second.toHex(); + piCoutObj << "data for" << d.first << d.second.size(); if (oc && !d.second.isEmpty()) oc->pushBuffer(d.second); } break; default: break; @@ -238,6 +241,7 @@ void PICloudServer::_readed(PIByteArray & ba) { void PICloudServer::clientDeleted(PIObject * o) { PICloudServer::Client * c = (PICloudServer::Client*)o; + piCoutObj << "clientDeleted" << c; clients_mutex.lock(); clients_.removeOne(c); auto it = index_clients.makeIterator(); diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index bbcfc092..4cc8b183 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -637,7 +637,3 @@ void PIIODevice::configureFromVariantDevice(const PIPropertyStorage & d) { } -void PIIODevice::softStopThreadedRead() { - read_thread.stop(); -} - diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index 2ce101ef..d75b69c4 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -390,8 +390,6 @@ public: //! \~russian Пишет в устройство блок памяти "mb" ssize_t write(const PIMemoryBlock & mb) {return write(mb.data(), mb.size());} - void softStopThreadedRead(); - EVENT_VHANDLER(void, flush) {;} EVENT(opened); diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index b7590f7e..79829a87 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -827,7 +827,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) { return 0; if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) { piCoutObj << "read error" << (PRIVATE->readed) << errorString(); - softStopThreadedRead(); + stop(); close(); return 0; } diff --git a/main_picloud_test.cpp b/main_picloud_test.cpp index 5afd928b..dbaea6ba 100644 --- a/main_picloud_test.cpp +++ b/main_picloud_test.cpp @@ -18,7 +18,7 @@ int main(int argc, char * argv[]) { piCout << "[Client] send:" << str; c.write(str.toByteArray()); } - if (s.isRunning()) { + if (s.isThreadedRead()) { for (auto cl : clients) { if (cl->isOpened()) { PIString str = "ping_S"; @@ -51,9 +51,11 @@ int main(int argc, char * argv[]) { } })); CONNECTL(cl, closed, ([&clients, cl](){ + piCout << "[Server] client closed ..." << cl; cl->stop(); clients.removeAll(cl); - cl->deleteLater(); + piCout << "[Server] client closed ok" << cl; + //cl->deleteLater(); })); cl->startThreadedRead(); })); diff --git a/utils/cloud_dispatcher/dispatcherclient.cpp b/utils/cloud_dispatcher/dispatcherclient.cpp index b370ebb7..df6cd480 100644 --- a/utils/cloud_dispatcher/dispatcherclient.cpp +++ b/utils/cloud_dispatcher/dispatcherclient.cpp @@ -3,7 +3,6 @@ DispatcherClient::DispatcherClient(PIEthernet * eth_, int id) : authorised(false), eth(eth_), streampacker(eth_), tcp(&streampacker), client_id(id) { - CONNECTU(&disconnect_tm, tickEvent, eth, close); CONNECTU(&streampacker, packetReceiveEvent, this, readed); CONNECTU(eth, disconnected, this, disconnected); piCoutObj << "client connected" << client_id << eth->sendAddress(); @@ -12,7 +11,6 @@ DispatcherClient::DispatcherClient(PIEthernet * eth_, int id) : authorised(false void DispatcherClient::start() { eth->startThreadedRead(); - //disconnect_tm.start(10000); } @@ -27,18 +25,18 @@ PIString DispatcherClient::address() { void DispatcherClient::close() { - eth->softStopThreadedRead(); - eth->interrupt(); + eth->stopAndWait(); } void DispatcherClient::sendConnected(uint client_id) { - //piCoutObj << "sendConnected"; + piCoutObj << "sendConnected" << client_id; tcp.sendConnected(client_id); } void DispatcherClient::sendDisconnected(uint client_id) { + piCoutObj << "sendDisconnected" << client_id; tcp.sendDisconnected(client_id); } @@ -61,7 +59,7 @@ void DispatcherClient::authorise(bool ok) { void DispatcherClient::disconnected(bool withError) { - //piCoutObj << "client disconnected" << eth->sendAddress(); + piCoutObj << "client disconnected" << withError << eth->sendAddress(); disconnectEvent(this); } @@ -70,16 +68,18 @@ void DispatcherClient::readed(PIByteArray & ba) { PIPair hdr = tcp.parseHeader(ba); // piCoutObj << "readed" << hdr.first << hdr.second; if (hdr.first == PICloud::TCP::InvalidType) { - disconnected(true); piCoutObj << "invalid message"; + disconnected(true); return; } if (authorised) { if (hdr.second == tcp.role()) { switch (hdr.first) { case PICloud::TCP::Connect: + piCoutObj << "PICloud::TCP::Connect"; return; case PICloud::TCP::Disconnect: + piCoutObj << "PICloud::TCP::Disconnect"; disconnected(false); return; case PICloud::TCP::Data: @@ -113,9 +113,11 @@ void DispatcherClient::readed(PIByteArray & ba) { return; } case PICloud::TCP::Disconnect: + piCoutObj << "unauthorised PICloud::TCP::Disconnect"; disconnected(false); return; default: + piCoutObj << "authorised invalid message"; disconnected(true); return; } diff --git a/utils/cloud_dispatcher/dispatcherclient.h b/utils/cloud_dispatcher/dispatcherclient.h index 92497e4d..9284a7eb 100644 --- a/utils/cloud_dispatcher/dispatcherclient.h +++ b/utils/cloud_dispatcher/dispatcherclient.h @@ -33,7 +33,6 @@ private: EVENT_HANDLER1(void, readed, PIByteArray &, data); EVENT_HANDLER1(void, disconnected, bool, withError); - PITimer disconnect_tm; std::atomic_bool authorised; PIEthernet * eth; PIStreamPacker streampacker; diff --git a/utils/cloud_dispatcher/dispatcherserver.cpp b/utils/cloud_dispatcher/dispatcherserver.cpp index 29363905..e976fdd2 100644 --- a/utils/cloud_dispatcher/dispatcherserver.cpp +++ b/utils/cloud_dispatcher/dispatcherserver.cpp @@ -181,7 +181,7 @@ void DispatcherServer::disconnectClient(DispatcherClient *client) { //piCoutObj << "INVALID client" << client; return; } - piCoutObj << "remove" << client->clientId(); + piCoutObj << "remove ..." << client->clientId(); map_mutex.lock(); clients.removeAll(client); rm_clients.removeAll(client); @@ -206,9 +206,10 @@ void DispatcherServer::disconnectClient(DispatcherClient *client) { cc->removeClient(client); index_c_clients.remove(client); } - client->close(); + //client->close(); rmrf_clients << client; map_mutex.unlock(); + piCoutObj << "remove done" << client->clientId(); }