diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 1f2bfc64..71749360 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -708,7 +708,7 @@ PIObject::Deleter::Deleter() { stopping = started = posted = false; CONNECTL(&(PRIVATE->thread), started, [this](){proc();}); PRIVATE->thread.startOnce(); - while (!started) piMSleep(1); + while (!started) piMSleep(PIP_MIN_MSLEEP); } @@ -717,7 +717,7 @@ PIObject::Deleter::~Deleter() { stopping = true; PRIVATE->cond_var.notifyAll(); #ifndef WINDOWS - while (PRIVATE->thread.isRunning()) piMSleep(1); + while (PRIVATE->thread.isRunning()) piMSleep(PIP_MIN_MSLEEP); #endif deleteAll(); //piCout << "~Deleter ok"; @@ -780,7 +780,7 @@ void PIObject::Deleter::deleteObject(PIObject * o) { //piCout << "[Deleter] delete" << (uintptr_t)o << "..."; if (o->isPIObject()) { //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; - while (o->isInEvent()) piMSleep(1); + while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; if (o->isPIObject()) delete o; } diff --git a/libs/main/core/pitime.cpp b/libs/main/core/pitime.cpp index 106befaa..f19e038f 100644 --- a/libs/main/core/pitime.cpp +++ b/libs/main/core/pitime.cpp @@ -469,13 +469,3 @@ PICout operator <<(PICout s, const PIDateTime & v) { return s; } - -#ifdef WINDOWS -void msleep(int msecs) {Sleep(msecs);} -#else -# ifdef FREERTOS -void msleep(int msecs) {vTaskDelay(msecs / portTICK_PERIOD_MS);} -# else -void msleep(int msecs) {usleep(msecs * 1000);} -# endif -#endif diff --git a/libs/main/core/pitime.h b/libs/main/core/pitime.h index fa5d977f..8416ab70 100644 --- a/libs/main/core/pitime.h +++ b/libs/main/core/pitime.h @@ -29,8 +29,6 @@ #ifdef QNX # include #endif -//! @brief Sleep for "msecs" milliseconds -PIP_EXPORT void msleep(int msecs); /*! @brief Precise sleep for "usecs" microseconds * \details This function consider \c "usleep" offset diff --git a/libs/main/io_devices/piethernet.cpp b/libs/main/io_devices/piethernet.cpp index 7e3f657d..2277184b 100644 --- a/libs/main/io_devices/piethernet.cpp +++ b/libs/main/io_devices/piethernet.cpp @@ -686,7 +686,7 @@ int PIEthernet::readDevice(void * read_to, int max_size) { s = accept(sock, (sockaddr * )&client_addr, &slen); if (s == -1) { //piCoutObj << "Can`t accept new connection, " << ethErrorString(); - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return -1; } rs = ethRecv(s, read_to, max_size); @@ -788,7 +788,7 @@ int PIEthernet::writeDevice(const void * data, int max_size) { //piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "..."; if (::connect(sock, (sockaddr * )&PRIVATE->addr_, sizeof(PRIVATE->addr_)) != 0) { //piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString(); - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return -1; } //piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ..."; diff --git a/libs/main/io_devices/piiodevice.cpp b/libs/main/io_devices/piiodevice.cpp index a50042e2..62d14038 100644 --- a/libs/main/io_devices/piiodevice.cpp +++ b/libs/main/io_devices/piiodevice.cpp @@ -254,7 +254,7 @@ void PIIODevice::write_func() { int ret = write(item.first); threadedWriteEvent(item.second, ret); } - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); } } @@ -338,7 +338,7 @@ PIByteArray PIIODevice::readForTime(double timeout_ms) { tm.reset(); while (tm.elapsed_m() < timeout_ms) { ret = read(td, threaded_read_buffer_size); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str.append(td, ret); } delete[] td; diff --git a/libs/main/io_devices/piserial.cpp b/libs/main/io_devices/piserial.cpp index fb86d170..0c4c64fa 100644 --- a/libs/main/io_devices/piserial.cpp +++ b/libs/main/io_devices/piserial.cpp @@ -434,7 +434,7 @@ bool PISerial::read(void * data, int size, double timeout_ms) { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(&((uchar * )data)[all], size - all); if (ret > 0) all += ret; - else msleep(PIP_MIN_MSLEEP); + else piMSleep(PIP_MIN_MSLEEP); } setOption(BlockingRead, br); received(data, all); @@ -473,13 +473,13 @@ PIString PISerial::read(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str << PIString((char*)td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str << PIString((char*)td, ret); all += ret; @@ -493,7 +493,7 @@ PIString PISerial::read(int size, double timeout_ms) { str << PIString((char*)td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str << PIString((char*)td, ret); all += ret; @@ -525,13 +525,13 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { if (size <= 0) { while (tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, 1024); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else str.append(td, ret); } } else { while (all < size && tm_.elapsed_m() < timeout_ms) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str.append(td, ret); all += ret; @@ -545,7 +545,7 @@ PIByteArray PISerial::readData(int size, double timeout_ms) { str.append(td, all); while (all < size) { ret = readDevice(td, size - all); - if (ret <= 0) msleep(PIP_MIN_MSLEEP); + if (ret <= 0) piMSleep(PIP_MIN_MSLEEP); else { str.append(td, ret); all += ret; diff --git a/libs/main/system/piprocess.cpp b/libs/main/system/piprocess.cpp index 05e2d9cd..2f00d975 100644 --- a/libs/main/system/piprocess.cpp +++ b/libs/main/system/piprocess.cpp @@ -66,7 +66,7 @@ void PIProcess::exec_() { startOnce(); //cout << "exec wait" << endl; while (!is_exec) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); //cout << "exec end" << endl; } diff --git a/libs/main/system/pisystemmonitor.cpp b/libs/main/system/pisystemmonitor.cpp index 1e2e39e2..4e33b848 100644 --- a/libs/main/system/pisystemmonitor.cpp +++ b/libs/main/system/pisystemmonitor.cpp @@ -145,9 +145,9 @@ bool PISystemMonitor::startOnSelf(int interval_ms) { PIVector PISystemMonitor::threadsStatistic() const { - mutex_.lock(); + lock(); PIVector ret = cur_ts; - mutex_.unlock(); + unlock(); return ret; } @@ -341,9 +341,9 @@ void PISystemMonitor::run() { //piCout << ts_new.cpu_load_user; } last_tm = cur_tm; - mutex_.lock(); + lock(); cur_ts = cur_tm.values(); - mutex_.unlock(); + unlock(); tstat.ram_total = totalRAM(); tstat.ram_used = usedRAM(); tstat.ram_free = freeRAM(); diff --git a/libs/main/thread/piconditionvar.cpp b/libs/main/thread/piconditionvar.cpp index 7eb476d9..d679c777 100644 --- a/libs/main/thread/piconditionvar.cpp +++ b/libs/main/thread/piconditionvar.cpp @@ -36,7 +36,6 @@ PRIVATE_DEFINITION_START(PIConditionVariable) #else pthread_cond_t nativeHandle; #endif - bool isDestroying; PRIVATE_DEFINITION_END(PIConditionVariable) @@ -44,7 +43,6 @@ PIConditionVariable::PIConditionVariable() { #ifdef WINDOWS InitializeConditionVariable(&PRIVATE->nativeHandle); #else - PRIVATE->isDestroying = false; pthread_condattr_t condattr; pthread_condattr_init(&condattr); @@ -84,7 +82,6 @@ void PIConditionVariable::wait(PIMutex& lk, const std::function& conditi #else pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); #endif - if (PRIVATE->isDestroying) return; } } @@ -100,7 +97,6 @@ bool PIConditionVariable::waitFor(PIMutex &lk, int timeoutMs) { st.toTimespec(&expire_ts); isNotTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &expire_ts) == 0; #endif - if (PRIVATE->isDestroying) return false; return isNotTimeout; } @@ -127,7 +123,6 @@ bool PIConditionVariable::waitFor(PIMutex& lk, int timeoutMs, const std::functio bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &expire_ts) != 0; #endif if (isTimeout) return false; - if (PRIVATE->isDestroying) return false; } return true; } diff --git a/libs/main/thread/pipipelinethread.h b/libs/main/thread/pipipelinethread.h index add674bf..fd300cb1 100644 --- a/libs/main/thread/pipipelinethread.h +++ b/libs/main/thread/pipipelinethread.h @@ -25,6 +25,8 @@ #include "pithread.h" #include "piqueue.h" +#include "piconditionvar.h" + template class PIPipelineThread : public PIThread @@ -35,10 +37,10 @@ public: cnt = 0; max_size = 0; wait_next_pipe = false; - next_overload = false; } ~PIPipelineThread() { stop(); + cv.notifyAll(); if (!waitForFinish(1000)) { piCoutObj << "terminating self thread"; terminate(); @@ -46,21 +48,27 @@ public: } template void connectTo(PIPipelineThread * next) { - CONNECT2(void, Tout, bool *, this, calculated, next, enqueue) + CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue) } - EVENT2(calculated, const Tout &, v, bool *, overload) - void enqueue(const Tin &v) {enqueue(v, 0);} - EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) { + EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload) + EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) { mutex.lock(); //piCoutObj << "enque" << overload; + if (wait && max_size != 0) { + mutex_wait.lock(); + while (in.size() >= max_size) cv_wait.wait(mutex_wait); + mutex_wait.unlock(); + } if (max_size == 0 || in.size() < max_size) { in.enqueue(v); + cv.notifyAll(); if (overload) *overload = false; } else { if (overload) *overload = true; } mutex.unlock(); } + void enqueue(const Tin &v, bool wait = false) {enqueue(v, wait, nullptr);} const ullong * counterPtr() const {return &cnt;} ullong counter() const {return cnt;} bool isEmpty() { @@ -79,15 +87,18 @@ public: } void clear() { mutex.lock(); + mutex_wait.lock(); in.clear(); - next_overload = false; + cv_wait.notifyAll(); + mutex_wait.unlock(); mutex.unlock(); } void stopCalc(int wait_delay = 100) { if (isRunning()) { stop(); + cv.notifyAll(); if (!waitForFinish(wait_delay)) { - mutex_l.unlock(); + mutex_last.unlock(); mutex.unlock(); terminate(); } @@ -95,18 +106,14 @@ public: } Tout getLast() { Tout ret; - mutex_l.lock(); + mutex_last.lock(); ret = last; - mutex_l.unlock(); + mutex_last.unlock(); return ret; } uint maxQueSize() { - uint ret; - mutex.lock(); - ret = max_size; - mutex.unlock(); - return ret; + return max_size; } void setMaxQueSize(uint count) { @@ -127,39 +134,35 @@ protected: private: void begin() {cnt = 0;} void run() { - //piCoutObj << "run ..."; mutex.lock(); - if (in.isEmpty()) { - mutex.unlock(); - piMSleep(10); - //piCoutObj << "run in empty"; - return; - } - if (next_overload && wait_next_pipe) { - mutex.unlock(); - //piCoutObj << "wait" << &next_overload; - calculated(last, &next_overload); - piMSleep(10); - } else { - Tin t = in.dequeue(); - mutex.unlock(); - bool ok = true; - Tout r = calc(t, ok); - if (ok) { - mutex_l.lock(); - last = r; - mutex_l.unlock(); - cnt++; - //piCoutObj << "calc ok" << &next_overload; - calculated(r, &next_overload); + while (in.isEmpty()) { + cv.wait(mutex); + if (terminating) { + mutex.unlock(); + return; } } + mutex_wait.lock(); + Tin t = in.dequeue(); + mutex.unlock(); + cv_wait.notifyAll(); + mutex_wait.unlock(); + bool ok = true; + Tout r = calc(t, ok); + if (ok) { + mutex_last.lock(); + last = r; + mutex_last.unlock(); + cnt++; + //piCoutObj << "calc ok"; + calculated(r, wait_next_pipe); + } //piCoutObj << "run ok"; } - PIMutex mutex; - PIMutex mutex_l; + PIMutex mutex, mutex_wait; + PIConditionVariable cv, cv_wait; + PIMutex mutex_last; bool wait_next_pipe; - bool next_overload; ullong cnt; PIQueue in; Tout last; diff --git a/libs/main/thread/pithread.cpp b/libs/main/thread/pithread.cpp index d478b857..b28b2c34 100644 --- a/libs/main/thread/pithread.cpp +++ b/libs/main/thread/pithread.cpp @@ -63,7 +63,7 @@ event started(); while (isRunning()) { run(); ThreadFunc(); - msleep(timer_delay); + piMSleep(timer_delay); } event stopped(); end(); @@ -402,7 +402,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { if (!running_) return true; if (timeout_msecs < 0) { while (running_) { - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -414,7 +414,7 @@ bool PIThread::waitForFinish(int timeout_msecs) { } tmf_.reset(); while (running_ && tmf_.elapsed_m() < timeout_msecs) { - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); #ifdef WINDOWS if (!isExists(PRIVATE->thread)) { unlock(); @@ -430,12 +430,12 @@ bool PIThread::waitForStart(int timeout_msecs) { if (running_) return true; if (timeout_msecs < 0) { while (!running_) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return true; } tms_.reset(); while (!running_ && tms_.elapsed_m() < timeout_msecs) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return tms_.elapsed_m() < timeout_msecs; } @@ -456,9 +456,9 @@ void PIThread::_beginThread() { PIINTROSPECTION_THREAD_START(this); REGISTER_THREAD(this); running_ = true; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); begin(); - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); started(); } @@ -466,7 +466,7 @@ void PIThread::_beginThread() { void PIThread::_runThread() { PIINTROSPECTION_THREAD_RUN(this); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "..."; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "ok"; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "..."; #ifdef PIP_INTROSPECTION @@ -482,7 +482,7 @@ void PIThread::_runThread() { if (ret_func != 0) ret_func(data_); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "ret_func" << "ok"; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "..."; - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "ok"; } @@ -491,11 +491,11 @@ void PIThread::_endThread() { //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "..."; stopped(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok"; - if (lockRun) mutex_.lock(); + if (lockRun) thread_mutex.lock(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "end" << "..."; end(); //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok"; - if (lockRun) mutex_.unlock(); + if (lockRun) thread_mutex.unlock(); terminating = running_ = false; tid_ = -1; //PICout(PICoutManipulators::DefaultControls) << "thread" << this << "exit"; diff --git a/libs/main/thread/pithread.h b/libs/main/thread/pithread.h index 77cb3339..db6053cd 100644 --- a/libs/main/thread/pithread.h +++ b/libs/main/thread/pithread.h @@ -131,11 +131,15 @@ public: //! @brief Set necessity of lock every \a run with internal mutex void needLockRun(bool need) {lockRun = need;} - EVENT_HANDLER0(void, lock) {mutex_.lock();} - EVENT_HANDLER0(void, unlock) {mutex_.unlock();} + + //! @brief Lock internal mutex + EVENT_HANDLER0(void, lock) const {thread_mutex.lock();} + + //! @brief Unlock internal mutex + EVENT_HANDLER0(void, unlock) const {thread_mutex.unlock();} //! @brief Returns internal mutex - PIMutex & mutex() {return mutex_;} + PIMutex & mutex() const {return thread_mutex;} //! @brief Returns thread ID llong tid() const {return tid_;} @@ -239,7 +243,7 @@ protected: int delay_, policy_; llong tid_; void * data_; - mutable PIMutex mutex_; + mutable PIMutex thread_mutex; PITimeMeasurer tmf_, tms_, tmr_; PIThread::Priority priority_; ThreadFunc ret_func; diff --git a/libs/main/thread/pitimer.cpp b/libs/main/thread/pitimer.cpp index 98cf5580..c94b6c73 100644 --- a/libs/main/thread/pitimer.cpp +++ b/libs/main/thread/pitimer.cpp @@ -648,11 +648,11 @@ bool PITimer::stop(bool wait) { bool PITimer::waitForFinish(int timeout_msecs) { if (timeout_msecs < 0) { while (isRunning()) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return true; } PITimeMeasurer tm; while (isRunning() && tm.elapsed_m() < timeout_msecs) - msleep(PIP_MIN_MSLEEP); + piMSleep(PIP_MIN_MSLEEP); return tm.elapsed_m() < timeout_msecs; } diff --git a/main.cpp b/main.cpp index 1779d4dd..e3414108 100644 --- a/main.cpp +++ b/main.cpp @@ -1,123 +1,50 @@ #include "pip.h" +class PIThreadNotifier { +public: + PIThreadNotifier(): cnt(0) {} + + void wait() { + m.lock(); + while (cnt == 0) v.wait(m); + cnt--; + m.unlock(); + } + void notifyOnce() { + m.lock(); + cnt++; + v.notifyAll(); + m.unlock(); + } +private: + ullong cnt; + PIMutex m; + PIConditionVariable v; +}; + int main(int argc, char * argv[]) { - - PIVector data(10, [](int i)->int{return i;}); - - piCout << data; - - PIThreadPoolLoop pool; - pool.setFunction([&](int i){ - data[i] = data[i] + 10; - }); - pool.exec(0, data.size()); - - piCout << data; - - return 0; - - PIVector x(20, [](int i) {return i;}); - piCout << x; - piCout << x.any([](int v) {return v == 10;}); - piCout << x.every([](int v) {return v > 0;}); - piCout << x.etries([](int v) {return v % 5 == 0;}); - piCout << x.indexWhere([](int v) {return v % 8 == 0;}); - piCout << x.indexOf(4, -1); - piCout << x.lastIndexOf(1, 0); - piCout << x.lastIndexWhere([](int v) {return v % 8 == 0;}); - PIVector x2 = x.map([](int v) {return v / 10.0;}); - piCout << x2; - piCout << x.reduce([](int v, PIString s){return s + PIString::fromNumber(v);}); - piCout << x.removeWhere([](int v){return v % 2 == 0;}); - piCout << x.getRange(8, 1); - piCout << x.getRange(8, 100); - - piCout << "====================="; - - PIDeque y(20, [](int i) {return i;}); - piCout << y; - piCout << y.any([](int v) {return v == 10;}); - piCout << y.every([](int v) {return v > 0;}); - piCout << y.etries([](int v) {return v % 5 == 0;}); - piCout << y.indexWhere([](int v) {return v % 8 == 0;}); - piCout << y.indexOf(4, -1); - piCout << y.lastIndexOf(1, 0); - piCout << y.lastIndexWhere([](int v) {return v % 8 == 0;}); - PIDeque y2 = y.map([](int v) {return v / 10.0;}); - piCout << y2; - piCout << y.reduce([](int v, PIString s){return s + PIString::fromNumber(v);}); - piCout << y.removeWhere([](int v){return v % 2 == 0;}); - piCout << y.getRange(8, 1); - piCout << y.getRange(8, 100); - return 0; // TODO: - - PIByteArray rnd; - rnd.resize(1024*1024, 'x'); - PICLI cli(argc, argv); - PITimer tm; - cli.addArgument("connect", true); - cli.addArgument("name", true); - PICloudClient c("127.0.0.1:10101"); -// c.setReopenEnabled(true); - PICloudServer s("127.0.0.1:10101"); - PIVector clients; - CONNECTL(&tm, tickEvent, ([&](void *, int){ - if (c.isConnected()) { - PIString str = "ping"; - piCout << "[Client] send:" << str; - c.write(str.toByteArray()); - } - if (s.isRunning()) { - for (auto cl : clients) { - if (cl->isOpened()) { - PIString str = "ping_S"; - piCout << "[Server] send to" << cl << ":" << str; - cl->write(str.toByteArray()); - } - } - } - })); - CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){ - PIByteArray ba(readed, size); - if (size < 1024) { - PIString str = PIString(ba); - piCout << "[Client] data:" << str; - if (str == "ping_S") c.write(PIString("pong_S").toByteArray()); - } else piCout << "[Client] blob:" << size; - })); - CONNECTL(&c, connected, ([](){piCout << "connected";})); - CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); - CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ - piCout << "[Server] new client:" << cl; - clients << cl; - CONNECTL(cl, threadedReadEvent, ([&c, &s, cl, &rnd](uchar * readed, int size){ - PIByteArray ba(readed, size); - PIString str = PIString(ba); - piCout << "[Server] data from" << cl << ":" << str; - if (str == "ping") { - cl->write(PIString("pong").toByteArray()); - cl->write(rnd); - } - })); - CONNECTL(cl, closed, ([&clients, cl](){ - cl->stop(); - clients.removeAll(cl); - cl->deleteLater(); - })); - cl->startThreadedRead(); - })); - if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); - if (cli.hasArgument("connect")) { - c.setServerName(cli.argumentValue("connect")); - c.startThreadedRead(); - } else { - s.startThreadedRead(); - } - tm.start(1000); - PIKbdListener ls; - ls.enableExitCapture(PIKbdListener::F10); - ls.start(); - WAIT_FOR_EXIT + PIThreadNotifier n; + int cnt1 = 0; + int cnt2 = 0; + int cnt3 = 0; + PIThread t1([&n, &cnt1](){n.wait(); cnt1++; piMSleep(1);}, true); + PIThread t2([&n, &cnt2](){n.wait(); cnt2++; piMSleep(2);}, true); + piCout << "created"; + piMSleep(10); + piCout << "unlock" << cnt1 << cnt2 << cnt3; + n.notifyOnce(); cnt3++; + piMSleep(10); + piCout << "unlock" << cnt1 << cnt2 << cnt3; + n.notifyOnce(); cnt3++; + piMSleep(10); + piCout << "run" << cnt1 << cnt2 << cnt3; + PIThread t3([&n, &cnt3](){n.notifyOnce(); cnt3++; piMSleep(1);}, true); + piMSleep(20); + t3.stop(); + piMSleep(100); + piCout << "exit" << cnt1 << cnt2 << cnt3; +// m.unlock(); +// piMSleep(10); return 0; } diff --git a/main_picloud_test.cpp b/main_picloud_test.cpp new file mode 100644 index 00000000..296577fc --- /dev/null +++ b/main_picloud_test.cpp @@ -0,0 +1,73 @@ +#include "pip.h" + + +int main(int argc, char * argv[]) { + PIByteArray rnd; + rnd.resize(1024*1024, 'x'); + PICLI cli(argc, argv); + PITimer tm; + cli.addArgument("connect", true); + cli.addArgument("name", true); + PICloudClient c("127.0.0.1:10101"); +// c.setReopenEnabled(true); + PICloudServer s("127.0.0.1:10101"); + PIVector clients; + CONNECTL(&tm, tickEvent, ([&](void *, int){ + if (c.isConnected()) { + PIString str = "ping"; + piCout << "[Client] send:" << str; + c.write(str.toByteArray()); + } + if (s.isRunning()) { + for (auto cl : clients) { + if (cl->isOpened()) { + PIString str = "ping_S"; + piCout << "[Server] send to" << cl << ":" << str; + cl->write(str.toByteArray()); + } + } + } + })); + CONNECTL(&c, threadedReadEvent, ([&](uchar * readed, int size){ + PIByteArray ba(readed, size); + if (size < 1024) { + PIString str = PIString(ba); + piCout << "[Client] data:" << str; + if (str == "ping_S") c.write(PIString("pong_S").toByteArray()); + } else piCout << "[Client] blob:" << size; + })); + CONNECTL(&c, connected, ([](){piCout << "connected";})); + CONNECTL(&c, disconnected, ([](){piCout << "disconnected";})); + CONNECTL(&s, newConnection, ([&](PICloudServer::Client * cl){ + piCout << "[Server] new client:" << cl; + clients << cl; + CONNECTL(cl, threadedReadEvent, ([&c, &s, cl, &rnd](uchar * readed, int size){ + PIByteArray ba(readed, size); + PIString str = PIString(ba); + piCout << "[Server] data from" << cl << ":" << str; + if (str == "ping") { + cl->write(PIString("pong").toByteArray()); + cl->write(rnd); + } + })); + CONNECTL(cl, closed, ([&clients, cl](){ + cl->stop(); + clients.removeAll(cl); + cl->deleteLater(); + })); + cl->startThreadedRead(); + })); + if (cli.hasArgument("name")) s.setServerName(cli.argumentValue("name")); + if (cli.hasArgument("connect")) { + c.setServerName(cli.argumentValue("connect")); + c.startThreadedRead(); + } else { + s.startThreadedRead(); + } + tm.start(1000); + PIKbdListener ls; + ls.enableExitCapture(PIKbdListener::F10); + ls.start(); + WAIT_FOR_EXIT + return 0; +} diff --git a/tests/concurrent/ConditionVariableIntegrationTest.cpp b/tests/concurrent/ConditionVariableIntegrationTest.cpp index ad74b749..81bd6700 100644 --- a/tests/concurrent/ConditionVariableIntegrationTest.cpp +++ b/tests/concurrent/ConditionVariableIntegrationTest.cpp @@ -88,7 +88,7 @@ TEST_F(ConditionVariable, wait_is_protected_unblock_when_notifyOne) { // Missing unlock }); variable->notifyOne(); - msleep(WAIT_THREAD_TIME_MS); + piMSleep(WAIT_THREAD_TIME_MS); ASSERT_FALSE(m.tryLock()); } @@ -130,7 +130,7 @@ TEST_F(ConditionVariable, wait_condition_is_check_condition_when_notifyOne) { isConditionChecked = false; m.unlock(); variable->notifyOne(); - msleep(threadStartTime + 1); + piMSleep(threadStartTime + 1); m.lock(); ASSERT_TRUE(isConditionChecked); m.unlock();