PIClout data send and receive test

This commit is contained in:
2021-04-07 18:03:28 +03:00
parent 21e00e7176
commit bf63b0e9f3
6 changed files with 115 additions and 36 deletions

View File

@@ -113,8 +113,8 @@ bool PICloudServer::Client::openDevice() {
bool PICloudServer::Client::closeDevice() {
server->clientDisconnect(client_id);
if (is_connected) {
server->clientDisconnect(client_id);
is_connected = false;
cond_buff.notifyOne();
}
@@ -170,7 +170,10 @@ void PICloudServer::_readed(PIByteArray & ba) {
case PICloud::TCP::Disconnect: {
uint id = tcp.parseDisconnect(ba);
Client * oc = index_clients.value(id, nullptr);
if (oc) oc->close();
if (oc) {
oc->is_connected = false;
oc->close();
}
} break;
case PICloud::TCP::Data: {
PIPair<uint, PIByteArray> d = tcp.parseDataServer(ba);

View File

@@ -3,11 +3,46 @@
int main(int argc, char * argv[]) {
PICLI cli(argc, argv);
PITimer tm;
cli.addArgument("connect", true);
cli.addArgument("name", true);
PICloudClient c("127.0.0.1:10101");
// c.setReopenEnabled(true);
PICloudServer s("127.0.0.1:10101");
PIVector<PICloudServer::Client *> clients;
CONNECTL(&tm, tickEvent, ([&](void *, int){
if (c.isConnected()) {
PIString str = "ping";
piCout << "[Client] send:" << str;
c.write(str.toByteArray());
}
if (s.isRunning()) {
for (auto cl : clients) {
if (cl->isOpened()) {
PIString str = "ping_S";
piCout << "[Server] send to" << cl << ":" << str;
cl->write(str.toByteArray());
}
}
}
}));
CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){
PIByteArray ba(readed, size);
PIString str = PIString(ba);
piCout << "[Client] data:" << str;
if (str == "ping_S") c.write(PIString("pong_S").toByteArray());
}));
CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){
piCout << "[Server] new client:" << cl;
clients << cl;
CONNECTL(cl, threadedReadEvent, ([&c, &s, cl](uchar * readed, int size){
PIByteArray ba(readed, size);
PIString str = PIString(ba);
piCout << "[Server] data from" << cl << ":" << str;
if (str == "ping") cl->write(PIString("pong").toByteArray());
}));
cl->startThreadedRead();
}));
if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name"));
if (cli.hasArgument("connect")) {
c.setServerName(cli.argumentValue("connect"));
@@ -15,6 +50,7 @@ int main(int argc, char * argv[]) {
} else {
s.startThreadedRead();
}
tm.start(1000);
PIKbdListener ls;
ls.enableExitCapture(PIKbdListener::F10);
ls.start();

View File

@@ -3,6 +3,10 @@
CloudServer::CloudServer(DispatcherClient * c, const PIString & sname) : server(c) {
setName(sname);
server_name = sname;
CONNECTL(c, dataReadedServer, ([this](uint id, PIByteArray & ba){
DispatcherClient * cl = index_clients.value(id, nullptr);
if (cl) cl->sendData(ba);
}));
}
@@ -21,7 +25,14 @@ PIString CloudServer::serverName() const {
void CloudServer::addClient(DispatcherClient * c) {
clients << c;
index_clients.insert(c->clientId(), c);
c->sendConnected();
c->sendConnected(1);
server->sendConnected(c->clientId());
CONNECTL(c, dataReaded, ([this, c](PIByteArray & ba){
// piCoutObj << c->clientId() << "dataReaded";
if (clients.contains(c)) {
server->sendDataToClient(ba, c->clientId());
}
}));
}
@@ -41,9 +52,10 @@ void CloudServer::printStatus() {
for (auto c: clients) {
piCout << " " << c->address() << c->clientId();
}
for (auto c: clients) {
c->sendData(PIByteArray::fromHex("000000"));
server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId());
}
// for (auto c: clients) {
// c->sendData(PIByteArray::fromHex("000000"));
// server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId());
// }
}

View File

@@ -30,9 +30,9 @@ void DispatcherClient::close() {
}
void DispatcherClient::sendConnected() {
piCoutObj << "sendConnected";
tcp.sendConnected(1);
void DispatcherClient::sendConnected(uint client_id) {
//piCoutObj << "sendConnected";
tcp.sendConnected(client_id);
}
@@ -48,8 +48,13 @@ void DispatcherClient::sendDataToClient(const PIByteArray & data, uint client_id
}
void DispatcherClient::authorise(bool ok) {
authorised = ok;
}
void DispatcherClient::disconnected(bool withError) {
piCoutObj << "client disconnected" << eth->sendAddress();
//piCoutObj << "client disconnected" << eth->sendAddress();
disconnectEvent(this);
}
@@ -62,6 +67,7 @@ void DispatcherClient::readed(PIByteArray & ba) {
return;
}
if (authorised) {
if (hdr.second == tcp.role()) {
switch (hdr.first) {
case PICloud::TCP::Connect:
return;
@@ -69,12 +75,23 @@ void DispatcherClient::readed(PIByteArray & ba) {
disconnected(false);
return;
case PICloud::TCP::Data:
dataReaded(tcp.parseData(ba));
// piCoutObj << "TCP::Data";
if (tcp.role() == PICloud::TCP::Client) {
PIByteArray data = tcp.parseData(ba);
if (!data.isEmpty()) dataReaded(data);
else piCoutObj << "invalid data from client";
}
if (tcp.role() == PICloud::TCP::Server) {
PIPair<uint, PIByteArray> dp = tcp.parseDataServer(ba);
if (!dp.second.isEmpty()) dataReadedServer(dp.first, dp.second);
else piCoutObj << "invalid data from server";
}
return;
default:
disconnected(true);
//disconnected(true);
return;
}
}
} else {
switch (hdr.first) {
case PICloud::TCP::Connect: {

View File

@@ -13,22 +13,24 @@ public:
~DispatcherClient();
void start();
void close();
void sendConnected();
void sendConnected(uint client_id);
void sendData(const PIByteArray & data);
void sendDataToClient(const PIByteArray & data, uint client_id);
void authorise(bool ok);
PIString address();
uint clientId() const {return client_id;}
EVENT1(disconnectEvent, DispatcherClient *, client)
EVENT2(registerServer, PIString, sname, DispatcherClient *, client)
EVENT2(registerClient, PIString, sname, DispatcherClient *, client)
EVENT1(dataReaded, PIByteArray, ba)
EVENT1(dataReaded, PIByteArray &, ba)
EVENT2(dataReadedServer, uint, id, PIByteArray &, ba)
private:
EVENT_HANDLER1(void, readed, PIByteArray &, data);
EVENT_HANDLER1(void, disconnected, bool, withError);
PITimer disconnect_tm;
bool authorised;
std::atomic_bool authorised;
PIEthernet * eth;
PIStreamPacker streampacker;
PICloud::TCP tcp;

View File

@@ -16,7 +16,7 @@ DispatcherServer::DispatcherServer(PIEthernet::Address addr) : eth(PIEthernet::T
DispatcherServer::~DispatcherServer() {
eth.close();
piCoutObj << "server stoped";
//piCoutObj << "server stoped";
}
@@ -59,23 +59,25 @@ void DispatcherServer::printStatus() {
void DispatcherServer::disconnectClient(DispatcherClient *client) {
if (!clients.contains(client)) {
piCoutObj << "INVALID client" << client;
//piCoutObj << "INVALID client" << client;
return;
}
piCoutObj << "remove client" << client;
//piCoutObj << "remove client" << client;
map_mutex.lock();
clients.removeOne(client);
CloudServer * cs = index_c_servers.value(client, nullptr);
if (cs) {
PIVector<DispatcherClient *> cscv = cs->getClients();
for(auto csc : cscv) {
csc->close();
clients.removeOne(csc);
index_c_clients.removeOne(csc);
cs->removeClient(csc);
csc->close();
csc->deleteLater();
}
c_servers.remove(cs->serverName());
index_c_servers.removeOne(client);
delete cs;
}
CloudServer * cc = index_c_clients.value(client, nullptr);
if (cc) {
@@ -93,10 +95,16 @@ void DispatcherServer::newConnection(PIEthernet *cl) {
CONNECTU(client, disconnectEvent, this, disconnectClient);
CONNECTL(client, registerServer, [this](PIString sname, DispatcherClient * c){
map_mutex.lock();
CloudServer * cs = c_servers.value(sname, nullptr);
if (cs) {
rm_clients << c;
} else {
piCoutObj << "add new Server ->" << sname;
CloudServer * cs = new CloudServer(c, sname);
c_servers.insert(sname, cs);
index_c_servers.insert(c, cs);
c->authorise(true);
}
map_mutex.unlock();
});
CONNECTL(client, registerClient, [this](PIString sname, DispatcherClient * c){
@@ -106,12 +114,13 @@ void DispatcherServer::newConnection(PIEthernet *cl) {
piCoutObj << "add new Client to Server ->" << sname;
cs->addClient(c);
index_c_clients.insert(c, cs);
c->authorise(true);
} else {
rm_clients << c;
}
map_mutex.unlock();
});
piCoutObj << "add client" << client;
//piCoutObj << "add client" << client;
client->start();
map_mutex.lock();
clients.push_back(client);