#include "dispatcherserver.h"

#include "piscreentiles.h"


// Prepares the listening TCP server socket for `addr` and wires the handlers:
// new connections reported by `eth` go to newConnection(), and every tick of
// `timeout_timer` runs cleanClients(). Listening itself begins in start().
DispatcherServer::DispatcherServer(PINetworkAddress addr): eth(PIEthernet::TCP_Server) {
	client_gid      = 0;
	max_connections = 1000;
	eth.setParameter(PIEthernet::ReuseAddress);
	eth.setReadAddress(addr);
	// eth.setDebug(false);
	CONNECTU(&eth, newConnection, this, newConnection);
	CONNECTU(&timeout_timer, tickEvent, this, cleanClients);
}


// Closes the listening socket.
DispatcherServer::~DispatcherServer() {
	eth.close();
	// piCoutObj << "server stoped";
}


// Starts listening for connections and starts the periodic cleanup timer.
void DispatcherServer::start() {
	eth.listen(true);
	timeout_timer.start(2000); // presumably milliseconds — confirm against the timer API
	piCoutObj << "server started" << eth.readAddress();
}


// Prints the address of every tracked connection and the status of every
// registered cloud server. Holds `map_mutex` for the duration.
void DispatcherServer::picoutStatus() {
	PIMutexLocker locker(map_mutex);
	piCout << PICoutManipulators::NewLine;
	piCout << "Connections:";
	for (auto c: clients) {
		piCout << " " << c->address();
	}
	piCout << "Servers:";
	auto it = c_servers.makeIterator();
	while (it.next()) {
		piCout << " " << it.key();
		it.value()->printStatus();
	}
}


// Periodic housekeeping, invoked on every `timeout_timer` tick under `map_mutex`.
//
// Steps, in order:
//   1. Close and delete the servers queued in `rmrf_servers`, then delete the
//      connections queued in `rmrf_clients` by earlier calls.
//   2. Put every connection that is registered neither as a server nor as a client
//      into `rm_clients`; drop registered ones from `rm_clients`.
//   3. Tear down every cloud server whose lastPing() exceeds the timeout, together
//      with its clients, and queue all of them for deletion.
//   4. Queue every `rm_clients` entry that is still tracked into `rmrf_clients`.
//   5. Unlink everything queued in `rmrf_clients` from the lookup containers.
//
// Objects queued here are deleted by step 1 of the next call, not immediately.
void DispatcherServer::cleanClients() {
	PIMutexLocker locker(map_mutex);

	// 1. Dispose of what previous calls queued.
	for (auto s: rmrf_servers)
		s->close();
	piDeleteAllAndClear(rmrf_servers);
	for (auto c: rmrf_clients) {
		if (!c->isPIObject()) piCout << "ACHTUNG! Non-piobject client!";
	}
	piDeleteAllAndClear(rmrf_clients);

	// 2. Track connections that have no role in either index.
	for (auto c: clients) {
		if (!index_c_servers.contains(c) && !index_c_clients.contains(c)) {
			if (!rm_clients.contains(c)) rm_clients << c;
		} else
			rm_clients.removeAll(c);
	}

	// 3. Remove servers that stopped pinging. Iterate over a copy of the values
	//    because `c_servers` is modified inside the loop.
	auto ss = c_servers.values();
	for (auto c: ss) {
		if (c->lastPing() > 15.0) { // presumably seconds — confirm against CloudServer::lastPing()
			piCout << "remove Server by ping timeout" << c->getConnection()->clientId();
			c->close();
			// Snapshot of the client list: removeClient() below changes the server's own list.
			PIVector<DispatcherClient *> cscv = c->getClients();
			for (auto csc: cscv) {
				if (!csc->isPIObject()) piCout << "ACHTUNG! Non-piobject DispatcherClient!";
				clients.removeAll(csc);
				index_c_clients.remove(csc);
				c->removeClient(csc);
				csc->close();
				if (!rmrf_clients.contains(csc)) rmrf_clients << csc;
			}
			c_servers.remove(c->serverUUID());
			index_c_servers.remove(c->getConnection());
			// Guard against queuing the same object twice: a connection may already have
			// been queued as a client of another timed-out server in this same pass, and
			// piDeleteAllAndClear() would then delete it twice.
			DispatcherClient * server_conn = const_cast<DispatcherClient *>(c->getConnection());
			if (!rmrf_clients.contains(server_conn)) rmrf_clients << server_conn;
			if (!rmrf_servers.contains(c)) rmrf_servers << c;
		}
	}

	// 4. Queue role-less connections that are still tracked.
	for (auto c: rm_clients) {
		if (clients.contains(c)) {
			if (!rmrf_clients.contains(c)) rmrf_clients << c;
		}
	}

	// 5. Make sure nothing queued for deletion is still reachable through the maps.
	for (auto c: rmrf_clients) {
		clients.removeAll(c);
		if (index_c_servers.contains(c)) {
			c_servers.remove(c_servers.key(index_c_servers[c]));
			index_c_servers.remove(c);
		}
		index_c_clients.remove(c);
		rm_clients.removeAll(c);
	}
}


// Rebuilds `tl->content` with one row per tracked connection ("<address> <role>",
// where a registered role is followed by the first 8 hex characters of the related
// server UUID), then one "[deleting]" row per `rm_clients` entry and one "[NULL]"
// row per `rmrf_clients` entry.
void DispatcherServer::updateConnectionsTile(TileList * tl) {
	PIMutexLocker locker(map_mutex);
	tl->content.clear();
	for (auto c: clients) {
		PIString role = "Invalid";
		switch (c->role()) {
			case PICloud::TCP::Client: {
				role            = "Client";
				CloudServer * cs = index_c_clients.value(c, nullptr);
				if (cs) role += " \"" + cs->serverUUID().toHex().left(8) + "...\"";
			} break;
			case PICloud::TCP::Server: {
				role            = "Server";
				CloudServer * cs = index_c_servers.value(c, nullptr);
				if (cs) role += " \"" + cs->serverUUID().toHex().left(8) + "...\"";
			} break;
			default: break;
		}
		tl->content << TileList::Row(c->address() + " " + role, PIScreenTypes::CellFormat());
	}
	for (auto c: rm_clients) {
		tl->content << TileList::Row("[deleting]" + c->address(), PIScreenTypes::CellFormat());
	}
	for (auto c: rmrf_clients) {
		tl->content << TileList::Row("[NULL]" + c->address(), PIScreenTypes::CellFormat());
	}
}


// Rebuilds `tl->content` with one row per registered cloud server
// ("<uuid prefix>... - <client count>"). Rows whose server connection is contained
// in `servers` have their index added to `tl->selected`; `tl->selected` is not
// cleared here.
void DispatcherServer::updateServersTile(TileList * tl, PISet<const DispatcherClient *> servers) {
	PIMutexLocker locker(map_mutex);
	tl->content.clear();
	auto mi = c_servers.makeIterator();
	while (mi.next()) {
		tl->content << TileList::Row(mi.value()->serverUUID().toHex().left(8) + "... - "
		                                 + PIString::fromNumber(mi.value()->getClients().size()),
		                             PIScreenTypes::CellFormat());
		if (servers.contains(mi.value()->getConnection())) tl->selected << (tl->content.size_s() - 1);
	}
}


// Rebuilds `tl->content` with the addresses of the clients attached to cloud servers.
// An empty `servers` set lists the clients of every server; otherwise only servers
// whose connection is in `servers` contribute rows.
void DispatcherServer::updateClientsTile(TileList * tl, PISet<const DispatcherClient *> servers) {
	PIMutexLocker locker(map_mutex);
	tl->content.clear();
	const bool show_all = servers.isEmpty();
	auto mi             = c_servers.makeIterator();
	while (mi.next()) {
		// The filter depends only on the server, so decide once per server, not per client.
		if (!show_all && !servers.contains(mi.value()->getConnection())) continue;
		for (auto c: mi.value()->getClients()) {
			tl->content << TileList::Row(c->address(), PIScreenTypes::CellFormat());
		}
	}
}


// Returns the tracked connection at position `index` in `clients`,
// or nullptr when `index` is out of range.
const DispatcherClient * DispatcherServer::getConnection(int index) {
	PIMutexLocker locker(map_mutex);
	const DispatcherClient * ret = nullptr;
	if (index >= 0 && index < clients.size_s()) ret = clients[index];
	return ret;
}


// Returns the tracked connection at position `index` in `clients` only if it is
// registered as a server; nullptr when out of range or not a server.
const DispatcherClient * DispatcherServer::getServer(int index) {
	PIMutexLocker locker(map_mutex);
	const DispatcherClient * ret = nullptr;
	if (index >= 0 && index < clients.size_s()) {
		if (index_c_servers.contains(clients[index])) ret = clients[index];
	}
	return ret;
}


// Maps positions in the iteration order of `c_servers` to server connections:
// the result holds the connection of every server whose ordinal is in `ids`.
// NOTE(review): the element type of `ids` is reconstructed as int (it is compared
// with the `int i` counter) — confirm against the declaration in dispatcherserver.h.
PISet<const DispatcherClient *> DispatcherServer::getServers(PISet<int> ids) {
	PISet<const DispatcherClient *> ret;
	if (ids.isEmpty()) return ret;
	PIMutexLocker locker(map_mutex);
	int i   = 0;
	auto mi = c_servers.makeIterator();
	while (mi.next()) {
		if (ids.contains(i)) ret << mi.value()->getConnection();
		i++;
	}
	return ret;
}


// Sets the limit checked by newConnection(); connections arriving while
// `clients.size() >= max_count` are dropped.
void DispatcherServer::setMaxConnections(uint max_count) {
	max_connections = max_count;
}


// Handler for a connection's disconnectEvent. Unlinks `client` from every container
// and queues it in `rmrf_clients`; the object itself is deleted by a later
// cleanClients() call. If `client` was a registered server, the CloudServer is
// stopped and queued in `rmrf_servers`, and all of its clients are queued too.
// Connections that are not tracked in `clients` are ignored.
void DispatcherServer::disconnectClient(DispatcherClient * client) {
	PIMutexLocker locker(map_mutex);
	if (!clients.contains(client)) {
		// piCoutObj << "INVALID client" << client;
		return;
	}
	piCoutObj << "remove ..." << client->clientId();
	clients.removeAll(client);
	rm_clients.removeAll(client);

	// Case 1: the connection was a registered server.
	CloudServer * cs = index_c_servers.value(client, nullptr);
	if (cs) {
		piCoutObj << "remove Server" << client->clientId();
		cs->stop();
		PIVector<DispatcherClient *> cscv = cs->getClients();
		for (auto csc: cscv) {
			clients.removeAll(csc);
			index_c_clients.remove(csc);
			if (!rmrf_clients.contains(csc)) rmrf_clients << csc;
		}
		c_servers.remove(cs->serverUUID());
		index_c_servers.remove(client);
		if (!rmrf_servers.contains(cs)) rmrf_servers << cs;
	}

	// Case 2: the connection was attached to a server as a client.
	CloudServer * cc = index_c_clients.value(client, nullptr);
	if (cc) {
		piCoutObj << "remove Client" << client->clientId();
		cc->removeClient(client);
		index_c_clients.remove(client);
	}

	// client->close();
	if (!rmrf_clients.contains(client)) rmrf_clients << client;
	piCoutObj << "remove done" << client->clientId();
}


// Handler for a new incoming socket `cl`. Deletes the socket when the connection
// limit is reached; otherwise wraps it in a DispatcherClient with a fresh id, hooks
// up its disconnect/registration events, adds it to `clients`, and starts it.
void DispatcherServer::newConnection(PIEthernet * cl) {
	// piCout << "DispatcherServer::newConnection" << (void *)cl;
	PIMutexLocker locker(map_mutex);
	if (clients.size() >= max_connections) {
		// piCout << "DispatcherServer::newConnection overflow" << (void *)cl;
		delete cl;
		return;
	}
	DispatcherClient * client = new DispatcherClient(cl, client_gid++);
	CONNECTU(client, disconnectEvent, this, disconnectClient);

	// Registration as a server: accepted only if no server with this name exists yet;
	// a duplicate is put into `rm_clients` for cleanClients() to handle.
	CONNECTL(client, registerServer, [this](const PIByteArray & sname, DispatcherClient * c) {
		PIMutexLocker locker(map_mutex);
		CloudServer * existing = c_servers.value(sname, nullptr);
		if (existing) {
			rm_clients << c;
			piCoutObj << "duplicate Server ->" << sname.toHex();
		} else {
			piCoutObj << "add new Server ->" << sname.toHex();
			CloudServer * created = new CloudServer(c, sname);
			c_servers.insert(sname, created);
			index_c_servers.insert(c, created);
			c->authorise(true);
		}
	});

	// Registration as a client: accepted only if the named server exists;
	// otherwise the connection is put into `rm_clients`.
	CONNECTL(client, registerClient, [this](const PIByteArray & sname, DispatcherClient * c) {
		PIMutexLocker locker(map_mutex);
		CloudServer * cs = c_servers.value(sname, nullptr);
		if (cs) {
			piCoutObj << "add new Client to Server ->" << sname.toHex();
			c->authorise(true);
			cs->addClient(c);
			index_c_clients.insert(c, cs);
		} else {
			rm_clients << c;
			piCoutObj << "Client can't connect to Server ->" << sname.toHex();
		}
	});

	// piCoutObj << "add client" << client;
	clients.push_back(client);
	client->start();
	// piCout << "DispatcherServer::newConnection started" << (void *)cl;
}