From 6e5a5a6ade4cb645515196d294f7d195662b4193 Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 29 Oct 2021 16:52:03 +0300 Subject: [PATCH 1/3] remove msleep, clean PIConditionVariable, rewrite pipipelinethread, etc... --- libs/main/core/piobject.cpp | 6 +- libs/main/core/pitime.cpp | 10 -- libs/main/core/pitime.h | 2 - libs/main/io_devices/piethernet.cpp | 4 +- libs/main/io_devices/piiodevice.cpp | 4 +- libs/main/io_devices/piserial.cpp | 14 +- libs/main/system/piprocess.cpp | 2 +- libs/main/system/pisystemmonitor.cpp | 8 +- libs/main/thread/piconditionvar.cpp | 5 - libs/main/thread/pipipelinethread.h | 85 ++++----- libs/main/thread/pithread.cpp | 22 +-- libs/main/thread/pithread.h | 12 +- libs/main/thread/pitimer.cpp | 4 +- main.cpp | 161 +++++------------- main_picloud_test.cpp | 73 ++++++++ .../ConditionVariableIntegrationTest.cpp | 4 +- 16 files changed, 203 insertions(+), 213 deletions(-) create mode 100644 main_picloud_test.cpp diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 1f2bfc64..71749360 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -708,7 +708,7 @@ PIObject::Deleter::Deleter() { stopping = started = posted = false; CONNECTL(&(PRIVATE->thread), started, [this](){proc();}); PRIVATE->thread.startOnce(); - while (!started) piMSleep(1); + while (!started) piMSleep(PIP_MIN_MSLEEP); } @@ -717,7 +717,7 @@ PIObject::Deleter::~Deleter() { stopping = true; PRIVATE->cond_var.notifyAll(); #ifndef WINDOWS - while (PRIVATE->thread.isRunning()) piMSleep(1); + while (PRIVATE->thread.isRunning()) piMSleep(PIP_MIN_MSLEEP); #endif deleteAll(); //piCout << "~Deleter ok"; @@ -780,7 +780,7 @@ void PIObject::Deleter::deleteObject(PIObject * o) { //piCout << "[Deleter] delete" << (uintptr_t)o << "..."; if (o->isPIObject()) { //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; - while (o->isInEvent()) piMSleep(1); + while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; if (o->isPIObject()) delete o; } diff --git a/libs/main/core/pitime.cpp b/libs/main/core/pitime.cpp index 106befaa..f19e038f 100644 --- a/libs/main/core/pitime.cpp +++ b/libs/main/core/pitime.cpp @@ -469,13 +469,3 @@ PICout operator <<(PICout s, const PIDateTime & v) { return s; } - -#ifdef WINDOWS -void msleep(int msecs) {Sleep(msecs);} -#else -# ifdef FREERTOS -void msleep(int msecs) {vTaskDelay(msecs / portTICK_PERIOD_MS);} -# else -void msleep(int msecs) {usleep(msecs * 1000);} -# endif -#endif diff --git a/libs/main/core/pitime.h b/libs/main/core/pitime.h index fa5d977f..8416ab70 100644 --- a/libs/main/core/pitime.h +++ b/libs/main/core/pitime.h @@ -29,8 +29,6 @@ #ifdef QNX # include #endif -//! @brief Sleep for "msecs" milliseconds -PIP_EXPORT void msleep(int msecs); /*! @brief Precise sleep for "usecs" microseconds * \details This function consider \c "usleep" offset diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 7e3f657d..2277184b 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -686,7 +686,7 @@ int PIEthernet::readDevice(void * read_to, int max_size) { s = accept(sock, (sockaddr * )&client_addr, &slen); if (s == -1) { //piCoutObj << "Can`t accept new connection, " << ethErrorString(); - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return -1; } rs = ethRecv(s, read_to, max_size); @@ -788,7 +788,7 @@ int PIEthernet::writeDevice(const void * data, int max_size) { //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; if (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) != 0) { //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return -1; } //piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ..."; diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index a50042e2..62d14038 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -254,7 +254,7 @@ void PIIODevice::write_func() { int ret = write(item.first); threadedWriteEvent(item.second, ret); } - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); } } @@ -338,7 +338,7 @@ PIByteArray PIIODevice::readForTime(double timeout_ms) { tm.reset(); while (tm.elapsed_m() < timeout_ms) { ret = read(td, threaded_read_buffer_size); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str.append(td, ret); } delete[] td; diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index fb86d170..0c4c64fa 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -434,7 +434,7 @@ bool PISerial::read(void * data, int size, double timeout_ms) { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(&((uchar * )data)[all], size - all); if (ret > 0) all += ret; - else msleep(PIP_MIN_MSLEEP); + else piMSleep(PIP_MIN_MSLEEP); } setOption(BlockingRead, br); received(data, all); @@ -473,13 +473,13 @@ PIString PISerial::read(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str << PIString((char*)td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str << PIString((char*)td, ret); all += ret; @@ -493,7 +493,7 @@ PIString PISerial::read(int size, double timeout_ms) { str << PIString((char*)td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str << PIString((char*)td, ret); all += ret; @@ -525,13 +525,13 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str.append(td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str.append(td, ret); all += ret; @@ -545,7 +545,7 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { str.append(td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str.append(td, ret); all += ret; diff --git a/libs/main/system/piprocess.cpp b/libs/main/system/piprocess.cpp index 05e2d9cd..2f00d975 100644 --- a/libs/main/system/piprocess.cpp +++ b/libs/main/system/piprocess.cpp @@ -66,7 +66,7 @@ void PIProcess::exec_() { startOnce(); //cout << "exec wait" << endl; while (!is_exec) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); //cout << "exec end" << endl; } diff --git a/libs/main/system/pisystemmonitor.cpp b/libs/main/system/pisystemmonitor.cpp index 1e2e39e2..4e33b848 100644 --- a/libs/main/system/pisystemmonitor.cpp +++ b/libs/main/system/pisystemmonitor.cpp @@ -145,9 +145,9 @@ bool PISystemMonitor::startOnSelf(int interval_ms) { PIVector PISystemMonitor::threadsStatistic() const { - mutex_.lock(); + lock(); PIVector ret = cur_ts; - mutex_.unlock(); + unlock(); return ret; } @@ -341,9 +341,9 @@ void PISystemMonitor::run() { //piCout << ts_new.cpu_load_user; } last_tm = cur_tm; - mutex_.lock(); + lock(); cur_ts = cur_tm.values(); - mutex_.unlock(); + unlock(); tstat.ram_total = totalRAM(); tstat.ram_used = usedRAM(); tstat.ram_free = freeRAM(); diff --git a/libs/main/thread/piconditionvar.cpp b/libs/main/thread/piconditionvar.cpp index 7eb476d9..d679c777 100644 --- a/libs/main/thread/piconditionvar.cpp +++ b/libs/main/thread/piconditionvar.cpp @@ -36,7 +36,6 @@ PRIVATE_DEFINITION_START(PIConditionVariable) #else pthread_cond_t nativeHandle; #endif - bool isDestroying; PRIVATE_DEFINITION_END(PIConditionVariable) @@ -44,7 +43,6 @@ PIConditionVariable::PIConditionVariable() { #ifdef WINDOWS InitializeConditionVariable(&PRIVATE->nativeHandle); #else - PRIVATE->isDestroying = false; pthread_condattr_t condattr; pthread_condattr_init(&condattr); @@ -84,7 +82,6 @@ void PIConditionVariable::wait(PIMutex& lk, const std::function& conditi #else pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); #endif - if (PRIVATE->isDestroying) return; } } @@ -100,7 +97,6 @@ bool PIConditionVariable::waitFor(PIMutex &lk, int timeoutMs) { st.toTimespec(&expire_ts); isNotTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &expire_ts) == 0; #endif - if (PRIVATE->isDestroying) return false; return isNotTimeout; } @@ -127,7 +123,6 @@ bool PIConditionVariable::waitFor(PIMutex& lk, int timeoutMs, const std::functio bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &expire_ts) != 0; #endif if (isTimeout) return false; - if (PRIVATE->isDestroying) return false; } return true; } diff --git a/libs/main/thread/pipipelinethread.h b/libs/main/thread/pipipelinethread.h index add674bf..fd300cb1 100644 --- a/libs/main/thread/pipipelinethread.h +++ b/libs/main/thread/pipipelinethread.h @@ -25,6 +25,8 @@ #include "pithread.h" #include "piqueue.h" +#include "piconditionvar.h" + template class PIPipelineThread : public PIThread @@ -35,10 +37,10 @@ public: cnt = 0; max_size = 0; wait_next_pipe = false; - next_overload = false; } ~PIPipelineThread() { stop(); + cv.notifyAll(); if (!waitForFinish(1000)) { piCoutObj << "terminating self thread"; terminate(); @@ -46,21 +48,27 @@ public: } template void connectTo(PIPipelineThread * next) { - CONNECT2(void, Tout, bool *, this, calculated, next, enqueue) + CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue) } - EVENT2(calculated, const Tout &, v, bool *, overload) - void enqueue(const Tin &v) {enqueue(v, 0);} - EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) { + EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload) + EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) { mutex.lock(); //piCoutObj << "enque" << overload; + if (wait && max_size != 0) { + mutex_wait.lock(); + while (in.size() >= max_size) cv_wait.wait(mutex_wait); + mutex_wait.unlock(); + } if (max_size == 0 || in.size() < max_size) { in.enqueue(v); + cv.notifyAll(); if (overload) *overload = false; } else { if (overload) *overload = true; } mutex.unlock(); } + void enqueue(const Tin &v, bool wait = false) {enqueue(v, wait, nullptr);} const ullong * counterPtr() const {return &cnt;} ullong counter() const {return cnt;} bool isEmpty() { @@ -79,15 +87,18 @@ public: } void clear() { mutex.lock(); + mutex_wait.lock(); in.clear(); - next_overload = false; + cv_wait.notifyAll(); + mutex_wait.unlock(); mutex.unlock(); } void stopCalc(int wait_delay = 100) { if (isRunning()) { stop(); + cv.notifyAll(); if (!waitForFinish(wait_delay)) { - mutex_l.unlock(); + mutex_last.unlock(); mutex.unlock(); terminate(); } @@ -95,18 +106,14 @@ public: } Tout getLast() { Tout ret; - mutex_l.lock(); + mutex_last.lock(); ret = last; - mutex_l.unlock(); + mutex_last.unlock(); return ret; } uint maxQueSize() { - uint ret; - mutex.lock(); - ret = max_size; - mutex.unlock(); - return ret; + return max_size; } void setMaxQueSize(uint count) { @@ -127,39 +134,35 @@ protected: private: void begin() {cnt = 0;} void run() { - //piCoutObj << "run ..."; mutex.lock(); - if (in.isEmpty()) { - mutex.unlock(); - piMSleep(10); - //piCoutObj << "run in empty"; - return; - } - if (next_overload && wait_next_pipe) { - mutex.unlock(); - //piCoutObj << "wait" << &next_overload; - calculated(last, &next_overload); - piMSleep(10); - } else { - Tin t = in.dequeue(); - mutex.unlock(); - bool ok = true; - Tout r = calc(t, ok); - if (ok) { - mutex_l.lock(); - last = r; - mutex_l.unlock(); - cnt++; - //piCoutObj << "calc ok" << &next_overload; - calculated(r, &next_overload); + while (in.isEmpty()) { + cv.wait(mutex); + if (terminating) { + mutex.unlock(); + return; } } + mutex_wait.lock(); + Tin t = in.dequeue(); + mutex.unlock(); + cv_wait.notifyAll(); + mutex_wait.unlock(); + bool ok = true; + Tout r = calc(t, ok); + if (ok) { + mutex_last.lock(); + last = r; + mutex_last.unlock(); + cnt++; + //piCoutObj << "calc ok"; + calculated(r, wait_next_pipe); + } //piCoutObj << "run ok"; } - PIMutex mutex; - PIMutex mutex_l; + PIMutex mutex, mutex_wait; + PIConditionVariable cv, cv_wait; + PIMutex mutex_last; bool wait_next_pipe; - bool next_overload; ullong cnt; PIQueue in; Tout last; diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index d478b857..b28b2c34 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -63,7 +63,7 @@ event started(); while (isRunning()) { run(); ThreadFunc(); - msleep(timer_delay); + piMSleep(timer_delay); } event stopped(); end(); @@ -402,7 +402,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { if (!running_) return true; if (timeout_msecs < 0) { while (running_) { - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -414,7 +414,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { } tmf_.reset(); while (running_ && tmf_.elapsed_m() < timeout_msecs) { - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -430,12 +430,12 @@ bool PIThread::waitForStart(int timeout_msecs) { if (running_) return true; if (timeout_msecs < 0) { while (!running_) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return true; } tms_.reset(); while (!running_ && tms_.elapsed_m() < timeout_msecs) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return tms_.elapsed_m() < timeout_msecs; } @@ -456,9 +456,9 @@ void PIThread::_beginThread() { PIINTROSPECTION_THREAD_START(this); REGISTER_THREAD(this); running_ = true; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); begin(); - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); started(); } @@ -466,7 +466,7 @@ void PIThread::_beginThread() { void PIThread::_runThread() { PIINTROSPECTION_THREAD_RUN(this); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "..."; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "ok"; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "..."; #ifdef PIP_INTROSPECTION @@ -482,7 +482,7 @@ void PIThread::_runThread() { if (ret_func != 0) ret_func(data_); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "ret_func" << "ok"; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "..."; - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "ok"; } @@ -491,11 +491,11 @@ void PIThread::_endThread() { //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "..."; stopped(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok"; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "end" << "..."; end(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok"; - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); terminating = running_ = false; tid_ = -1; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "exit"; diff --git a/libs/main/thread/pithread.h b/libs/main/thread/pithread.h index 77cb3339..db6053cd 100644 --- a/libs/main/thread/pithread.h +++ b/libs/main/thread/pithread.h @@ -131,11 +131,15 @@ public: //! @brief Set necessity of lock every \a run with internal mutex void needLockRun(bool need) {lockRun = need;} - EVENT_HANDLER0(void, lock) {mutex_.lock();} - EVENT_HANDLER0(void, unlock) {mutex_.unlock();} + + //! @brief Lock internal mutex + EVENT_HANDLER0(void, lock) const {thread_mutex.lock();} + + //! @brief Unlock internal mutex + EVENT_HANDLER0(void, unlock) const {thread_mutex.unlock();} //! @brief Returns internal mutex - PIMutex & mutex() {return mutex_;} + PIMutex & mutex() const {return thread_mutex;} //! @brief Returns thread ID llong tid() const {return tid_;} @@ -239,7 +243,7 @@ protected: int delay_, policy_; llong tid_; void * data_; - mutable PIMutex mutex_; + mutable PIMutex thread_mutex; PITimeMeasurer tmf_, tms_, tmr_; PIThread::Priority priority_; ThreadFunc ret_func; diff --git a/libs/main/thread/pitimer.cpp b/libs/main/thread/pitimer.cpp index 98cf5580..c94b6c73 100644 --- a/libs/main/thread/pitimer.cpp +++ b/libs/main/thread/pitimer.cpp @@ -648,11 +648,11 @@ bool PITimer::stop(bool wait) { bool PITimer::waitForFinish(int timeout_msecs) { if (timeout_msecs < 0) { while (isRunning()) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return true; } PITimeMeasurer tm; while (isRunning() && tm.elapsed_m() < timeout_msecs) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return tm.elapsed_m() < timeout_msecs; } diff --git a/main.cpp b/main.cpp index 1779d4dd..e3414108 100644 --- a/main.cpp +++ b/main.cpp @@ -1,123 +1,50 @@ #include "pip.h" +class PIThreadNotifier { +public: + PIThreadNotifier(): cnt(0) {} + + void wait() { + m.lock(); + while (cnt == 0) v.wait(m); + cnt--; + m.unlock(); + } + void notifyOnce() { + m.lock(); + cnt++; + v.notifyAll(); + m.unlock(); + } +private: + ullong cnt; + PIMutex m; + PIConditionVariable v; +}; + int main(int argc, char * argv[]) { - - PIVector data(10, [](int i)->int{return i;}); - - piCout << data; - - PIThreadPoolLoop pool; - pool.setFunction([&](int i){ - data[i] = data[i] + 10; - }); - pool.exec(0, data.size()); - - piCout << data; - - return 0; - - PIVector x(20, [](int i) {return i;}); - piCout << x; - piCout << x.any([](int v) {return v == 10;}); - piCout << x.every([](int v) {return v > 0;}); - piCout << x.etries([](int v) {return v % 5 == 0;}); - piCout << x.indexWhere([](int v) {return v % 8 == 0;}); - piCout << x.indexOf(4, -1); - piCout << x.lastIndexOf(1, 0); - piCout << x.lastIndexWhere([](int v) {return v % 8 == 0;}); - PIVector x2 = x.map([](int v) {return v / 10.0;}); - piCout << x2; - piCout << x.reduce([](int v, PIString s){return s + PIString::fromNumber(v);}); - piCout << x.removeWhere([](int v){return v % 2 == 0;}); - piCout << x.getRange(8, 1); - piCout << x.getRange(8, 100); - - piCout << "====================="; - - PIDeque y(20, [](int i) {return i;}); - piCout << y; - piCout << y.any([](int v) {return v == 10;}); - piCout << y.every([](int v) {return v > 0;}); - piCout << y.etries([](int v) {return v % 5 == 0;}); - piCout << y.indexWhere([](int v) {return v % 8 == 0;}); - piCout << y.indexOf(4, -1); - piCout << y.lastIndexOf(1, 0); - piCout << y.lastIndexWhere([](int v) {return v % 8 == 0;}); - PIDeque y2 = y.map([](int v) {return v / 10.0;}); - piCout << y2; - piCout << y.reduce([](int v, PIString s){return s + PIString::fromNumber(v);}); - piCout << y.removeWhere([](int v){return v % 2 == 0;}); - piCout << y.getRange(8, 1); - piCout << y.getRange(8, 100); - return 0; // TODO: - - PIByteArray rnd; - rnd.resize(1024*1024, 'x'); - PICLI cli(argc, argv); - PITimer tm; - cli.addArgument("connect", true); - cli.addArgument("name", true); - PICloudClient c("127.0.0.1:10101"); -// c.setReopenEnabled(true); - PICloudServer s("127.0.0.1:10101"); - PIVector clients; - CONNECTL(&tm, tickEvent, ([&](void *, int){ - if (c.isConnected()) { - PIString str = "ping"; - piCout << "[Client] send:" << str; - c.write(str.toByteArray()); - } - if (s.isRunning()) { - for (auto cl : clients) { - if (cl->isOpened()) { - PIString str = "ping_S"; - piCout << "[Server] send to" << cl << ":" << str; - cl->write(str.toByteArray()); - } - } - } - })); - CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){ - PIByteArray ba(readed, size); - if (size < 1024) { - PIString str = PIString(ba); - piCout << "[Client] data:" << str; - if (str == "ping_S") c.write(PIString("pong_S").toByteArray()); - } else piCout << "[Client] blob:" << size; - })); - CONNECTL(&c, connected, ([](){piCout << "connected";})); - CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); - CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ - piCout << "[Server] new client:" << cl; - clients << cl; - CONNECTL(cl, threadedReadEvent, ([&c, &s, cl, &rnd](uchar * readed, int size){ - PIByteArray ba(readed, size); - PIString str = PIString(ba); - piCout << "[Server] data from" << cl << ":" << str; - if (str == "ping") { - cl->write(PIString("pong").toByteArray()); - cl->write(rnd); - } - })); - CONNECTL(cl, closed, ([&clients, cl](){ - cl->stop(); - clients.removeAll(cl); - cl->deleteLater(); - })); - cl->startThreadedRead(); - })); - if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); - if (cli.hasArgument("connect")) { - c.setServerName(cli.argumentValue("connect")); - c.startThreadedRead(); - } else { - s.startThreadedRead(); - } - tm.start(1000); - PIKbdListener ls; - ls.enableExitCapture(PIKbdListener::F10); - ls.start(); - WAIT_FOR_EXIT + PIThreadNotifier n; + int cnt1 = 0; + int cnt2 = 0; + int cnt3 = 0; + PIThread t1([&n, &cnt1](){n.wait(); cnt1++; piMSleep(1);}, true); + PIThread t2([&n, &cnt2](){n.wait(); cnt2++; piMSleep(2);}, true); + piCout << "created"; + piMSleep(10); + piCout << "unlock" << cnt1 << cnt2 << cnt3; + n.notifyOnce(); cnt3++; + piMSleep(10); + piCout << "unlock" << cnt1 << cnt2 << cnt3; + n.notifyOnce(); cnt3++; + piMSleep(10); + piCout << "run" << cnt1 << cnt2 << cnt3; + PIThread t3([&n, &cnt3](){n.notifyOnce(); cnt3++; piMSleep(1);}, true); + piMSleep(20); + t3.stop(); + piMSleep(100); + piCout << "exit" << cnt1 << cnt2 << cnt3; +// m.unlock(); +// piMSleep(10); return 0; } diff --git a/main_picloud_test.cpp b/main_picloud_test.cpp new file mode 100644 index 00000000..296577fc --- /dev/null +++ b/main_picloud_test.cpp @@ -0,0 +1,73 @@ +#include "pip.h" + + +int main(int argc, char * argv[]) { + PIByteArray rnd; + rnd.resize(1024*1024, 'x'); + PICLI cli(argc, argv); + PITimer tm; + cli.addArgument("connect", true); + cli.addArgument("name", true); + PICloudClient c("127.0.0.1:10101"); +// c.setReopenEnabled(true); + PICloudServer s("127.0.0.1:10101"); + PIVector clients; + CONNECTL(&tm, tickEvent, ([&](void *, int){ + if (c.isConnected()) { + PIString str = "ping"; + piCout << "[Client] send:" << str; + c.write(str.toByteArray()); + } + if (s.isRunning()) { + for (auto cl : clients) { + if (cl->isOpened()) { + PIString str = "ping_S"; + piCout << "[Server] send to" << cl << ":" << str; + cl->write(str.toByteArray()); + } + } + } + })); + CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){ + PIByteArray ba(readed, size); + if (size < 1024) { + PIString str = PIString(ba); + piCout << "[Client] data:" << str; + if (str == "ping_S") c.write(PIString("pong_S").toByteArray()); + } else piCout << "[Client] blob:" << size; + })); + CONNECTL(&c, connected, ([](){piCout << "connected";})); + CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); + CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ + piCout << "[Server] new client:" << cl; + clients << cl; + CONNECTL(cl, threadedReadEvent, ([&c, &s, cl, &rnd](uchar * readed, int size){ + PIByteArray ba(readed, size); + PIString str = PIString(ba); + piCout << "[Server] data from" << cl << ":" << str; + if (str == "ping") { + cl->write(PIString("pong").toByteArray()); + cl->write(rnd); + } + })); + CONNECTL(cl, closed, ([&clients, cl](){ + cl->stop(); + clients.removeAll(cl); + cl->deleteLater(); + })); + cl->startThreadedRead(); + })); + if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); + if (cli.hasArgument("connect")) { + c.setServerName(cli.argumentValue("connect")); + c.startThreadedRead(); + } else { + s.startThreadedRead(); + } + tm.start(1000); + PIKbdListener ls; + ls.enableExitCapture(PIKbdListener::F10); + ls.start(); + WAIT_FOR_EXIT + return 0; +} diff --git a/tests/concurrent/ConditionVariableIntegrationTest.cpp b/tests/concurrent/ConditionVariableIntegrationTest.cpp index ad74b749..81bd6700 100644 --- a/tests/concurrent/ConditionVariableIntegrationTest.cpp +++ b/tests/concurrent/ConditionVariableIntegrationTest.cpp @@ -88,7 +88,7 @@ TEST_F(ConditionVariable, wait_is_protected_unblock_when_notifyOne) { // Missing unlock }); variable->notifyOne(); - msleep(WAIT_THREAD_TIME_MS); + piMSleep(WAIT_THREAD_TIME_MS); ASSERT_FALSE(m.tryLock()); } @@ -130,7 +130,7 @@ TEST_F(ConditionVariable, wait_condition_is_check_condition_when_notifyOne) { isConditionChecked = false; m.unlock(); variable->notifyOne(); - msleep(threadStartTime + 1); + piMSleep(threadStartTime + 1); m.lock(); ASSERT_TRUE(isConditionChecked); m.unlock(); From 48c885e12a602552f58a859887551d1d4e342524 Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 29 Oct 2021 18:20:48 +0300 Subject: [PATCH 2/3] PIThreadNotifier, rewrite PIObject::deleteLater() tests for PIThreadNotifier and PIObject::deleteLater() --- libs/main/core/piobject.cpp | 67 +++++-------------- libs/main/core/piobject.h | 3 - libs/main/thread/pithreadmodule.h | 1 + libs/main/thread/pithreadnotifier.cpp | 39 +++++++++++ libs/main/thread/pithreadnotifier.h | 52 ++++++++++++++ main.cpp | 22 ------ .../ConditionVariableIntegrationTest.cpp | 2 +- tests/concurrent/pithreadnotifier_test.cpp | 34 ++++++++++ tests/piobject/delete_later.cpp | 51 ++++++++++++++ 9 files changed, 196 insertions(+), 75 deletions(-) create mode 100644 libs/main/thread/pithreadnotifier.cpp create mode 100644 libs/main/thread/pithreadnotifier.h create mode 100644 tests/concurrent/pithreadnotifier_test.cpp create mode 100644 tests/piobject/delete_later.cpp diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 71749360..5a2c0eb6 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -698,28 +698,30 @@ bool PIObject::Connection::disconnect() { PRIVATE_DEFINITION_START(PIObject::Deleter) PIThread thread; PIConditionVariable cond_var; - PIMutex cond_mutex, queue_mutex; PIVector obj_queue; PRIVATE_DEFINITION_END(PIObject::Deleter) PIObject::Deleter::Deleter() { //piCout << "Deleter start ..."; - stopping = started = posted = false; - CONNECTL(&(PRIVATE->thread), started, [this](){proc();}); - PRIVATE->thread.startOnce(); - while (!started) piMSleep(PIP_MIN_MSLEEP); + PRIVATE->thread.setSlot([this](){ + PIVector oq; + PRIVATE->thread.lock(); + while(PRIVATE->obj_queue.isEmpty()) PRIVATE->cond_var.wait(PRIVATE->thread.mutex()); + oq.swap(PRIVATE->obj_queue); + PRIVATE->thread.unlock(); + for (PIObject * o : oq) deleteObject(o); + }); + PRIVATE->thread.start(); } PIObject::Deleter::~Deleter() { //piCout << "~Deleter ..."; - stopping = true; + PRIVATE->thread.stop(); PRIVATE->cond_var.notifyAll(); -#ifndef WINDOWS - while (PRIVATE->thread.isRunning()) piMSleep(PIP_MIN_MSLEEP); -#endif - deleteAll(); + PRIVATE->thread.waitForFinish(); + for (PIObject * o : PRIVATE->obj_queue) deleteObject(o); //piCout << "~Deleter ok"; } @@ -733,46 +735,13 @@ PIObject::Deleter * PIObject::Deleter::instance() { void PIObject::Deleter::post(PIObject * o) { if (!o->isPIObject()) return; //piCout << "[Deleter] post" << o << "..."; - PRIVATE->queue_mutex.lock(); - if (!PRIVATE->obj_queue.contains(o)) + PRIVATE->thread.lock(); + if (!PRIVATE->obj_queue.contains(o)) { PRIVATE->obj_queue << o; - PRIVATE->queue_mutex.unlock(); - PRIVATE->cond_var.notifyAll(); - posted = true; - //piCout << "[Deleter] post" << o << "done"; -} - - -void PIObject::Deleter::proc() { - //piCout << "[Deleter] proc start"; - while (!stopping) { - //piMSleep(1); - //piCout << "[Deleter] proc wait ..."; - if (posted) { - posted = false; - started = true; - } else { - PRIVATE->cond_mutex.lock(); - started = true; - PRIVATE->cond_var.wait(PRIVATE->cond_mutex); - PRIVATE->cond_mutex.unlock(); - } - //piCout << "[Deleter] proc wait done"; - deleteAll(); + PRIVATE->cond_var.notifyAll(); } - //piCout << "[Deleter] proc end ok"; -} - - -void PIObject::Deleter::deleteAll() { - PIVector oq; - PRIVATE->queue_mutex.lock(); - oq = PRIVATE->obj_queue; - //piCout << "[Deleter] deleteAll" << oq.size_s() << "..."; - PRIVATE->obj_queue.clear(); - PRIVATE->queue_mutex.unlock(); - piForeach (PIObject * o, oq) - deleteObject(o); + PRIVATE->thread.unlock(); + //piCout << "[Deleter] post" << o << "done"; } @@ -782,7 +751,7 @@ void PIObject::Deleter::deleteObject(PIObject * o) { //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; - if (o->isPIObject()) delete o; + delete o; } //piCout << "[Deleter] delete" << (uintptr_t)o << "done"; } diff --git a/libs/main/core/piobject.h b/libs/main/core/piobject.h index 72f54c48..5d11c0ed 100644 --- a/libs/main/core/piobject.h +++ b/libs/main/core/piobject.h @@ -503,10 +503,7 @@ private: static Deleter * instance(); void post(PIObject * o); private: - void proc(); - void deleteAll(); void deleteObject(PIObject * o); - std::atomic_bool stopping, started, posted; PRIVATE_DECLARATION(PIP_EXPORT) }; diff --git a/libs/main/thread/pithreadmodule.h b/libs/main/thread/pithreadmodule.h index 18706e05..e528c355 100644 --- a/libs/main/thread/pithreadmodule.h +++ b/libs/main/thread/pithreadmodule.h @@ -29,5 +29,6 @@ #include "piconditionvar.h" #include "pithreadpoolexecutor.h" #include "pithreadpoolloop.h" +#include "pithreadnotifier.h" #endif // PITHREADMODULE_H diff --git a/libs/main/thread/pithreadnotifier.cpp b/libs/main/thread/pithreadnotifier.cpp new file mode 100644 index 00000000..6b9ac3b6 --- /dev/null +++ b/libs/main/thread/pithreadnotifier.cpp @@ -0,0 +1,39 @@ +/* + PIP - Platform Independent Primitives + Class for simply notify and wait in different threads + Andrey Bychkov work.a.b@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "pithreadnotifier.h" + + +PIThreadNotifier::PIThreadNotifier() : cnt(0) {} + + +void PIThreadNotifier::wait() { + m.lock(); + while (cnt == 0) v.wait(m); + cnt--; + m.unlock(); +} + + +void PIThreadNotifier::notifyOnce() { + m.lock(); + cnt++; + v.notifyAll(); + m.unlock(); +} diff --git a/libs/main/thread/pithreadnotifier.h b/libs/main/thread/pithreadnotifier.h new file mode 100644 index 00000000..5755fc7c --- /dev/null +++ b/libs/main/thread/pithreadnotifier.h @@ -0,0 +1,52 @@ +/*! @file pithreadnotifier.h + * @brief Class for simply notify and wait in different threads +*/ +/* + PIP - Platform Independent Primitives + Class for simply notify and wait in different threads + Andrey Bychkov work.a.b@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PITHREADNOTIFIER_H +#define PITHREADNOTIFIER_H + +#include "piconditionvar.h" + + +class PIP_EXPORT PIThreadNotifier { +public: + PIThreadNotifier(); + + //! Start waiting, return if other thread call \a notifyOnce(). + //! If \a notifyOnce() has been called before, then returns immediately. + //! If notifyOnce() has been called "n" times, then returns immediately "n" times, + //! but only if wait in one thread. + //! If many threads waiting, then If notifyOnce() has been called "n" times, + //! All threads total returns "n" times in random sequence. + void wait(); + + //! Notify one waiting thread, wich waiting on \a wait() function. + //! If many threads waiting, then notify randomly one. + //! If call this "n" times, then notify any waiting threads totally "n" times. + void notifyOnce(); + +private: + ullong cnt; + PIMutex m; + PIConditionVariable v; +}; + +#endif // PITHREADNOTIFIER_H diff --git a/main.cpp b/main.cpp index e3414108..6c91d54f 100644 --- a/main.cpp +++ b/main.cpp @@ -1,27 +1,5 @@ #include "pip.h" -class PIThreadNotifier { -public: - PIThreadNotifier(): cnt(0) {} - - void wait() { - m.lock(); - while (cnt == 0) v.wait(m); - cnt--; - m.unlock(); - } - void notifyOnce() { - m.lock(); - cnt++; - v.notifyAll(); - m.unlock(); - } -private: - ullong cnt; - PIMutex m; - PIConditionVariable v; -}; - int main(int argc, char * argv[]) { PIThreadNotifier n; diff --git a/tests/concurrent/ConditionVariableIntegrationTest.cpp b/tests/concurrent/ConditionVariableIntegrationTest.cpp index 81bd6700..b81accf1 100644 --- a/tests/concurrent/ConditionVariableIntegrationTest.cpp +++ b/tests/concurrent/ConditionVariableIntegrationTest.cpp @@ -198,6 +198,6 @@ TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) { condition = true; m.unlock(); variable->notifyOne(); - msleep(WAIT_THREAD_TIME_MS); + piMSleep(WAIT_THREAD_TIME_MS); ASSERT_FALSE(thread->isRunning()); } diff --git a/tests/concurrent/pithreadnotifier_test.cpp b/tests/concurrent/pithreadnotifier_test.cpp new file mode 100644 index 00000000..0dd01a8a --- /dev/null +++ b/tests/concurrent/pithreadnotifier_test.cpp @@ -0,0 +1,34 @@ +#include "gtest/gtest.h" +#include "pithreadnotifier.h" + + +TEST(PIThreadNotifierTest, One) { + PIThreadNotifier n; + int cnt = 0; + PIThread t1([&n, &cnt](){n.wait(); cnt++;}, true); + piMSleep(10); + n.notifyOnce(); + piMSleep(10); + ASSERT_EQ(cnt, 1); + n.notifyOnce(); + piMSleep(10); + ASSERT_EQ(cnt, 2); +} + + +TEST(PIThreadNotifierTest, Two) { + PIThreadNotifier n; + int cnt1 = 0; + int cnt2 = 0; + int cnt3 = 0; + PIThread t1([&n, &cnt1](){n.wait(); cnt1++; piMSleep(2);}, true); + PIThread t2([&n, &cnt2](){n.wait(); cnt2++; piMSleep(2);}, true); + PIThread t3([&n, &cnt3](){n.notifyOnce(); cnt3++; piMSleep(1);}, true); + piMSleep(20); + t3.stop(true); + piMSleep(100); + t1.stop(); + t2.stop(); + ASSERT_EQ(cnt1+cnt2, cnt3); +} + diff --git a/tests/piobject/delete_later.cpp b/tests/piobject/delete_later.cpp new file mode 100644 index 00000000..78d72c4e --- /dev/null +++ b/tests/piobject/delete_later.cpp @@ -0,0 +1,51 @@ +#include "gtest/gtest.h" +#include "piobject.h" + +std::atomic obj_cnt; + +class Send: public PIObject { + PIOBJECT(Send) +public: + Send() {obj_cnt++;} + ~Send() {obj_cnt--;} + EVENT1(ev, PIObject * , o) +}; + + +class Recv: public PIObject { + PIOBJECT(Recv) +public: + Recv() {obj_cnt++;} + ~Recv() {obj_cnt--;} + EVENT_HANDLER1(void, eh, PIObject * , o) { + o->deleteLater(); + piMSleep(10); + } +}; + + +TEST(Piobject, deleteLater) { + obj_cnt = 0; + Send * s = new Send(); + Recv * r = new Recv(); + CONNECTU(s, ev, r, eh); + s->ev(r); + r->deleteLater(); + s->deleteLater(); + piMSleep(100); + ASSERT_EQ(obj_cnt, 0); + + PIVector s2; + s2.resize(100, new Send()); + for (auto o : s2) o->deleteLater(); + piMSleep(10); + ASSERT_EQ(obj_cnt, 0); + s2.clear(); + + PIVector r2; + r2.resize(100, [](size_t i){return new Recv();}); + for (auto o : r2) o->deleteLater(); + piMSleep(10); + ASSERT_EQ(obj_cnt, 0); + r2.clear(); +} From a2a205cfd210830d43f603583f281ac6c8a93e03 Mon Sep 17 00:00:00 2001 From: peri4 Date: Tue, 16 Nov 2021 14:43:57 +0300 Subject: [PATCH 3/3] version 2.33.0 piMinSleep() method --- CMakeLists.txt | 2 +- libs/main/core/pibase.h | 4 ++-- libs/main/core/piobject.cpp | 2 +- libs/main/core/pitime.h | 2 ++ libs/main/io_devices/piethernet.cpp | 4 ++-- libs/main/io_devices/piiodevice.cpp | 4 ++-- libs/main/io_devices/piserial.cpp | 14 +++++++------- libs/main/io_utils/pibasetransfer.cpp | 4 ++-- libs/main/system/piprocess.cpp | 4 ++-- libs/main/thread/pigrabberbase.h | 2 +- libs/main/thread/pithread.cpp | 8 ++++---- libs/main/thread/pitimer.cpp | 4 ++-- 12 files changed, 28 insertions(+), 26 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bff4290e..eeb11129 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0) cmake_policy(SET CMP0017 NEW) # need include() with .cmake project(pip) set(pip_MAJOR 2) -set(pip_MINOR 32) +set(pip_MINOR 33) set(pip_REVISION 0) set(pip_SUFFIX ) set(pip_COMPANY SHS) diff --git a/libs/main/core/pibase.h b/libs/main/core/pibase.h index d89d525e..8c978f93 100644 --- a/libs/main/core/pibase.h +++ b/libs/main/core/pibase.h @@ -285,10 +285,10 @@ #define FOREVER for (;;) //! Macro used for infinite wait -#define FOREVER_WAIT FOREVER msleep(PIP_MIN_MSLEEP); +#define FOREVER_WAIT FOREVER piMinSleep; //! Macro used for infinite wait -#define WAIT_FOREVER FOREVER msleep(PIP_MIN_MSLEEP); +#define WAIT_FOREVER FOREVER piMinSleep; //! global variable enabling output to piCout, default is true diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 5a2c0eb6..7a7ae457 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -749,7 +749,7 @@ void PIObject::Deleter::deleteObject(PIObject * o) { //piCout << "[Deleter] delete" << (uintptr_t)o << "..."; if (o->isPIObject()) { //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; - while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); + while (o->isInEvent()) piMinSleep(); //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; delete o; } diff --git a/libs/main/core/pitime.h b/libs/main/core/pitime.h index 8416ab70..316811ed 100644 --- a/libs/main/core/pitime.h +++ b/libs/main/core/pitime.h @@ -46,6 +46,8 @@ inline void piMSleep(double msecs) {piUSleep(int(msecs * 1000.));} // on !Window * \details This function exec \a piUSleep (msecs * 1000000). */ inline void piSleep(double secs) {piUSleep(int(secs * 1000000.));} // on !Windows consider constant "usleep" offset +//! Shortest available on current system sleep +inline void piMinSleep() {piMSleep(PIP_MIN_MSLEEP);} diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 2277184b..7f2e797e 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -686,7 +686,7 @@ int PIEthernet::readDevice(void * read_to, int max_size) { s = accept(sock, (sockaddr * )&client_addr, &slen); if (s == -1) { //piCoutObj << "Can`t accept new connection, " << ethErrorString(); - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return -1; } rs = ethRecv(s, read_to, max_size); @@ -788,7 +788,7 @@ int PIEthernet::writeDevice(const void * data, int max_size) { //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; if (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) != 0) { //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return -1; } //piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ..."; diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index 62d14038..7e1c1863 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -254,7 +254,7 @@ void PIIODevice::write_func() { int ret = write(item.first); threadedWriteEvent(item.second, ret); } - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); } } @@ -338,7 +338,7 @@ PIByteArray PIIODevice::readForTime(double timeout_ms) { tm.reset(); while (tm.elapsed_m() < timeout_ms) { ret = read(td, threaded_read_buffer_size); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else str.append(td, ret); } delete[] td; diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index 0c4c64fa..33484c1b 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -434,7 +434,7 @@ bool PISerial::read(void * data, int size, double timeout_ms) { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(&((uchar * )data)[all], size - all); if (ret > 0) all += ret; - else piMSleep(PIP_MIN_MSLEEP); + else piMinSleep(); } setOption(BlockingRead, br); received(data, all); @@ -473,13 +473,13 @@ PIString PISerial::read(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else str << PIString((char*)td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else { str << PIString((char*)td, ret); all += ret; @@ -493,7 +493,7 @@ PIString PISerial::read(int size, double timeout_ms) { str << PIString((char*)td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else { str << PIString((char*)td, ret); all += ret; @@ -525,13 +525,13 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else str.append(td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else { str.append(td, ret); all += ret; @@ -545,7 +545,7 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { str.append(td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMinSleep(); else { str.append(td, ret); all += ret; diff --git a/libs/main/io_utils/pibasetransfer.cpp b/libs/main/io_utils/pibasetransfer.cpp index 1b2f9bf2..7f18c752 100644 --- a/libs/main/io_utils/pibasetransfer.cpp +++ b/libs/main/io_utils/pibasetransfer.cpp @@ -346,7 +346,7 @@ bool PIBaseTransfer::send_process() { if (chk == 0) return finish_send(true); if (chk != prev_chk) rtm.reset(); else if (rtm.elapsed_m() < 100) { - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); continue; } if (is_pause) { @@ -507,7 +507,7 @@ bool PIBaseTransfer::getStartRequest() { return true; } mutex_session.unlock(); - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); } return false; } diff --git a/libs/main/system/piprocess.cpp b/libs/main/system/piprocess.cpp index 2f00d975..7158ddcd 100644 --- a/libs/main/system/piprocess.cpp +++ b/libs/main/system/piprocess.cpp @@ -66,7 +66,7 @@ void PIProcess::exec_() { startOnce(); //cout << "exec wait" << endl; while (!is_exec) - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); //cout << "exec end" << endl; } @@ -172,7 +172,7 @@ void PIProcess::startProc(bool detached) { if (execve(str.data(), a, e) < 0) piCoutObj << "\"execve" << str << args << "\" error :" << errorString(); } else { - msleep(PIP_MIN_MSLEEP); + piMinSleep; //cout << "wait" << endl; if (!detached) { wait(&exit_code); diff --git a/libs/main/thread/pigrabberbase.h b/libs/main/thread/pigrabberbase.h index a45d2197..6bb2843b 100644 --- a/libs/main/thread/pigrabberbase.h +++ b/libs/main/thread/pigrabberbase.h @@ -162,7 +162,7 @@ private: return; } if (ret > 0) { - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return; } diag_.received(1); diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index b28b2c34..863d6a63 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -402,7 +402,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { if (!running_) return true; if (timeout_msecs < 0) { while (running_) { - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -414,7 +414,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { } tmf_.reset(); while (running_ && tmf_.elapsed_m() < timeout_msecs) { - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -430,12 +430,12 @@ bool PIThread::waitForStart(int timeout_msecs) { if (running_) return true; if (timeout_msecs < 0) { while (!running_) - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return true; } tms_.reset(); while (!running_ && tms_.elapsed_m() < timeout_msecs) - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return tms_.elapsed_m() < timeout_msecs; } diff --git a/libs/main/thread/pitimer.cpp b/libs/main/thread/pitimer.cpp index c94b6c73..83e94885 100644 --- a/libs/main/thread/pitimer.cpp +++ b/libs/main/thread/pitimer.cpp @@ -648,11 +648,11 @@ bool PITimer::stop(bool wait) { bool PITimer::waitForFinish(int timeout_msecs) { if (timeout_msecs < 0) { while (isRunning()) - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return true; } PITimeMeasurer tm; while (isRunning() && tm.elapsed_m() < timeout_msecs) - piMSleep(PIP_MIN_MSLEEP); + piMinSleep(); return tm.elapsed_m() < timeout_msecs; }