Compare commits

3 Commits

Author SHA1 Message Date
Бычков Андрей
d9eac06749 pithread, pitimer stop, stopAndWait 2022-11-09 17:17:21 +03:00
Бычков Андрей
f9c1ef5ba4 Merge branch 'thread' of https://git.shs.tools/SHS/pip into thread 2022-11-09 17:04:28 +03:00
Бычков Андрей
8738043dce some PICloud and PIEthernet fixes 2022-11-09 17:04:13 +03:00
14 changed files with 45 additions and 30 deletions

View File

@@ -128,7 +128,7 @@ ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) {
ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) { ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) {
if (is_deleted) return -1; if (is_deleted || !is_connected) return -1;
piCoutObj << "writeDevice" << size; piCoutObj << "writeDevice" << size;
return tcp.sendData(PIByteArray(data, size)); return tcp.sendData(PIByteArray(data, size));
} }

View File

@@ -25,10 +25,12 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode)
tcp.setRole(PICloud::TCP::Server); tcp.setRole(PICloud::TCP::Server);
tcp.setServerName(server_name); tcp.setServerName(server_name);
setName("cloud_server__" + server_name); setName("cloud_server__" + server_name);
is_deleted = false;
eth.setReopenEnabled(false); eth.setReopenEnabled(false);
CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed); CONNECT1(void, PIByteArray, &streampacker, packetReceiveEvent, this, _readed);
CONNECTL(&eth, connected, [this](){opened_ = true; piCoutObj << "connected" << &eth; tcp.sendStart();}); CONNECTL(&eth, connected, [this](){opened_ = true; piCoutObj << "connected" << &eth; tcp.sendStart();});
CONNECTL(&eth, disconnected, [this](bool){ CONNECTL(&eth, disconnected, [this](bool){
if (is_deleted) return;
piCoutObj << "disconnected" << &eth; piCoutObj << "disconnected" << &eth;
for (auto c : clients_) { for (auto c : clients_) {
delete c; delete c;
@@ -45,6 +47,7 @@ PICloudServer::PICloudServer(const PIString & path, PIIODevice::DeviceMode mode)
PICloudServer::~PICloudServer() { PICloudServer::~PICloudServer() {
piCoutObj << "~PICloudServer ..." << this; piCoutObj << "~PICloudServer ..." << this;
is_deleted = true;
stop(); stop();
close(); close();
piCout << "wait"; piCout << "wait";
@@ -94,6 +97,7 @@ bool PICloudServer::closeDevice() {
ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) { ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) {
if (is_deleted) return -1;
//piCoutObj << "readDevice"; //piCoutObj << "readDevice";
if (!opened_) openDevice(); if (!opened_) openDevice();
//else piMSleep(eth.readTimeout()); //else piMSleep(eth.readTimeout());
@@ -118,6 +122,7 @@ void PICloudServer::clientDisconnect(uint client_id) {
int PICloudServer::sendData(const PIByteArray & data, uint client_id) { int PICloudServer::sendData(const PIByteArray & data, uint client_id) {
if (!opened_) return -1;
return tcp.sendData(data, client_id); return tcp.sendData(data, client_id);
} }
@@ -173,6 +178,7 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) {
ssize_t PICloudServer::Client::writeDevice(const void * data, ssize_t size) { ssize_t PICloudServer::Client::writeDevice(const void * data, ssize_t size) {
if (!is_connected) return -1;
return server->sendData(PIByteArray(data, size), client_id); return server->sendData(PIByteArray(data, size), client_id);
} }
@@ -193,6 +199,7 @@ void PICloudServer::Client::pushBuffer(const PIByteArray & ba) {
void PICloudServer::_readed(PIByteArray & ba) { void PICloudServer::_readed(PIByteArray & ba) {
if (is_deleted) return;
PIPair<PICloud::TCP::Type, PICloud::TCP::Role> hdr = tcp.parseHeader(ba); PIPair<PICloud::TCP::Type, PICloud::TCP::Role> hdr = tcp.parseHeader(ba);
if (hdr.second == tcp.role()) { if (hdr.second == tcp.role()) {
switch (hdr.first) { switch (hdr.first) {
@@ -222,8 +229,7 @@ void PICloudServer::_readed(PIByteArray & ba) {
clients_mutex.unlock(); clients_mutex.unlock();
if (oc) { if (oc) {
oc->stopAndWait(); oc->stopAndWait();
//oc->is_connected = false; oc->is_connected = false;
//oc->close();
delete oc; delete oc;
} }
} break; } break;

View File

@@ -588,7 +588,7 @@ void PIScreen::waitForFinish() {
void PIScreen::stop(bool clear) { void PIScreen::stop(bool clear) {
PIThread::stop(true); PIThread::stopAndWait();
if (clear) console.clearScreen(); if (clear) console.clearScreen();
#ifndef WINDOWS #ifndef WINDOWS
fflush(0); fflush(0);

View File

@@ -86,6 +86,7 @@ private:
PIMap<uint, Client *> index_clients; PIMap<uint, Client *> index_clients;
PITimer ping_timer; PITimer ping_timer;
mutable PIMutex clients_mutex; mutable PIMutex clients_mutex;
std::atomic_bool is_deleted;
}; };
#endif // PICLOUDSERVER_H #endif // PICLOUDSERVER_H

View File

@@ -764,8 +764,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
return -1; return -1;
} }
if (connected_) { if (connected_) {
connected_ = false;
opened_ = false;
piCoutObj << "Disconnect on read," << ethErrorString(); piCoutObj << "Disconnect on read," << ethErrorString();
opened_ = connected_ = false;
closeSocket(sock); closeSocket(sock);
init(); init();
disconnected(rs < 0); disconnected(rs < 0);
@@ -855,8 +856,10 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) {
} }
if (!connected_) return -1; if (!connected_) return -1;
auto disconnectFunc = [this](){ auto disconnectFunc = [this](){
if (!connected_) return;
connected_ = false;
opened_ = false;
piCoutObj << "Disconnect on write," << ethErrorString(); piCoutObj << "Disconnect on write," << ethErrorString();
opened_ = connected_ = false;
closeSocket(sock); closeSocket(sock);
init(); init();
disconnected(true); disconnected(true);

View File

@@ -192,7 +192,7 @@ PIPeer::~PIPeer() {
if (p._data) { if (p._data) {
p._data->dt_in.stop(); p._data->dt_in.stop();
p._data->dt_out.stop(); p._data->dt_out.stop();
p._data->t.stop(true); p._data->t.stopAndWait();
} }
destroyEths(); destroyEths();
piForeach (PIEthernet * i, eths_mcast) { piForeach (PIEthernet * i, eths_mcast) {

View File

@@ -620,10 +620,9 @@ void PIThread::interrupt() {
} }
void PIThread::stop(bool wait) { void PIThread::stop() {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop ..." << running_ << wait; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop ..." << running_ << wait;
terminating = true; terminating = true;
if (wait) waitForFinish();
} }

View File

@@ -107,12 +107,11 @@ public:
bool start(std::function<void()> func, int timer_delay) {ret_func = [func](void*){func();}; return start(timer_delay);} bool start(std::function<void()> func, int timer_delay) {ret_func = [func](void*){func();}; return start(timer_delay);}
EVENT_HANDLER0(bool, startOnce); EVENT_HANDLER0(bool, startOnce);
EVENT_HANDLER1(bool, startOnce, ThreadFunc, func) {ret_func = func; return startOnce();} EVENT_HANDLER1(bool, startOnce, ThreadFunc, func) {ret_func = func; return startOnce();}
EVENT_HANDLER0(void, stop) {stop(false);} EVENT_HANDLER0(void, stop);
EVENT_HANDLER1(void, stop, bool, wait);
EVENT_HANDLER0(void, terminate); EVENT_HANDLER0(void, terminate);
//! \~english Stop thread and wait for finish. //! \~english Stop thread and wait for finish.
//! \~russian Останавливает потоков и ожидает завершения. //! \~russian Останавливает поток и ожидает завершения.
void stopAndWait(int timeout_ms = -1); void stopAndWait(int timeout_ms = -1);
void interrupt(); void interrupt();
@@ -201,7 +200,7 @@ public:
//! \~english Start thread without internal loop //! \~english Start thread without internal loop
//! \~russian Запускает поток без внутреннего цикла //! \~russian Запускает поток без внутреннего цикла
//! \fn void stop(bool wait = false) //! \fn void stop()
//! \brief //! \brief
//! \~english Stop thread //! \~english Stop thread
//! \~russian Останавливает поток //! \~russian Останавливает поток
@@ -209,7 +208,7 @@ public:
//! \fn void terminate() //! \fn void terminate()
//! \brief //! \brief
//! \~english Strongly stop thread //! \~english Strongly stop thread
//! \~russian Жестко останавливает поток //! \~russian Жёстко прерывает поток
//! \fn bool waitForStart(int timeout_msecs = -1) //! \fn bool waitForStart(int timeout_msecs = -1)
//! \brief //! \brief

View File

@@ -114,7 +114,7 @@ PIThreadPoolLoop::PIThreadPoolLoop(int thread_cnt) {
PIThreadPoolLoop::~PIThreadPoolLoop() { PIThreadPoolLoop::~PIThreadPoolLoop() {
for (auto * t: threads) { for (auto * t: threads) {
t->stop(false); t->stop();
if (!t->waitForFinish(100)) if (!t->waitForFinish(100))
t->terminate(); t->terminate();
delete t; delete t;
@@ -135,7 +135,7 @@ void PIThreadPoolLoop::start(int index_start, int index_count) {
while (1) { while (1) {
int cc = counter.fetch_add(1); int cc = counter.fetch_add(1);
if (cc >= end) { if (cc >= end) {
t->stop(false); t->stop();
return; return;
} }
func(cc); func(cc);

View File

@@ -274,15 +274,12 @@ bool _PITimerImp_Thread::startTimer(double interval_ms) {
bool _PITimerImp_Thread::stopTimer(bool wait) { bool _PITimerImp_Thread::stopTimer(bool wait) {
#ifndef FREERTOS
thread_.stop(wait);
#else
thread_.stop(); thread_.stop();
if (wait) if (wait) return thread_.waitForFinish();
if (!thread_.waitForFinish(10)) // if (wait)
if (thread_.isRunning()) // if (!thread_.waitForFinish(10))
thread_.terminate(); // if (thread_.isRunning())
#endif // thread_.terminate();
return true; return true;
} }
@@ -739,3 +736,9 @@ bool PITimer::waitForFinish(int timeout_msecs) {
piMinSleep(); piMinSleep();
return tm.elapsed_m() < timeout_msecs; return tm.elapsed_m() < timeout_msecs;
} }
void PITimer::stopAndWait(int timeout_ms) {
init();
imp->stop(false);
waitForFinish(timeout_ms);
}

View File

@@ -164,6 +164,10 @@ public:
EVENT_HANDLER1(bool, stop, bool, wait); EVENT_HANDLER1(bool, stop, bool, wait);
bool waitForFinish() {return waitForFinish(-1);} bool waitForFinish() {return waitForFinish(-1);}
bool waitForFinish(int timeout_msecs); bool waitForFinish(int timeout_msecs);
//! \~english Stop timer and wait for finish.
//! \~russian Останавливает таймер и ожидает завершения.
void stopAndWait(int timeout_ms = -1);
//! \~english Set custom data //! \~english Set custom data
//! \~russian Установить данные, передаваемые в метод таймера //! \~russian Установить данные, передаваемые в метод таймера

View File

@@ -11,7 +11,7 @@ int main(int argc, char * argv[]) {
PICloudClient c("127.0.0.1:10101"); PICloudClient c("127.0.0.1:10101");
// c.setReopenEnabled(true); // c.setReopenEnabled(true);
PICloudServer s("127.0.0.1:10101"); PICloudServer s("127.0.0.1:10101");
PIVector<PICloudServer::Client *> clients; auto clients = new PIVector<PICloudServer::Client *>();
CONNECTL(&tm, tickEvent, ([&](void *, int){ CONNECTL(&tm, tickEvent, ([&](void *, int){
if (c.isConnected()) { if (c.isConnected()) {
PIString str = "ping"; PIString str = "ping";
@@ -19,7 +19,7 @@ int main(int argc, char * argv[]) {
c.write(str.toByteArray()); c.write(str.toByteArray());
} }
if (s.isThreadedRead()) { if (s.isThreadedRead()) {
for (auto cl : clients) { for (auto cl : *clients) {
if (cl->isOpened()) { if (cl->isOpened()) {
PIString str = "ping_S"; PIString str = "ping_S";
piCout << "[Server] send to" << cl << ":" << str; piCout << "[Server] send to" << cl << ":" << str;
@@ -40,7 +40,7 @@ int main(int argc, char * argv[]) {
CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); CONNECTL(&c, disconnected, ([](){piCout << "disconnected";}));
CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){
piCout << "[Server] new client:" << cl; piCout << "[Server] new client:" << cl;
clients << cl; clients->append(cl);
CONNECTL(cl, threadedReadEvent, ([cl, &rnd](const uchar * readed, ssize_t size){ CONNECTL(cl, threadedReadEvent, ([cl, &rnd](const uchar * readed, ssize_t size){
PIByteArray ba(readed, size); PIByteArray ba(readed, size);
PIString str = PIString(ba); PIString str = PIString(ba);
@@ -53,7 +53,7 @@ int main(int argc, char * argv[]) {
CONNECTL(cl, closed, ([&clients, cl](){ CONNECTL(cl, closed, ([&clients, cl](){
piCout << "[Server] client closed ..." << cl; piCout << "[Server] client closed ..." << cl;
cl->stop(); cl->stop();
clients.removeAll(cl); clients->removeAll(cl);
piCout << "[Server] client closed ok" << cl; piCout << "[Server] client closed ok" << cl;
})); }));
cl->startThreadedRead(); cl->startThreadedRead();

View File

@@ -21,7 +21,7 @@ DispatcherServer::~DispatcherServer() {
void DispatcherServer::start() { void DispatcherServer::start() {
eth.listen(true); eth.listen(true);
timeout_timer.start(5000); timeout_timer.start(2000);
piCoutObj << "server started" << eth.readAddress(); piCoutObj << "server started" << eth.readAddress();
} }

View File

@@ -24,7 +24,7 @@ Daemon::Remote::Remote(const PIString & n): PIThread() {
Daemon::Remote::~Remote() { Daemon::Remote::~Remote() {
shellClose(); shellClose();
ft.stop(); ft.stop();
stop(true); stopAndWait();
} }