Files
pip/libs/cloud/picloudserver.cpp
peri4 1c7fc39b6c version 4.0.0_alpha
in almost all methods removed timeouts in milliseconds, replaced to PISystemTime
PITimer rewrite, remove internal impl, now only thread implementation, API similar to PIThread
PITimer API no longer pass void*
PIPeer, PIConnection improved stability on reinit and exit
PISystemTime new methods
pisd now exit without hanging
2024-07-30 14:18:02 +03:00

301 lines
7.4 KiB
C++

/*
PIP - Platform Independent Primitives
PICloud Server
Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "picloudserver.h"
#include "piliterals_time.h"
PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode): PIIODevice(path, mode), PICloudBase() {
PIString server_name = "PCS_" + PIString::fromNumber(randomi() % 1000);
tcp.setRole(PICloud::TCP::Server);
tcp.setServerName(server_name);
setName("cloud_server__" + server_name);
is_deleted = false;
eth.setReopenEnabled(false);
setThreadedReadBufferSize(eth.threadedReadBufferSize());
CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed);
CONNECTL(&eth, connected, [this]() {
open_mutex.lock();
opened_ = true;
cvar.notifyOne();
open_mutex.unlock();
piCoutObj << "connected";
tcp.sendStart();
});
CONNECTL(&eth, disconnected, [this](bool) {
if (is_deleted) return;
piCoutObj << "disconnected";
clients_mutex.lock();
for (auto c: clients_) {
c->is_connected = false;
c->close();
}
removed_clients_.append(clients_);
clients_.clear();
index_clients.clear();
clients_mutex.unlock();
open_mutex.lock();
opened_ = false;
cvar.notifyOne();
open_mutex.unlock();
ping_timer.stop();
});
ping_timer.setSlot([this]() {
if (eth.isConnected()) tcp.sendPing();
});
}
PICloudServer::~PICloudServer() {
// piCoutObj << "~PICloudServer ..." << this;
is_deleted = true;
stop();
close();
waitThreadedReadFinished();
// piCout << "wait";
while (removed_clients_.isNotEmpty()) {
Client * c = removed_clients_.take_back();
delete c;
}
// piCoutObj << "~PICloudServer done" << this;
}
void PICloudServer::setServerName(const PIString & server_name) {
setName("cloud_server__" + server_name);
tcp.setServerName(server_name);
}
PIVector<PICloudServer::Client *> PICloudServer::clients() const {
PIMutexLocker _ml(clients_mutex);
return clients_;
}
bool PICloudServer::openDevice() {
piCoutObj << "open device" << path();
if (is_deleted) return false;
bool op = eth.connect(PINetworkAddress::resolve(path()), false);
if (op) {
eth.startThreadedRead();
ping_timer.start(5_s);
return true;
} else {
ping_timer.stop();
eth.close();
return false;
}
}
bool PICloudServer::closeDevice() {
// piCoutObj << "closeDevice" << this;
eth.stopAndWait();
ping_timer.stop();
eth.close();
cvar.notifyOne();
clients_mutex.lock();
for (auto c: clients_) {
c->is_connected = false;
c->close();
}
removed_clients_.append(clients_);
clients_.clear();
index_clients.clear();
clients_mutex.unlock();
return true;
}
ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) {
if (is_deleted) return -1;
// piCoutObj << "readDevice";
open_mutex.lock();
if (isOpened()) cvar.wait(open_mutex);
open_mutex.unlock();
// piCoutObj << "opened_ = " << opened_;
// else piMSleep(eth.readTimeout());
return -1;
}
ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) {
// piCoutObj << "writeDevice";
return -1;
}
void PICloudServer::interrupt() {
eth.interrupt();
cvar.notifyOne();
}
void PICloudServer::clientDisconnect(uint client_id) {
tcp.sendDisconnected(client_id);
}
int PICloudServer::sendData(const PIByteArray & data, uint client_id) {
if (!opened_) return -1;
return tcp.sendData(data, client_id);
}
PICloudServer::Client::Client(PICloudServer * srv, uint id): server(srv), client_id(id) {
setMode(PIIODevice::ReadWrite);
setReopenEnabled(false);
setThreadedReadBufferSize(server->threadedReadBufferSize());
is_connected = true;
}
PICloudServer::Client::~Client() {
// piCoutObj << "~PICloudServer::Client..." << this;
close();
stopAndWait(10_s);
// piCoutObj << "~PICloudServer::Client done" << this;
}
bool PICloudServer::Client::openDevice() {
return is_connected;
}
bool PICloudServer::Client::closeDevice() {
// piCoutObj << "closeDevice" << this;
if (is_connected) {
server->clientDisconnect(client_id);
is_connected = false;
}
cond_buff.notifyOne();
return true;
}
ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) {
if (!is_connected) return -1;
ssize_t sz = -1;
mutex_buff.lock();
if (is_connected) {
if (buff.isEmpty()) {
sz = 0;
} else {
sz = piMini(max_size, buff.size());
memcpy(read_to, buff.data(), sz);
buff.remove(0, sz);
}
if (sz == 0) cond_buff.wait(mutex_buff);
}
mutex_buff.unlock();
return sz;
}
ssize_t PICloudServer::Client::writeDevice(const void * data, ssize_t size) {
if (!is_connected) return -1;
return server->sendData(PIByteArray(data, size), client_id);
}
void PICloudServer::Client::interrupt() {
cond_buff.notifyOne();
}
void PICloudServer::Client::pushBuffer(const PIByteArray & ba) {
if (!is_connected) return;
mutex_buff.lock();
if (buff.size_s() > threadedReadBufferSize()) {
piCoutObj << "Error: buffer overflow, drop" << ba.size() << "bytes";
mutex_buff.unlock();
return;
}
buff.append(ba);
cond_buff.notifyOne();
mutex_buff.unlock();
}
void PICloudServer::_readed(PIByteArray & ba) {
if (is_deleted) return;
PIPair<PICloud::TCP::Type, PICloud::TCP::Role> hdr = tcp.parseHeader(ba);
if (hdr.second == tcp.role()) {
switch (hdr.first) {
case PICloud::TCP::Connect: {
uint id = tcp.parseConnect(ba);
clients_mutex.lock();
Client * oc = index_clients.value(id, nullptr);
clients_mutex.unlock();
if (oc) {
piCoutObj << "Warning: reject client with duplicated ID";
tcp.sendDisconnected(id);
} else {
Client * c = new Client(this, id);
// piCoutObj << "new Client" << id << c;
CONNECT1(void, PIObject *, c, deleted, this, clientDeleted);
clients_mutex.lock();
clients_ << c;
index_clients.insert(id, c);
clients_mutex.unlock();
newConnection(c);
}
} break;
case PICloud::TCP::Disconnect: {
uint id = tcp.parseDisconnect(ba);
// piCoutObj << "Close on logic";
clients_mutex.lock();
Client * oc = index_clients.take(id, nullptr);
clients_.removeOne(oc);
clients_mutex.unlock();
if (oc) {
oc->stopAndWait();
oc->is_connected = false;
oc->close();
removed_clients_ << oc;
// delete oc;
}
} break;
case PICloud::TCP::Data: {
PIPair<uint, PIByteArray> d = tcp.parseDataServer(ba);
clients_mutex.lock();
Client * oc = index_clients.value(d.first, nullptr);
clients_mutex.unlock();
// piCoutObj << "data for" << d.first << d.second.size();
if (oc && !d.second.isEmpty()) oc->pushBuffer(d.second);
} break;
default: break;
}
}
}
void PICloudServer::clientDeleted(PIObject * o) {
PICloudServer::Client * c = (PICloudServer::Client *)o;
// piCoutObj << "clientDeleted" << c;
clients_mutex.lock();
clients_.removeOne(c);
removed_clients_.removeAll(c);
index_clients.removeWhere([c](uint, Client * v) { return v == c; });
clients_mutex.unlock();
}