PICloud many important fixes

This commit is contained in:
Бычков Андрей
2022-11-11 16:18:05 +03:00
parent ec8fbcb112
commit cb59017ebb
9 changed files with 57 additions and 19 deletions

View File

@@ -23,6 +23,7 @@
PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), PICloudBase() { PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode) : PIIODevice(path, mode), PICloudBase() {
tcp.setRole(PICloud::TCP::Client); tcp.setRole(PICloud::TCP::Client);
setThreadedReadBufferSize(eth.threadedReadBufferSize());
setName("cloud_client"); setName("cloud_client");
is_connected = false; is_connected = false;
is_deleted = false; is_deleted = false;
@@ -182,6 +183,5 @@ void PICloudClient::_readed(PIByteArray & ba) {
} }
//piCoutObj << "readed" << ba.toHex(); //piCoutObj << "readed" << ba.toHex();
} }
if (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad
//piCoutObj << "_readed done"; //piCoutObj << "_readed done";
} }

View File

@@ -27,27 +27,33 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode)
setName("cloud_server__" + server_name); setName("cloud_server__" + server_name);
is_deleted = false; is_deleted = false;
eth.setReopenEnabled(false); eth.setReopenEnabled(false);
setThreadedReadBufferSize(eth.threadedReadBufferSize());
CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed); CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed);
CONNECTL(&eth, connected, [this](){ CONNECTL(&eth, connected, [this](){
open_mutex.lock();
opened_ = true; opened_ = true;
//piCoutObj << "connected" << &eth; cvar.notifyOne();
open_mutex.unlock();
//piCoutObj << "connected";
tcp.sendStart(); tcp.sendStart();
}); });
CONNECTL(&eth, disconnected, [this](bool){ CONNECTL(&eth, disconnected, [this](bool){
if (is_deleted) return; if (is_deleted) return;
//piCoutObj << "disconnected" << &eth; //piCoutObj << "disconnected";
clients_mutex.lock(); clients_mutex.lock();
removed_clients_.append(clients_);
for (auto c : clients_) { for (auto c : clients_) {
c->is_connected = false; c->is_connected = false;
c->close(); c->close();
} }
removed_clients_.append(clients_);
clients_.clear(); clients_.clear();
index_clients.clear(); index_clients.clear();
clients_mutex.unlock(); clients_mutex.unlock();
open_mutex.lock();
opened_ = false; opened_ = false;
cvar.notifyOne();
open_mutex.unlock();
ping_timer.stop(); ping_timer.stop();
piMSleep(100);
}); });
CONNECTL(&ping_timer, tickEvent, [this] (void *, int){ CONNECTL(&ping_timer, tickEvent, [this] (void *, int){
if (eth.isConnected()) tcp.sendPing(); if (eth.isConnected()) tcp.sendPing();
@@ -62,10 +68,10 @@ PICloudServer::~PICloudServer() {
close(); close();
waitThreadedReadFinished(); waitThreadedReadFinished();
//piCout << "wait"; //piCout << "wait";
for (auto c : removed_clients_) { while(removed_clients_.isNotEmpty()) {
Client * c = removed_clients_.take_back();
delete c; delete c;
} }
removed_clients_.clear();
//piCoutObj << "~PICloudServer done" << this; //piCoutObj << "~PICloudServer done" << this;
} }
@@ -83,7 +89,7 @@ PIVector<PICloudServer::Client *> PICloudServer::clients() const {
bool PICloudServer::openDevice() { bool PICloudServer::openDevice() {
//piCout << "PICloudServer open device" << path(); piCout << "PICloudServer open device" << path();
bool op = eth.connect(PIEthernet::Address::resolve(path()), false); bool op = eth.connect(PIEthernet::Address::resolve(path()), false);
if (op) { if (op) {
eth.startThreadedRead(); eth.startThreadedRead();
@@ -102,7 +108,12 @@ bool PICloudServer::closeDevice() {
eth.stopAndWait(); eth.stopAndWait();
ping_timer.stop(); ping_timer.stop();
eth.close(); eth.close();
cvar.notifyOne();
clients_mutex.lock(); clients_mutex.lock();
for (auto c : clients_) {
c->is_connected = false;
c->close();
}
removed_clients_.append(clients_); removed_clients_.append(clients_);
clients_.clear(); clients_.clear();
index_clients.clear(); index_clients.clear();
@@ -114,7 +125,10 @@ bool PICloudServer::closeDevice() {
ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) { ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) {
if (is_deleted) return -1; if (is_deleted) return -1;
//piCoutObj << "readDevice"; //piCoutObj << "readDevice";
if (!opened_) openDevice(); open_mutex.lock();
if (isOpened()) cvar.wait(open_mutex);
open_mutex.unlock();
//piCoutObj << "opened_ = " << opened_;
//else piMSleep(eth.readTimeout()); //else piMSleep(eth.readTimeout());
return -1; return -1;
} }
@@ -128,6 +142,7 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) {
void PICloudServer::interrupt() { void PICloudServer::interrupt() {
eth.interrupt(); eth.interrupt();
cvar.notifyOne();
} }
@@ -145,6 +160,7 @@ int PICloudServer::sendData(const PIByteArray & data, uint client_id) {
PICloudServer::Client::Client(PICloudServer * srv, uint id) : server(srv), client_id(id) { PICloudServer::Client::Client(PICloudServer * srv, uint id) : server(srv), client_id(id) {
setMode(PIIODevice::ReadWrite); setMode(PIIODevice::ReadWrite);
setReopenEnabled(false); setReopenEnabled(false);
setThreadedReadBufferSize(server->threadedReadBufferSize());
is_connected = true; is_connected = true;
} }

View File

@@ -86,6 +86,8 @@ private:
PIVector<Client *> removed_clients_; PIVector<Client *> removed_clients_;
PIMap<uint, Client *> index_clients; PIMap<uint, Client *> index_clients;
PITimer ping_timer; PITimer ping_timer;
PIConditionVariable cvar;
PIMutex open_mutex;
mutable PIMutex clients_mutex; mutable PIMutex clients_mutex;
std::atomic_bool is_deleted; std::atomic_bool is_deleted;
}; };

View File

@@ -588,7 +588,7 @@ bool PIEthernet::connect(bool threaded) {
if (!connected_) { if (!connected_) {
piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString();
} }
opened_ = connected_; opened_.exchange(connected_);
if (connected_) { if (connected_) {
connected(); connected();
} }
@@ -720,7 +720,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
piCoutObj << "connect to " << path() << connected_; piCoutObj << "connect to " << path() << connected_;
if (!connected_) if (!connected_)
piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString();
opened_ = connected_; opened_.exchange(connected_);
if (connected_) { if (connected_) {
connecting_ = false; connecting_ = false;
connected(); connected();
@@ -847,7 +847,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) {
connected_ = connectTCP(); connected_ = connectTCP();
if (!connected_) if (!connected_)
piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString(); piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString();
opened_ = connected_; opened_.exchange(connected_);
if (connected_) { if (connected_) {
connecting_ = false; connecting_ = false;
connected(); connected();

View File

@@ -304,6 +304,7 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) {
void PIIODevice::_init() { void PIIODevice::_init() {
opened_ = false;
setOptions(0); setOptions(0);
setReopenEnabled(true); setReopenEnabled(true);
setReopenTimeout(1000); setReopenTimeout(1000);

View File

@@ -540,7 +540,7 @@ protected:
DeviceMode mode_; DeviceMode mode_;
DeviceOptions options_; DeviceOptions options_;
ReadRetFunc func_read = nullptr; ReadRetFunc func_read = nullptr;
bool opened_ = false; std::atomic_bool opened_;
void * ret_data_ = nullptr; void * ret_data_ = nullptr;
private: private:

View File

@@ -5,7 +5,9 @@ CloudServer::CloudServer(DispatcherClient * c, const PIByteArray & sname) : serv
server_uuid = sname; server_uuid = sname;
CONNECTL(c, dataReadedServer, ([this](uint id, PIByteArray & ba){ CONNECTL(c, dataReadedServer, ([this](uint id, PIByteArray & ba){
last_ping.reset(); last_ping.reset();
mutex_clients.lock();
DispatcherClient * cl = index_clients.value(id, nullptr); DispatcherClient * cl = index_clients.value(id, nullptr);
mutex_clients.unlock();
if (cl) cl->sendData(ba); if (cl) cl->sendData(ba);
})); }));
CONNECTL(c, pingReceived, [this]() {last_ping.reset();}); CONNECTL(c, pingReceived, [this]() {last_ping.reset();});
@@ -27,9 +29,11 @@ PIByteArray CloudServer::serverUUID() const {
void CloudServer::addClient(DispatcherClient * c) { void CloudServer::addClient(DispatcherClient * c) {
last_ping.reset(); last_ping.reset();
mutex_clients.lock();
clients << c; clients << c;
uint cid = c->clientId(); uint cid = c->clientId();
index_clients.insert(cid, c); index_clients.insert(cid, c);
mutex_clients.unlock();
c->sendConnected(1); c->sendConnected(1);
server->sendConnected(cid); server->sendConnected(cid);
CONNECTL(c, dataReaded, ([this, cid](PIByteArray & ba){ CONNECTL(c, dataReaded, ([this, cid](PIByteArray & ba){
@@ -41,14 +45,19 @@ void CloudServer::addClient(DispatcherClient * c) {
void CloudServer::removeClient(DispatcherClient * c) { void CloudServer::removeClient(DispatcherClient * c) {
last_ping.reset(); last_ping.reset();
mutex_clients.lock();
clients.removeOne(c); clients.removeOne(c);
index_clients.remove(c->clientId()); index_clients.remove(c->clientId());
mutex_clients.unlock();
server->sendDisconnected(c->clientId()); server->sendDisconnected(c->clientId());
} }
PIVector<DispatcherClient *> CloudServer::getClients() { PIVector<DispatcherClient *> CloudServer::getClients() {
return clients; mutex_clients.lock();
PIVector<DispatcherClient *> cl = clients;
mutex_clients.unlock();
return cl;
} }
@@ -58,14 +67,12 @@ double CloudServer::lastPing() {
void CloudServer::printStatus() { void CloudServer::printStatus() {
mutex_clients.lock();
piCout << " " << "Clients for" << server->address() << server_uuid.toHex() << ":"; piCout << " " << "Clients for" << server->address() << server_uuid.toHex() << ":";
for (auto c: clients) { for (auto c: clients) {
piCout << " " << c->address() << c->clientId(); piCout << " " << c->address() << c->clientId();
} }
// for (auto c: clients) { mutex_clients.unlock();
// c->sendData(PIByteArray::fromHex("000000"));
// server->sendDataToClient(PIByteArray::fromHex("000000"), c->clientId());
// }
} }

View File

@@ -23,6 +23,7 @@ private:
PIMap<uint, DispatcherClient*> index_clients; PIMap<uint, DispatcherClient*> index_clients;
PIByteArray server_uuid; PIByteArray server_uuid;
PITimeMeasurer last_ping; PITimeMeasurer last_ping;
PIMutex mutex_clients;
}; };
#endif // CLOUDSERVER_H #endif // CLOUDSERVER_H

View File

@@ -58,7 +58,18 @@ void DispatcherServer::cleanClients() {
for (auto c: ss) { for (auto c: ss) {
if (c->lastPing() > 15.0) { if (c->lastPing() > 15.0) {
piCout << "remove Server by ping timeout" << c->getConnection()->clientId(); piCout << "remove Server by ping timeout" << c->getConnection()->clientId();
PIVector<DispatcherClient *> cscv = c->getClients();
for(auto csc : cscv) {
clients.removeAll(csc);
index_c_clients.remove(csc);
c->removeClient(csc);
csc->close();
rmrf_clients << csc;
}
c_servers.remove(c->serverUUID());
index_c_servers.remove(c->getConnection());
rmrf_clients << const_cast<DispatcherClient *>(c->getConnection()); rmrf_clients << const_cast<DispatcherClient *>(c->getConnection());
delete c;
} }
} }
for (auto c: rm_clients) { for (auto c: rm_clients) {
@@ -250,8 +261,8 @@ void DispatcherServer::newConnection(PIEthernet *cl) {
map_mutex.unlock(); map_mutex.unlock();
}); });
//piCoutObj << "add client" << client; //piCoutObj << "add client" << client;
client->start();
map_mutex.lock(); map_mutex.lock();
clients.push_back(client); clients.push_back(client);
map_mutex.unlock(); map_mutex.unlock();
client->start();
} }