try to fix hang

This commit is contained in:
2024-06-06 22:12:47 +03:00
parent 1184a66960
commit 7d3c135e99

View File

@@ -1,13 +1,15 @@
#include "server_client.h"
#include "server.h"
#include "piqt.h"
#include "SH_network_types.h"
#include "SH_packages.h"
#include <QDebug>
#include "ccm_SHS_shared.h"
#include "piqt.h"
#include "server.h"
#include <QCryptographicHash>
#include <QDebug>
#include <QMetaObject>
#include <piethernet.h>
#include "ccm_SHS_shared.h"
ServerClient::ServerClient(ServerCore * s, SHNetworkTypes::ChannelType c): PIThread(), server(s), channel_type(c) {
@@ -17,12 +19,12 @@ ServerClient::ServerClient(ServerCore * s, SHNetworkTypes::ChannelType c): PIThr
void ServerClient::init(PIIODevice * e) {
device = e;
tcp = new SHStreamTCP();
diag = new PIDiagnostics();
tcp = new SHStreamTCP();
diag = new PIDiagnostics();
diag->setDisconnectTimeout(3);
info.connected_time = QDateTime::currentDateTime();
info.address = PI2QString(e->path());
//ka_tm.reset();
info.address = PI2QString(e->path());
// ka_tm.reset();
CONNECTU(device, threadedReadEvent, tcp, received);
CONNECTU(tcp, packetReceivedSH, this, packetRec);
CONNECTU(tcp, sendRequest, this, sendRequest);
@@ -32,24 +34,25 @@ void ServerClient::init(PIIODevice * e) {
void ServerClient::close() {
if (device)
device->stop();
if (device) device->stop();
closing = true;
}
void ServerClient::destroy() {
//piCout << "~ServerClient ...";
// piCout << "~ServerClient ...";
PIThread::stop();
//piCout << "~ServerClient stop";
if (!PIThread::waitForFinish(10000))
PIThread::terminate();
if (device)
device->stopAndWait();
//piCout << "~ServerClient stop done";
// piCout << "~ServerClient stop";
if (device) device->stop();
if (!PIThread::waitForFinish(10000)) PIThread::terminate();
if (device) {
device->stopAndWait(10000);
if (device->isThreadedRead()) piCout << "[ServerClient::destroy]" << device << "can`t stop ...";
}
// piCout << "~ServerClient stop done";
piDeleteSafety(tcp);
piDeleteSafety(diag);
//piCout << "~ServerClient done";
// piCout << "~ServerClient done";
}
@@ -75,7 +78,7 @@ void ServerClient::queueSend(const PIByteArray & d, bool gr_force) {
new_gr_queue << d;
}
}
//piCout << "queueSend" << Q2PIString(hostname) << d.size();
// piCout << "queueSend" << Q2PIString(hostname) << d.size();
}
@@ -106,12 +109,11 @@ void ServerClient::run() {
}
setQueueSending(true);
for (const PIByteArray & d: dq) {
if (terminating)
break;
if (terminating) break;
tcp->send(d);
}
setQueueSending(false);
//ka_tm.reset();
// ka_tm.reset();
}
@@ -124,10 +126,10 @@ PIString ServerClient::hostType() {
void ServerClient::updateInfo() {
if (!diag) return;
info.sended_bytes = diag->state().sended_bytes;
info.sended_bytes = diag->state().sended_bytes;
info.received_bytes = diag->state().received_bytes;
info.send_speed = PI2QString(diag->sendSpeed());
info.receive_speed = PI2QString(diag->receiveSpeed());
info.send_speed = PI2QString(diag->sendSpeed());
info.receive_speed = PI2QString(diag->receiveSpeed());
}
@@ -143,13 +145,13 @@ void ServerClient::applyAdminParameter(const PacketParameter & p) {
void ServerClient::setRole(int index) {
if (new_user_role == index) return;
new_user_role = index;
role_changed = true;
role_changed = true;
}
void ServerClient::newRoleDone() {
info.user_role = user_role = new_user_role;
role_changed = false;
role_changed = false;
}
@@ -161,18 +163,20 @@ void ServerClient::sendRequest(PIByteArray data) {
void ServerClient::sendGraphicHistoty(const QString & vname) {
PIVector<PIPair<ullong, double>> history = server->engine.getVariableHistory(vname);
//piCout << "sendGraphicHistoty" << Q2PIString(vname) << "...";
const int frame_size = 1024;
// piCout << "sendGraphicHistoty" << Q2PIString(vname) << "...";
const int frame_size = 1024;
PIByteArray frag;
frag.reserve(frame_size * 8 + sizeof(PacketHeader));
if (!history.isEmpty()) {
int fcnt = (history.size_s() - 1) / frame_size + 1, pst = 0, pcnt = 0;
//piCout << "crypt_frag send" << fcnt << "frags";
// piCout << "crypt_frag send" << fcnt << "frags";
SHS::PacketGraphicData gd(vname);
gd.data.reserve(frame_size);
for (int i = 0; i < fcnt; ++i) {
if (i == fcnt - 1) pcnt = history.size_s() - pst;
else pcnt = frame_size;
if (i == fcnt - 1)
pcnt = history.size_s() - pst;
else
pcnt = frame_size;
gd.data = PIVector<PIPair<ullong, double>>(history.data(pst), pcnt);
pst += frame_size;
frag = SHNetworkTypes::makeHeader(SHNetworkTypes::GraphicData);
@@ -186,7 +190,7 @@ void ServerClient::sendGraphicHistoty(const QString & vname) {
frag = SHNetworkTypes::makeHeader(SHNetworkTypes::GraphicDone);
frag << Q2PIString(vname);
queueSend(frag, true);
//piCout << "sendGraphicHistoty" << Q2PIString(vname) << "done";
// piCout << "sendGraphicHistoty" << Q2PIString(vname) << "done";
}
@@ -204,14 +208,14 @@ void ServerClient::recGraphicsRequest(PIStringList gl) {
void ServerClient::packetRec(PIByteArray data) {
//piCout << "packetRec" << data.toHex() << "...";
// piCout << "packetRec" << data.toHex() << "...";
if (diag) diag->received(data.size_s());
//piCout << "packetRec" << data.toHex();
// piCout << "packetRec" << data.toHex();
PacketHeader hdr = SHNetworkTypes::takeHeader(data);
if (hdr.type < 0) return;
PIMutex & rmutex(server->rmutex);
if ((hdr.type != SHNetworkTypes::Identification) && (hdr.type != SHNetworkTypes::ServerInfoRequest)) {
//PIMutexLocker _ml(rmutex);
// PIMutexLocker _ml(rmutex);
if (!tcp->isCheckHeader() || tcp->wasError()) {
qDebug() << "wrong in" << info.hostname << ", disconnect" << PI2QString(device->path());
return;
@@ -226,56 +230,52 @@ void ServerClient::packetRec(PIByteArray data) {
if (diag) diag->sended(kap.size_s());
}
rmutex.unlock();
}
break;
} break;
case SHNetworkTypes::Identification: {
PIMutexLocker _ml(rmutex);
server->receivedIdentification(this, data);
} break;
} break;
case SHNetworkTypes::KeyProbe: {
PIMutexLocker _ml(rmutex);
server->receivedKey(this, data);
} break;
case SHNetworkTypes::GuiSyncEvent:
server->sync.receiveObjectEvent(piqt(data));
break;
case SHNetworkTypes::ClientReady:
active = true;
break;
} break;
case SHNetworkTypes::GuiSyncEvent: server->sync.receiveObjectEvent(piqt(data)); break;
case SHNetworkTypes::ClientReady: active = true; break;
case SHNetworkTypes::RemotePultPackageNameRequest: {
PIMutexLocker _ml(rmutex);
SHPackages::Package p = PACKAGES->packageForHost(hostType());
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::RemotePultPackageName);
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::RemotePultPackageName);
ba << p.name;
piCoutObj << "answer to RemotePultPackageNameRequest" << p.name << "(" << info.OS_type << Q2PIString(info.OS_arch) << ")";
queueSend(ba);
} break;
} break;
case SHNetworkTypes::RemotePultPackageContentRequest: {
PIString host_type;
{
PIMutexLocker _ml(rmutex);
host_type = hostType();
PIMutexLocker _ml(rmutex);
host_type = hostType();
}
PACKAGES->loadPackageForHost(host_type);
{
PIMutexLocker _ml(rmutex);
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::RemotePultPackageContent);
ba << PACKAGES->packageForHost(host_type);
piCoutObj << "send package" << ba.size_s();
queueSend(ba);
} } break;
PIMutexLocker _ml(rmutex);
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::RemotePultPackageContent);
ba << PACKAGES->packageForHost(host_type);
piCoutObj << "send package" << ba.size_s();
queueSend(ba);
}
} break;
case SHNetworkTypes::IdentificationRequest: {
PIMutexLocker _ml(rmutex);
PacketIdentification ident = PacketIdentification::makeLocal(0, SHS_NETWORK_VERSION, SHS_UISYNC_VERSION);
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::Identification);
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::Identification);
ba << ident;
queueSend(ba);
} break;
} break;
case SHNetworkTypes::ServerInfoRequest: {
PIMutexLocker _ml(rmutex);
PacketServerInfo info;
info.identify = PacketIdentification::makeLocal(0, SHS_NETWORK_VERSION, SHS_UISYNC_VERSION);
info.channel = channel_type;
info.channel = channel_type;
if (channel_type == SHNetworkTypes::Cloud)
info.connect_address = server->cloud_server.path() + "@" + server->cloud_server.serverName();
info.name = server->server_name_;
@@ -284,13 +284,13 @@ void ServerClient::packetRec(PIByteArray data) {
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::ServerInfo);
ba << info;
queueSend(ba);
} break;
} break;
case SHNetworkTypes::ChangeUserRole: {
PIMutexLocker _ml(rmutex);
ServerUserRole role;
data >> role;
int role_index = role.index;
PIScopeExitCall failure([this,role_index](){
PIScopeExitCall failure([this, role_index]() {
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::ChangeUserRoleResult);
ba << role_index << int(1);
queueSend(ba);
@@ -302,7 +302,7 @@ void ServerClient::packetRec(PIByteArray data) {
PIByteArray ba = SHNetworkTypes::makeHeader(SHNetworkTypes::ChangeUserRoleResult);
ba << role_index << int(0);
queueSend(ba);
} break;
} break;
case SHNetworkTypes::AdminBAFUpload: {
PIByteArray baf;
data >> baf;
@@ -310,11 +310,11 @@ void ServerClient::packetRec(PIByteArray data) {
rmutex.lock();
server->uploadedBAF(baf, this);
rmutex.unlock();
} break;
} break;
case SHNetworkTypes::AdminDeviceForwardingRequest: {
piCoutObj << "AdminDeviceForwardingRequest";
PIVector<ServerDevice> dl;
PIVector<PIIODevice * > devs = RUNTIME->connection.boundedDevices();
PIVector<PIIODevice *> devs = RUNTIME->connection.boundedDevices();
for (auto * d: devs) {
dl << ServerDevice(d, RUNTIME->connection.deviceNames(d));
}
@@ -329,7 +329,7 @@ void ServerClient::packetRec(PIByteArray data) {
server->df_device.clear();
server->engine.connection()->setInactive(true);
tcp->send(ba);
} break;
} break;
case SHNetworkTypes::AdminDeviceForwardingSelect: {
PIMutexLocker _ml(rmutex);
if (info.role != SHServer::RoleDeviceForward) {
@@ -338,14 +338,14 @@ void ServerClient::packetRec(PIByteArray data) {
}
data >> server->df_device;
piCoutObj << "AdminDeviceForwardingSelect" << server->df_device;
} break;
} break;
case SHNetworkTypes::AdminDeviceForwardingWrite: {
PIString path;
PIByteArray ba;
data >> path >> ba;
piCoutObj << "AdminDeviceForwardingWrite" << path << ba.size_s();
RUNTIME->connection.writeByFullPath(path, ba);
} break;
} break;
case SHNetworkTypes::AdminRestart:
piCoutObj << "AdminRestart";
tcp->send(SHNetworkTypes::makeHeader(SHNetworkTypes::AdminRestartQueued));
@@ -355,18 +355,18 @@ void ServerClient::packetRec(PIByteArray data) {
PacketParameter p;
data >> p;
applyAdminParameter(p);
} break;
} break;
case SHNetworkTypes::GraphicRequest: {
PIStringList vars;
data >> vars;
postGraphicsRequest(vars);
//QMetaObject::invokeMethod(this, "recGraphicsRequest", Qt::QueuedConnection, Q_ARG(QStringList, PI2QStringList(vars)));
} break;
// QMetaObject::invokeMethod(this, "recGraphicsRequest", Qt::QueuedConnection, Q_ARG(QStringList, PI2QStringList(vars)));
} break;
case SHNetworkTypes::AdminUpdatePlugins: {
QStringList names;
data >> names;
server->updatePlugins(names, this);
} break;
} break;
default: break;
}
}