diff --git a/libs/main/core/piobject.cpp b/libs/main/core/piobject.cpp index 71749360..5a2c0eb6 100644 --- a/libs/main/core/piobject.cpp +++ b/libs/main/core/piobject.cpp @@ -698,28 +698,30 @@ bool PIObject::Connection::disconnect() { PRIVATE_DEFINITION_START(PIObject::Deleter) PIThread thread; PIConditionVariable cond_var; - PIMutex cond_mutex, queue_mutex; PIVector obj_queue; PRIVATE_DEFINITION_END(PIObject::Deleter) PIObject::Deleter::Deleter() { //piCout << "Deleter start ..."; - stopping = started = posted = false; - CONNECTL(&(PRIVATE->thread), started, [this](){proc();}); - PRIVATE->thread.startOnce(); - while (!started) piMSleep(PIP_MIN_MSLEEP); + PRIVATE->thread.setSlot([this](){ + PIVector oq; + PRIVATE->thread.lock(); + while(PRIVATE->obj_queue.isEmpty()) PRIVATE->cond_var.wait(PRIVATE->thread.mutex()); + oq.swap(PRIVATE->obj_queue); + PRIVATE->thread.unlock(); + for (PIObject * o : oq) deleteObject(o); + }); + PRIVATE->thread.start(); } PIObject::Deleter::~Deleter() { //piCout << "~Deleter ..."; - stopping = true; + PRIVATE->thread.stop(); PRIVATE->cond_var.notifyAll(); -#ifndef WINDOWS - while (PRIVATE->thread.isRunning()) piMSleep(PIP_MIN_MSLEEP); -#endif - deleteAll(); + PRIVATE->thread.waitForFinish(); + for (PIObject * o : PRIVATE->obj_queue) deleteObject(o); //piCout << "~Deleter ok"; } @@ -733,46 +735,13 @@ PIObject::Deleter * PIObject::Deleter::instance() { void PIObject::Deleter::post(PIObject * o) { if (!o->isPIObject()) return; //piCout << "[Deleter] post" << o << "..."; - PRIVATE->queue_mutex.lock(); - if (!PRIVATE->obj_queue.contains(o)) + PRIVATE->thread.lock(); + if (!PRIVATE->obj_queue.contains(o)) { PRIVATE->obj_queue << o; - PRIVATE->queue_mutex.unlock(); - PRIVATE->cond_var.notifyAll(); - posted = true; - //piCout << "[Deleter] post" << o << "done"; -} - - -void PIObject::Deleter::proc() { - //piCout << "[Deleter] proc start"; - while (!stopping) { - //piMSleep(1); - //piCout << "[Deleter] proc wait ..."; - if (posted) { - posted = false; - started = true; - } else { - PRIVATE->cond_mutex.lock(); - started = true; - PRIVATE->cond_var.wait(PRIVATE->cond_mutex); - PRIVATE->cond_mutex.unlock(); - } - //piCout << "[Deleter] proc wait done"; - deleteAll(); + PRIVATE->cond_var.notifyAll(); } - //piCout << "[Deleter] proc end ok"; -} - - -void PIObject::Deleter::deleteAll() { - PIVector oq; - PRIVATE->queue_mutex.lock(); - oq = PRIVATE->obj_queue; - //piCout << "[Deleter] deleteAll" << oq.size_s() << "..."; - PRIVATE->obj_queue.clear(); - PRIVATE->queue_mutex.unlock(); - piForeach (PIObject * o, oq) - deleteObject(o); + PRIVATE->thread.unlock(); + //piCout << "[Deleter] post" << o << "done"; } @@ -782,7 +751,7 @@ void PIObject::Deleter::deleteObject(PIObject * o) { //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; - if (o->isPIObject()) delete o; + delete o; } //piCout << "[Deleter] delete" << (uintptr_t)o << "done"; } diff --git a/libs/main/core/piobject.h b/libs/main/core/piobject.h index 72f54c48..5d11c0ed 100644 --- a/libs/main/core/piobject.h +++ b/libs/main/core/piobject.h @@ -503,10 +503,7 @@ private: static Deleter * instance(); void post(PIObject * o); private: - void proc(); - void deleteAll(); void deleteObject(PIObject * o); - std::atomic_bool stopping, started, posted; PRIVATE_DECLARATION(PIP_EXPORT) }; diff --git a/libs/main/thread/pithreadmodule.h b/libs/main/thread/pithreadmodule.h index 18706e05..e528c355 100644 --- a/libs/main/thread/pithreadmodule.h +++ b/libs/main/thread/pithreadmodule.h @@ -29,5 +29,6 @@ #include "piconditionvar.h" #include "pithreadpoolexecutor.h" #include "pithreadpoolloop.h" +#include "pithreadnotifier.h" #endif // PITHREADMODULE_H diff --git a/libs/main/thread/pithreadnotifier.cpp b/libs/main/thread/pithreadnotifier.cpp new file mode 100644 index 00000000..6b9ac3b6 --- /dev/null +++ b/libs/main/thread/pithreadnotifier.cpp @@ -0,0 +1,39 @@ +/* + PIP - Platform Independent Primitives + Class for simply notify and wait in different threads + Andrey Bychkov work.a.b@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "pithreadnotifier.h" + + +PIThreadNotifier::PIThreadNotifier() : cnt(0) {} + + +void PIThreadNotifier::wait() { + m.lock(); + while (cnt == 0) v.wait(m); + cnt--; + m.unlock(); +} + + +void PIThreadNotifier::notifyOnce() { + m.lock(); + cnt++; + v.notifyAll(); + m.unlock(); +} diff --git a/libs/main/thread/pithreadnotifier.h b/libs/main/thread/pithreadnotifier.h new file mode 100644 index 00000000..5755fc7c --- /dev/null +++ b/libs/main/thread/pithreadnotifier.h @@ -0,0 +1,52 @@ +/*! @file pithreadnotifier.h + * @brief Class for simply notify and wait in different threads +*/ +/* + PIP - Platform Independent Primitives + Class for simply notify and wait in different threads + Andrey Bychkov work.a.b@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PITHREADNOTIFIER_H +#define PITHREADNOTIFIER_H + +#include "piconditionvar.h" + + +class PIP_EXPORT PIThreadNotifier { +public: + PIThreadNotifier(); + + //! Start waiting, return if other thread call \a notifyOnce(). + //! If \a notifyOnce() has been called before, then returns immediately. + //! If notifyOnce() has been called "n" times, then returns immediately "n" times, + //! but only if wait in one thread. + //! If many threads waiting, then If notifyOnce() has been called "n" times, + //! All threads total returns "n" times in random sequence. + void wait(); + + //! Notify one waiting thread, wich waiting on \a wait() function. + //! If many threads waiting, then notify randomly one. + //! If call this "n" times, then notify any waiting threads totally "n" times. + void notifyOnce(); + +private: + ullong cnt; + PIMutex m; + PIConditionVariable v; +}; + +#endif // PITHREADNOTIFIER_H diff --git a/main.cpp b/main.cpp index e3414108..6c91d54f 100644 --- a/main.cpp +++ b/main.cpp @@ -1,27 +1,5 @@ #include "pip.h" -class PIThreadNotifier { -public: - PIThreadNotifier(): cnt(0) {} - - void wait() { - m.lock(); - while (cnt == 0) v.wait(m); - cnt--; - m.unlock(); - } - void notifyOnce() { - m.lock(); - cnt++; - v.notifyAll(); - m.unlock(); - } -private: - ullong cnt; - PIMutex m; - PIConditionVariable v; -}; - int main(int argc, char * argv[]) { PIThreadNotifier n; diff --git a/tests/concurrent/ConditionVariableIntegrationTest.cpp b/tests/concurrent/ConditionVariableIntegrationTest.cpp index 81bd6700..b81accf1 100644 --- a/tests/concurrent/ConditionVariableIntegrationTest.cpp +++ b/tests/concurrent/ConditionVariableIntegrationTest.cpp @@ -198,6 +198,6 @@ TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) { condition = true; m.unlock(); variable->notifyOne(); - msleep(WAIT_THREAD_TIME_MS); + piMSleep(WAIT_THREAD_TIME_MS); ASSERT_FALSE(thread->isRunning()); } diff --git a/tests/concurrent/pithreadnotifier_test.cpp b/tests/concurrent/pithreadnotifier_test.cpp new file mode 100644 index 00000000..0dd01a8a --- /dev/null +++ b/tests/concurrent/pithreadnotifier_test.cpp @@ -0,0 +1,34 @@ +#include "gtest/gtest.h" +#include "pithreadnotifier.h" + + +TEST(PIThreadNotifierTest, One) { + PIThreadNotifier n; + int cnt = 0; + PIThread t1([&n, &cnt](){n.wait(); cnt++;}, true); + piMSleep(10); + n.notifyOnce(); + piMSleep(10); + ASSERT_EQ(cnt, 1); + n.notifyOnce(); + piMSleep(10); + ASSERT_EQ(cnt, 2); +} + + +TEST(PIThreadNotifierTest, Two) { + PIThreadNotifier n; + int cnt1 = 0; + int cnt2 = 0; + int cnt3 = 0; + PIThread t1([&n, &cnt1](){n.wait(); cnt1++; piMSleep(2);}, true); + PIThread t2([&n, &cnt2](){n.wait(); cnt2++; piMSleep(2);}, true); + PIThread t3([&n, &cnt3](){n.notifyOnce(); cnt3++; piMSleep(1);}, true); + piMSleep(20); + t3.stop(true); + piMSleep(100); + t1.stop(); + t2.stop(); + ASSERT_EQ(cnt1+cnt2, cnt3); +} + diff --git a/tests/piobject/delete_later.cpp b/tests/piobject/delete_later.cpp new file mode 100644 index 00000000..78d72c4e --- /dev/null +++ b/tests/piobject/delete_later.cpp @@ -0,0 +1,51 @@ +#include "gtest/gtest.h" +#include "piobject.h" + +std::atomic obj_cnt; + +class Send: public PIObject { + PIOBJECT(Send) +public: + Send() {obj_cnt++;} + ~Send() {obj_cnt--;} + EVENT1(ev, PIObject * , o) +}; + + +class Recv: public PIObject { + PIOBJECT(Recv) +public: + Recv() {obj_cnt++;} + ~Recv() {obj_cnt--;} + EVENT_HANDLER1(void, eh, PIObject * , o) { + o->deleteLater(); + piMSleep(10); + } +}; + + +TEST(Piobject, deleteLater) { + obj_cnt = 0; + Send * s = new Send(); + Recv * r = new Recv(); + CONNECTU(s, ev, r, eh); + s->ev(r); + r->deleteLater(); + s->deleteLater(); + piMSleep(100); + ASSERT_EQ(obj_cnt, 0); + + PIVector s2; + s2.resize(100, new Send()); + for (auto o : s2) o->deleteLater(); + piMSleep(10); + ASSERT_EQ(obj_cnt, 0); + s2.clear(); + + PIVector r2; + r2.resize(100, [](size_t i){return new Recv();}); + for (auto o : r2) o->deleteLater(); + piMSleep(10); + ASSERT_EQ(obj_cnt, 0); + r2.clear(); +}