diff --git a/libs/cloud/picloudbase.cpp b/libs/cloud/picloudbase.cpp new file mode 100644 index 00000000..8ccd9f72 --- /dev/null +++ b/libs/cloud/picloudbase.cpp @@ -0,0 +1,6 @@ +#include "picloudbase.h" + + +PICloudBase::PICloudBase() : eth(PIEthernet::TCP_Client) { + +} diff --git a/libs/cloud/picloudclient.cpp b/libs/cloud/picloudclient.cpp index b35c7a82..01a3d29b 100644 --- a/libs/cloud/picloudclient.cpp +++ b/libs/cloud/picloudclient.cpp @@ -21,7 +21,7 @@ #include "picloudtcp.h" -PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), eth(PIEthernet::TCP_Client) { +PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode) { tcp.setRole(PICloud::TCP::Client); setName("cloud_client"); } @@ -39,28 +39,49 @@ void PICloudClient::setServerName(const PIString & server_name) { } +void PICloudClient::setKeepConnection(bool on) { + eth.setParameter(PIEthernet::KeepConnection, on); +} + + bool PICloudClient::openDevice() { piCout << "PICloudClient open device" << path(); bool op = eth.connect(path(), false); if (op) { - CONNECTL(ð, disconnected, [this](bool){opened_ = false;}); + CONNECTL(ð, connected, [this](){tcp.sendStart(ð);}); CONNECTU(ð, threadedReadEvent, this, readed); + CONNECTL(ð, disconnected, [this](bool){ + opened_ = false; + eth.close(); + piCoutObj << "disconnected";// << !isOpened() << isClosed(); + piMSleep(100); + }); eth.startThreadedRead(); tcp.sendStart(ð); return true; + } else { + eth.close(); + return false; } - eth.close(); - return false; } bool PICloudClient::closeDevice() { - return eth.close(); + eth.stop(); + eth.close(); + return true; } int PICloudClient::readDevice(void * read_to, int max_size) { - return eth.read(read_to, max_size); + piCoutObj << "readDevice"; + mutex_buff.lock(); + cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty();}); + int sz = piMini(max_size, buff.size()); + memcpy(read_to, buff.data(), sz); + buff.remove(0, sz); + mutex_buff.unlock(); + return sz; } @@ -70,5 +91,15 @@ int PICloudClient::writeDevice(const void * data, int max_size) { void PICloudClient::readed(uchar *data, int size) { - + PIByteArray ba(data, size); + mutex_buff.lock(); + PIPair hdr = tcp.parseHeader(ba); + if (hdr.second == tcp.role()) { + piCoutObj << "readed" << ba.toHex(); + buff.append(data, size); + cond_buff.notifyOne(); + } + mutex_buff.unlock(); + while (buff.size() > threadedReadBufferSize()) piMSleep(100); } + diff --git a/libs/cloud/picloudserver.cpp b/libs/cloud/picloudserver.cpp index 64415822..429c87b1 100644 --- a/libs/cloud/picloudserver.cpp +++ b/libs/cloud/picloudserver.cpp @@ -20,27 +20,41 @@ #include "picloudserver.h" -PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), eth(PIEthernet::TCP_Client) { - PIString name = "cloud_server_" + PIString::fromNumber(randomi()%1000); +PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), PICloudBase() { + PIString name = "PCS_" + PIString::fromNumber(randomi()%1000); tcp.setRole(PICloud::TCP::Server); tcp.setServerName(name); - setName(name); + setName("cloud_server__" + name); } PICloudServer::~PICloudServer() { + for (auto & c : clients) { + c.stop(); + c.close(); + } stop(); close(); } +void PICloudServer::setServerName(const PIString & server_name) { + setName("cloud_server__" + server_name); + tcp.setServerName(server_name); +} + + bool PICloudServer::openDevice() { piCout << "PICloudServer open device" << path(); bool op = eth.connect(path(), false); if (op) { - CONNECTL(ð, disconnected, [this](bool){opened_ = false;}); CONNECTU(ð, threadedReadEvent, this, readed); CONNECTL(ð, connected, [this](){tcp.sendStart(ð);}); + CONNECTL(ð, disconnected, [this](bool){ + opened_ = false; + eth.close(); + piCoutObj << "disconnected" << !isOpened() << isClosed(); + }); eth.startThreadedRead(); tcp.sendStart(ð); return true; @@ -51,7 +65,13 @@ bool PICloudServer::openDevice() { bool PICloudServer::closeDevice() { - return eth.close(); + eth.stop(); + for (auto & c : clients) { + c.stop(); + c.close(); + } + eth.close(); + return true; } @@ -71,10 +91,14 @@ PICloudServer::Client::Client(PICloudServer * srv) : server(srv) { bool PICloudServer::Client::openDevice() { - return server; + return true; } void PICloudServer::readed(uchar *data, int size) { - + PIByteArray ba(data, size); + PIPair hdr = tcp.parseHeader(ba); + if (hdr.second == tcp.role()) { + piCoutObj << "readed" << ba.toHex(); + } } diff --git a/libs/cloud/picloudtcp.cpp b/libs/cloud/picloudtcp.cpp index 42864ab0..3c4cd892 100644 --- a/libs/cloud/picloudtcp.cpp +++ b/libs/cloud/picloudtcp.cpp @@ -53,6 +53,24 @@ void PICloud::TCP::sendStart(PIEthernet * eth) { } +void PICloud::TCP::sendConnected(PIEthernet * eth, uint client_id) { + header.type = PICloud::TCP::Connect; + PIByteArray ba; + ba << header << client_id; + eth->send(ba); +} + + +void PICloud::TCP::sendData(PIEthernet * eth, const PIByteArray & data) { + header.type = PICloud::TCP::Data; + PIByteArray ba; + ba << header; + ba.append(data); +// piCout << "sendData" << ba.toHex(); + eth->send(ba); +} + + PIPair PICloud::TCP::parseHeader(PIByteArray & ba) { PIPair ret; ret.first = Invalid; diff --git a/libs/main/cloud/picloudbase.h b/libs/main/cloud/picloudbase.h new file mode 100644 index 00000000..7ad4b7dd --- /dev/null +++ b/libs/main/cloud/picloudbase.h @@ -0,0 +1,40 @@ +/*! \file picloudbase.h + * \brief PICloud Base - Base class for PICloudClient and PICloud Server +*/ +/* + PIP - Platform Independent Primitives + PICloud Base - Base class for PICloudClient and PICloud Server + Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PICLOUDBASE_H +#define PICLOUDBASE_H + +#include "picloudtcp.h" +#include "piethernet.h" + + +class PICloudBase +{ +public: + PICloudBase(); + +protected: + PIEthernet eth; + PICloud::TCP tcp; +}; + +#endif // PICLOUDBASE_H diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index 27e86303..1d2513a7 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -23,11 +23,11 @@ #ifndef PICLOUDCLIENT_H #define PICLOUDCLIENT_H -#include "picloudtcp.h" -#include "piethernet.h" +#include "picloudbase.h" +#include "piconditionvar.h" -class PIP_CLOUD_EXPORT PICloudClient : public PIIODevice +class PIP_CLOUD_EXPORT PICloudClient : public PIIODevice, private PICloudBase { PIIODEVICE(PICloudClient) public: @@ -36,6 +36,7 @@ public: virtual ~PICloudClient(); void setServerName(const PIString & server_name); + void setKeepConnection(bool on); protected: bool openDevice(); @@ -45,9 +46,9 @@ protected: private: EVENT_HANDLER2(void, readed, uchar * , data, int, size); - - PIEthernet eth; - PICloud::TCP tcp; + PIByteArray buff; + PIMutex mutex_buff; + PIConditionVariable cond_buff; }; #endif // PICLOUDCLIENT_H diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index a432481b..2ab2a79d 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -23,11 +23,10 @@ #ifndef PICLOUDSERVER_H #define PICLOUDSERVER_H -#include "picloudtcp.h" -#include "piethernet.h" +#include "picloudbase.h" -class PIP_CLOUD_EXPORT PICloudServer : public PIIODevice +class PIP_CLOUD_EXPORT PICloudServer : public PIIODevice, private PICloudBase { PIIODEVICE(PICloudServer) public: @@ -45,6 +44,8 @@ public: PICloudServer * server; }; + void setServerName(const PIString & server_name); + EVENT1(newConnection, PICloudServer::Client * , client) protected: @@ -56,9 +57,7 @@ protected: private: EVENT_HANDLER2(void, readed, uchar * , data, int, size); - PIEthernet eth; PIVector clients; - PICloud::TCP tcp; }; #endif // PICLOUDSERVER_H diff --git a/libs/main/cloud/picloudtcp.h b/libs/main/cloud/picloudtcp.h index 8f533ff6..ca492e2d 100644 --- a/libs/main/cloud/picloudtcp.h +++ b/libs/main/cloud/picloudtcp.h @@ -53,9 +53,12 @@ public: TCP(); void setRole(Role r); + Role role() const {return (Role)header.role;} void setServerName(const PIString & server_name); void sendStart(PIEthernet * eth); + void sendConnected(PIEthernet * eth, uint client_id); + void sendData(PIEthernet * eth, const PIByteArray & data); PIPair parseHeader(PIByteArray & ba); PIByteArray parseData(PIByteArray & ba); PIString parseConnect(PIByteArray & ba); diff --git a/main.cpp b/main.cpp index 24975a80..92b40f6b 100644 --- a/main.cpp +++ b/main.cpp @@ -4,8 +4,11 @@ int main(int argc, char * argv[]) { PICLI cli(argc, argv); cli.addArgument("connect", true); + cli.addArgument("name", true); PICloudClient c("127.0.0.1:10101"); +// c.setReopenEnabled(true); PICloudServer s("127.0.0.1:10101"); + if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); if (cli.hasArgument("connect")) { c.setServerName(cli.argumentValue("connect")); c.startThreadedRead(); diff --git a/utils/cloud_dispatcher/cloudserver.cpp b/utils/cloud_dispatcher/cloudserver.cpp index f114ea8c..80d27df4 100644 --- a/utils/cloud_dispatcher/cloudserver.cpp +++ b/utils/cloud_dispatcher/cloudserver.cpp @@ -20,11 +20,13 @@ PIString CloudServer::serverName() const { void CloudServer::addClient(DispatcherClient * c) { clients << c; + index_clients.insert(c->clientId(), c); } void CloudServer::removeClient(DispatcherClient * c) { clients.removeOne(c); + index_clients.removeOne(c->clientId()); } @@ -34,9 +36,11 @@ PIVector CloudServer::getClients() { void CloudServer::printStatus() { - piCout << " " << "Clients for" << server->address() << ":"; + piCout << " " << "Clients for" << server->address() << server_name << ":"; for (auto c: clients) { - piCout << " " << c->address(); + piCout << " " << c->address() << c->clientId(); } + for (auto c: clients) c->sendData(PIByteArray::fromHex("000000")); + server->sendData(PIByteArray::fromHex("000000")); } diff --git a/utils/cloud_dispatcher/cloudserver.h b/utils/cloud_dispatcher/cloudserver.h index 5a169bd5..8800e0bf 100644 --- a/utils/cloud_dispatcher/cloudserver.h +++ b/utils/cloud_dispatcher/cloudserver.h @@ -18,6 +18,7 @@ public: private: DispatcherClient * server; PIVector clients; + PIMap index_clients; PIString server_name; }; diff --git a/utils/cloud_dispatcher/dispatcherclient.cpp b/utils/cloud_dispatcher/dispatcherclient.cpp index 1995c25a..e2a9ef48 100644 --- a/utils/cloud_dispatcher/dispatcherclient.cpp +++ b/utils/cloud_dispatcher/dispatcherclient.cpp @@ -2,7 +2,7 @@ #include "picloudtcp.h" -DispatcherClient::DispatcherClient(PIEthernet * eth_) : eth(eth_), authorised(false) { +DispatcherClient::DispatcherClient(PIEthernet * eth_, int id) : eth(eth_), authorised(false), client_id(id) { CONNECTU(&disconnect_tm, tickEvent, eth, close); CONNECTU(eth, threadedReadEvent, this, readed); CONNECTU(eth, disconnected, this, disconnected); @@ -12,7 +12,7 @@ DispatcherClient::DispatcherClient(PIEthernet * eth_) : eth(eth_), authorised(fa void DispatcherClient::start() { eth->startThreadedRead(); - disconnect_tm.start(10000); + //disconnect_tm.start(10000); } @@ -30,6 +30,16 @@ void DispatcherClient::close() { } +void DispatcherClient::sendConnected() { + tcp.sendConnected(eth, client_id); +} + + +void DispatcherClient::sendData(const PIByteArray & data) { + tcp.sendData(eth, data); +} + + void DispatcherClient::disconnected(bool withError) { piCoutObj << "client disconnected" << eth->sendAddress(); disconnectEvent(this); @@ -37,7 +47,7 @@ void DispatcherClient::disconnected(bool withError) { void DispatcherClient::readed(uchar *data, int size) { - piCout << size; +// piCout << size; PIByteArray ba(data, size); PIPair hdr = tcp.parseHeader(ba); if (hdr.first == PICloud::TCP::Invalid) { @@ -61,6 +71,7 @@ void DispatcherClient::readed(uchar *data, int size) { } else { switch (hdr.first) { case PICloud::TCP::Connect: { + tcp.setRole(hdr.second); PIString sn = tcp.parseConnect(ba); if (hdr.second == PICloud::TCP::Server) registerServer(sn, this); if (hdr.second == PICloud::TCP::Client) registerClient(sn, this); diff --git a/utils/cloud_dispatcher/dispatcherclient.h b/utils/cloud_dispatcher/dispatcherclient.h index 22d3cce6..1584a6d8 100644 --- a/utils/cloud_dispatcher/dispatcherclient.h +++ b/utils/cloud_dispatcher/dispatcherclient.h @@ -8,14 +8,17 @@ class DispatcherClient: public PIObject { PIOBJECT(DispatcherClient) public: - DispatcherClient(PIEthernet * eth_); - void start(); + DispatcherClient(PIEthernet * eth_, int id); ~DispatcherClient(); + void start(); + void close(); + void sendConnected(); + void sendData(const PIByteArray & data); + PIString address(); + uint clientId() const {return client_id;} EVENT1(disconnectEvent, DispatcherClient *, client) EVENT2(registerServer, PIString, sname, DispatcherClient *, client) EVENT2(registerClient, PIString, sname, DispatcherClient *, client) - PIString address(); - void close(); EVENT1(dataReaded, PIByteArray, ba) private: @@ -26,6 +29,7 @@ private: PITimer disconnect_tm; bool authorised; PICloud::TCP tcp; + uint client_id; }; diff --git a/utils/cloud_dispatcher/dispatcherserver.cpp b/utils/cloud_dispatcher/dispatcherserver.cpp index 702af9f9..eaf2c4bc 100644 --- a/utils/cloud_dispatcher/dispatcherserver.cpp +++ b/utils/cloud_dispatcher/dispatcherserver.cpp @@ -2,12 +2,15 @@ DispatcherServer::DispatcherServer(PIEthernet::Address addr) : eth(PIEthernet::TCP_Server) { + client_gid = 0; eth.setParameter(PIEthernet::ReuseAddress); CONNECTU(ð, newConnection, this, newConnection); eth.listen(addr, true); piCoutObj << "server started" << addr; CONNECTU(&status_timer, tickEvent, this, printStatus); + CONNECTU(&timeout_timer, tickEvent, this, cleanClients); status_timer.start(1000); + timeout_timer.start(5000); } @@ -17,6 +20,26 @@ DispatcherServer::~DispatcherServer() { } +void DispatcherServer::cleanClients() { + PIVector rm; + map_mutex.lock(); + for (auto c: clients) { + if (!index_c_servers.contains(c) && !index_c_clients.contains(c)) { + if (rm_clients.contains(c)) rm << c; + else rm_clients << c; + } else rm_clients.removeAll(c); + } + for (auto c: rm_clients) { + if (clients.contains(c)) rm << c; + } + map_mutex.unlock(); + for (auto c: rm) { + c->close(); +// c->deleteLater(); + } +} + + void DispatcherServer::printStatus() { map_mutex.lock(); piCout << PICoutManipulators::NewLine; @@ -48,20 +71,25 @@ void DispatcherServer::disconnectClient(DispatcherClient *client) { for(auto csc : cscv) { csc->close(); clients.removeOne(csc); - delete csc; + index_c_clients.removeOne(csc); + csc->deleteLater(); } c_servers.remove(cs->serverName()); + index_c_servers.removeOne(client); } CloudServer * cc = index_c_clients.value(client, nullptr); - if (cc) cc->removeClient(client); + if (cc) { + cc->removeClient(client); + index_c_clients.removeOne(client); + } client->close(); map_mutex.unlock(); - delete client; + client->deleteLater(); } void DispatcherServer::newConnection(PIEthernet *cl) { - DispatcherClient * client = new DispatcherClient(cl); + DispatcherClient * client = new DispatcherClient(cl, client_gid++); CONNECTU(client, disconnectEvent, this, disconnectClient); CONNECTL(client, registerServer, [this](PIString sname, DispatcherClient * c){ map_mutex.lock(); @@ -78,6 +106,8 @@ void DispatcherServer::newConnection(PIEthernet *cl) { piCoutObj << "add new Client to Server ->" << sname; cs->addClient(c); index_c_clients.insert(c, cs); + } else { + rm_clients << c; } map_mutex.unlock(); }); diff --git a/utils/cloud_dispatcher/dispatcherserver.h b/utils/cloud_dispatcher/dispatcherserver.h index c4161aba..e1d6fa9a 100644 --- a/utils/cloud_dispatcher/dispatcherserver.h +++ b/utils/cloud_dispatcher/dispatcherserver.h @@ -14,14 +14,18 @@ public: private: EVENT_HANDLER1(void, newConnection, PIEthernet * , cl); EVENT_HANDLER1(void, disconnectClient, DispatcherClient *, client); + EVENT_HANDLER0(void, cleanClients); PIEthernet eth; PIVector clients; PIMap c_servers; PIMap index_c_servers; PIMap index_c_clients; + PIVector rm_clients; PITimer status_timer; + PITimer timeout_timer; PIMutex map_mutex; + uint client_gid; }; #endif // DISPATCHERSERVER_H