From bf63b0e9f30e3af2c072a409cf4c32919cac12b0 Mon Sep 17 00:00:00 2001 From: andrey Date: Wed, 7 Apr 2021 18:03:28 +0300 Subject: [PATCH] PIClout data send and receive test --- libs/cloud/picloudserver.cpp | 7 ++- main.cpp | 36 +++++++++++++++ utils/cloud_dispatcher/cloudserver.cpp | 22 +++++++-- utils/cloud_dispatcher/dispatcherclient.cpp | 51 ++++++++++++++------- utils/cloud_dispatcher/dispatcherclient.h | 8 ++-- utils/cloud_dispatcher/dispatcherserver.cpp | 27 +++++++---- 6 files changed, 115 insertions(+), 36 deletions(-) diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index 22253ebe..e28d4e5f 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -113,8 +113,8 @@ bool PICloudServer::Client::openDevice() { bool PICloudServer::Client::closeDevice() { - server->clientDisconnect(client_id); if (is_connected) { + server->clientDisconnect(client_id); is_connected = false; cond_buff.notifyOne(); } @@ -170,7 +170,10 @@ void PICloudServer::_readed(PIByteArray & ba) { case PICloud::TCP::Disconnect: { uint id = tcp.parseDisconnect(ba); Client * oc = index_clients.value(id, nullptr); - if (oc) oc->close(); + if (oc) { + oc->is_connected = false; + oc->close(); + } } break; case PICloud::TCP::Data: { PIPair d = tcp.parseDataServer(ba); diff --git a/main.cpp b/main.cpp index 92b40f6b..b5349592 100644 --- a/main.cpp +++ b/main.cpp @@ -3,11 +3,46 @@ int main(int argc, char * argv[]) { PICLI cli(argc, argv); + PITimer tm; cli.addArgument("connect", true); cli.addArgument("name", true); PICloudClient c("127.0.0.1:10101"); // c.setReopenEnabled(true); PICloudServer s("127.0.0.1:10101"); + PIVector clients; + CONNECTL(&tm, tickEvent, ([&](void *, int){ + if (c.isConnected()) { + PIString str = "ping"; + piCout << "[Client] send:" << str; + c.write(str.toByteArray()); + } + if (s.isRunning()) { + for (auto cl : clients) { + if (cl->isOpened()) { + PIString str = "ping_S"; + piCout << "[Server] send to" << cl << ":" << str; + cl->write(str.toByteArray()); + } + } + } + })); + CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){ + PIByteArray ba(readed, size); + PIString str = PIString(ba); + piCout << "[Client] data:" << str; + if (str == "ping_S") c.write(PIString("pong_S").toByteArray()); + })); + CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ + piCout << "[Server] new client:" << cl; + clients << cl; + CONNECTL(cl, threadedReadEvent, ([&c, &s, cl](uchar * readed, int size){ + PIByteArray ba(readed, size); + PIString str = PIString(ba); + piCout << "[Server] data from" << cl << ":" << str; + if (str == "ping") cl->write(PIString("pong").toByteArray()); + })); + cl->startThreadedRead(); + })); if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); if (cli.hasArgument("connect")) { c.setServerName(cli.argumentValue("connect")); @@ -15,6 +50,7 @@ int main(int argc, char * argv[]) { } else { s.startThreadedRead(); } + tm.start(1000); PIKbdListener ls; ls.enableExitCapture(PIKbdListener::F10); ls.start(); diff --git a/utils/cloud_dispatcher/cloudserver.cpp b/utils/cloud_dispatcher/cloudserver.cpp index 386c2b54..8080b1da 100644 --- a/utils/cloud_dispatcher/cloudserver.cpp +++ b/utils/cloud_dispatcher/cloudserver.cpp @@ -3,6 +3,10 @@ CloudServer::CloudServer(DispatcherClient * c, const PIString & sname) : server(c) { setName(sname); server_name = sname; + CONNECTL(c, dataReadedServer, ([this](uint id, PIByteArray & ba){ + DispatcherClient * cl = index_clients.value(id, nullptr); + if (cl) cl->sendData(ba); + })); } @@ -21,7 +25,14 @@ PIString CloudServer::serverName() const { void CloudServer::addClient(DispatcherClient * c) { clients << c; index_clients.insert(c->clientId(), c); - c->sendConnected(); + c->sendConnected(1); + server->sendConnected(c->clientId()); + CONNECTL(c, dataReaded, ([this, c](PIByteArray & ba){ +// piCoutObj << c->clientId() << "dataReaded"; + if (clients.contains(c)) { + server->sendDataToClient(ba, c->clientId()); + } + })); } @@ -41,9 +52,10 @@ void CloudServer::printStatus() { for (auto c: clients) { piCout << " " << c->address() << c->clientId(); } - for (auto c: clients) { - c->sendData(PIByteArray::fromHex("000000")); - server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId()); - } +// for (auto c: clients) { +// c->sendData(PIByteArray::fromHex("000000")); +// server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId()); +// } } + diff --git a/utils/cloud_dispatcher/dispatcherclient.cpp b/utils/cloud_dispatcher/dispatcherclient.cpp index fca8acd6..0faf1fd1 100644 --- a/utils/cloud_dispatcher/dispatcherclient.cpp +++ b/utils/cloud_dispatcher/dispatcherclient.cpp @@ -30,9 +30,9 @@ void DispatcherClient::close() { } -void DispatcherClient::sendConnected() { - piCoutObj << "sendConnected"; - tcp.sendConnected(1); +void DispatcherClient::sendConnected(uint client_id) { + //piCoutObj << "sendConnected"; + tcp.sendConnected(client_id); } @@ -48,8 +48,13 @@ void DispatcherClient::sendDataToClient(const PIByteArray & data, uint client_id } +void DispatcherClient::authorise(bool ok) { + authorised = ok; +} + + void DispatcherClient::disconnected(bool withError) { - piCoutObj << "client disconnected" << eth->sendAddress(); + //piCoutObj << "client disconnected" << eth->sendAddress(); disconnectEvent(this); } @@ -62,18 +67,30 @@ void DispatcherClient::readed(PIByteArray & ba) { return; } if (authorised) { - switch (hdr.first) { - case PICloud::TCP::Connect: - return; - case PICloud::TCP::Disconnect: - disconnected(false); - return; - case PICloud::TCP::Data: - dataReaded(tcp.parseData(ba)); - return; - default: - disconnected(true); - return; + if (hdr.second == tcp.role()) { + switch (hdr.first) { + case PICloud::TCP::Connect: + return; + case PICloud::TCP::Disconnect: + disconnected(false); + return; + case PICloud::TCP::Data: +// piCoutObj << "TCP::Data"; + if (tcp.role() == PICloud::TCP::Client) { + PIByteArray data = tcp.parseData(ba); + if (!data.isEmpty()) dataReaded(data); + else piCoutObj << "invalid data from client"; + } + if (tcp.role() == PICloud::TCP::Server) { + PIPair dp = tcp.parseDataServer(ba); + if (!dp.second.isEmpty()) dataReadedServer(dp.first, dp.second); + else piCoutObj << "invalid data from server"; + } + return; + default: + //disconnected(true); + return; + } } } else { switch (hdr.first) { @@ -82,7 +99,7 @@ void DispatcherClient::readed(PIByteArray & ba) { PIString sn = tcp.parseConnect_d(ba); if (hdr.second == PICloud::TCP::Server) registerServer(sn, this); if (hdr.second == PICloud::TCP::Client) registerClient(sn, this); - return;} + return;} case PICloud::TCP::Disconnect: disconnected(false); return; diff --git a/utils/cloud_dispatcher/dispatcherclient.h b/utils/cloud_dispatcher/dispatcherclient.h index c4bc3534..1df69102 100644 --- a/utils/cloud_dispatcher/dispatcherclient.h +++ b/utils/cloud_dispatcher/dispatcherclient.h @@ -13,22 +13,24 @@ public: ~DispatcherClient(); void start(); void close(); - void sendConnected(); + void sendConnected(uint client_id); void sendData(const PIByteArray & data); void sendDataToClient(const PIByteArray & data, uint client_id); + void authorise(bool ok); PIString address(); uint clientId() const {return client_id;} EVENT1(disconnectEvent, DispatcherClient *, client) EVENT2(registerServer, PIString, sname, DispatcherClient *, client) EVENT2(registerClient, PIString, sname, DispatcherClient *, client) - EVENT1(dataReaded, PIByteArray, ba) + EVENT1(dataReaded, PIByteArray &, ba) + EVENT2(dataReadedServer, uint, id, PIByteArray &, ba) private: EVENT_HANDLER1(void, readed, PIByteArray &, data); EVENT_HANDLER1(void, disconnected, bool, withError); PITimer disconnect_tm; - bool authorised; + std::atomic_bool authorised; PIEthernet * eth; PIStreamPacker streampacker; PICloud::TCP tcp; diff --git a/utils/cloud_dispatcher/dispatcherserver.cpp b/utils/cloud_dispatcher/dispatcherserver.cpp index eaf2c4bc..f3cc127a 100644 --- a/utils/cloud_dispatcher/dispatcherserver.cpp +++ b/utils/cloud_dispatcher/dispatcherserver.cpp @@ -16,7 +16,7 @@ DispatcherServer::DispatcherServer(PIEthernet::Address addr) : eth(PIEthernet::T DispatcherServer::~DispatcherServer() { eth.close(); - piCoutObj << "server stoped"; + //piCoutObj << "server stoped"; } @@ -59,23 +59,25 @@ void DispatcherServer::printStatus() { void DispatcherServer::disconnectClient(DispatcherClient *client) { if (!clients.contains(client)) { - piCoutObj << "INVALID client" << client; + //piCoutObj << "INVALID client" << client; return; } - piCoutObj << "remove client" << client; + //piCoutObj << "remove client" << client; map_mutex.lock(); clients.removeOne(client); CloudServer * cs = index_c_servers.value(client, nullptr); if (cs) { PIVector cscv = cs->getClients(); for(auto csc : cscv) { - csc->close(); clients.removeOne(csc); index_c_clients.removeOne(csc); + cs->removeClient(csc); + csc->close(); csc->deleteLater(); } c_servers.remove(cs->serverName()); index_c_servers.removeOne(client); + delete cs; } CloudServer * cc = index_c_clients.value(client, nullptr); if (cc) { @@ -93,10 +95,16 @@ void DispatcherServer::newConnection(PIEthernet *cl) { CONNECTU(client, disconnectEvent, this, disconnectClient); CONNECTL(client, registerServer, [this](PIString sname, DispatcherClient * c){ map_mutex.lock(); - piCoutObj << "add new Server ->" << sname; - CloudServer * cs = new CloudServer(c, sname); - c_servers.insert(sname, cs); - index_c_servers.insert(c, cs); + CloudServer * cs = c_servers.value(sname, nullptr); + if (cs) { + rm_clients << c; + } else { + piCoutObj << "add new Server ->" << sname; + CloudServer * cs = new CloudServer(c, sname); + c_servers.insert(sname, cs); + index_c_servers.insert(c, cs); + c->authorise(true); + } map_mutex.unlock(); }); CONNECTL(client, registerClient, [this](PIString sname, DispatcherClient * c){ @@ -106,12 +114,13 @@ void DispatcherServer::newConnection(PIEthernet *cl) { piCoutObj << "add new Client to Server ->" << sname; cs->addClient(c); index_c_clients.insert(c, cs); + c->authorise(true); } else { rm_clients << c; } map_mutex.unlock(); }); - piCoutObj << "add client" << client; + //piCoutObj << "add client" << client; client->start(); map_mutex.lock(); clients.push_back(client);