important:

* PIThread::~PIThread() now unregister itself from introspection, if terminates than show warning
 * PISystemMonitor now correctly stops
 * PIPeer now can correctly stopAndWait
 * PIPeer::destroy(), protected method for close all eths and threads
 * new PIINTROSPECTION_STOP macro
 * Introspection now can be correctly stopped by macro, more safety

ClientServer:
 * ClientBase::close() stop and disconnect channel
 * Server clients clean-up now event-based
 * No warnings on client destructor
This commit is contained in:
2024-09-12 17:07:48 +03:00
parent da4b09be9e
commit 996b7ea403
17 changed files with 189 additions and 99 deletions

View File

@@ -2,6 +2,7 @@
#include "piclientserver_client.h"
#include "piclientserver_server.h"
#include "picodeparser.h"
#include "piintrospection_server.h"
#include "piiostream.h"
#include "pijson.h"
#include "pimathbase.h"
@@ -22,12 +23,15 @@ PIKbdListener kbd;
class MyServerClient: public PIClientServer::ServerClient {
public:
~MyServerClient() { send_thread.stopAndWait(); }
protected:
void readed(PIByteArray data) override { piCout << "readed" << (data.size()); }
void aboutDelete() override { piCout << "aboutDelete"; }
void disconnected() override { piCout << "disconnected"; }
// void readed(PIByteArray data) override { piCout << "readed" << (data.size()); }
// void aboutDelete() override { piCout << "aboutDelete"; }
// void disconnected() override { piCout << "disconnected"; }
void connected() override {
piCout << "connected";
// piCout << "connected";
send_thread.start(
[this] {
// write((PIString::fromNumber(++counter)).toUTF8());
@@ -42,16 +46,18 @@ protected:
class MyClient: public PIClientServer::Client {
public:
~MyClient() { send_thread.stopAndWait(); }
protected:
void readed(PIByteArray data) override { piCout << "readed" << (data.size()); }
// void readed(PIByteArray data) override { piCout << "readed" << (data.size()); }
void disconnected() override { piCout << "disconnected"; }
void connected() override {
piCout << "connected";
send_thread.start(
[this] {
// write((PIString::fromNumber(++counter)).toUTF8());
PIByteArray ba(64_KiB);
write(ba);
write(PIByteArray(64_KiB));
},
2_Hz);
}
@@ -61,11 +67,34 @@ protected:
int main(int argc, char * argv[]) {
/*PIPeer p("123");
piCout << "start ...";
p.start();
piCout << "start ok";
piSleep(1.);
piCout << "stop ...";
p.stopAndWait();
piCout << "stop ok";
piSleep(1.);
piCout << "exit";
return 0;*/
if (argc > 1) {
PIINTROSPECTION_START(server);
} else {
PIINTROSPECTION_START(client);
}
kbd.enableExitCapture();
PIClientServer::Server * s = nullptr;
PIThread s_thread;
PIThread * s_thread = new PIThread();
PIVector<PIClientServer::Client *> cv;
if (argc > 1) {
@@ -74,21 +103,22 @@ int main(int argc, char * argv[]) {
s->setClientFactory([] { return new MyServerClient(); });
s->enableSymmetricEncryption("1122334455667788"_hex);
s->listenAll(12345);
s_thread.start(
s_thread->start(
[s] {
piCout << "*** clients" << s->clientsCount();
int i = 0;
s->forEachClient([&i](PIClientServer::ServerClient * c) {
piCout << "client" << ++i << c;
c->write(PIByteArray(16_KiB));
piMSleep(200);
// piCout << "client" << ++i << c;
c->write(PIByteArray(1_KiB));
c->close();
// piMSleep(200);
});
},
0.5_Hz);
1._Hz);
} else {
piCout << "Client";
piForTimes(5) {
piMSleep(50);
piForTimes(20) {
piMSleep(25);
auto c = new MyClient();
c->enableSymmetricEncryption("1122334455667788"_hex);
c->connect(PINetworkAddress::resolve("127.0.0.1", 12345));
@@ -98,11 +128,13 @@ int main(int argc, char * argv[]) {
WAIT_FOR_EXIT;
s_thread.stopAndWait();
s_thread->stopAndWait();
piDeleteSafety(s);
piDeleteAllAndClear(cv);
PIINTROSPECTION_STOP
return 0;
/*PIPackedTCP * tcp_s =
PIIODevice::createFromFullPath("ptcp://s::8000")->cast<PIPackedTCP>(); // new PIPackedTCP(PIPackedTCP::Server, {"0.0.0.0:8000"});