PIThreadNotifier, rewrite PIObject::deleteLater()

tests for PIThreadNotifier and PIObject::deleteLater()
This commit is contained in:
Andrey
2021-10-29 18:20:48 +03:00
parent 6e5a5a6ade
commit 48c885e12a
9 changed files with 196 additions and 75 deletions

View File

@@ -698,28 +698,30 @@ bool PIObject::Connection::disconnect() {
PRIVATE_DEFINITION_START(PIObject::Deleter) PRIVATE_DEFINITION_START(PIObject::Deleter)
PIThread thread; PIThread thread;
PIConditionVariable cond_var; PIConditionVariable cond_var;
PIMutex cond_mutex, queue_mutex;
PIVector<PIObject*> obj_queue; PIVector<PIObject*> obj_queue;
PRIVATE_DEFINITION_END(PIObject::Deleter) PRIVATE_DEFINITION_END(PIObject::Deleter)
PIObject::Deleter::Deleter() { PIObject::Deleter::Deleter() {
//piCout << "Deleter start ..."; //piCout << "Deleter start ...";
stopping = started = posted = false; PRIVATE->thread.setSlot([this](){
CONNECTL(&(PRIVATE->thread), started, [this](){proc();}); PIVector<PIObject*> oq;
PRIVATE->thread.startOnce(); PRIVATE->thread.lock();
while (!started) piMSleep(PIP_MIN_MSLEEP); while(PRIVATE->obj_queue.isEmpty()) PRIVATE->cond_var.wait(PRIVATE->thread.mutex());
oq.swap(PRIVATE->obj_queue);
PRIVATE->thread.unlock();
for (PIObject * o : oq) deleteObject(o);
});
PRIVATE->thread.start();
} }
PIObject::Deleter::~Deleter() { PIObject::Deleter::~Deleter() {
//piCout << "~Deleter ..."; //piCout << "~Deleter ...";
stopping = true; PRIVATE->thread.stop();
PRIVATE->cond_var.notifyAll(); PRIVATE->cond_var.notifyAll();
#ifndef WINDOWS PRIVATE->thread.waitForFinish();
while (PRIVATE->thread.isRunning()) piMSleep(PIP_MIN_MSLEEP); for (PIObject * o : PRIVATE->obj_queue) deleteObject(o);
#endif
deleteAll();
//piCout << "~Deleter ok"; //piCout << "~Deleter ok";
} }
@@ -733,56 +735,23 @@ PIObject::Deleter * PIObject::Deleter::instance() {
void PIObject::Deleter::post(PIObject * o) { void PIObject::Deleter::post(PIObject * o) {
if (!o->isPIObject()) return; if (!o->isPIObject()) return;
//piCout << "[Deleter] post" << o << "..."; //piCout << "[Deleter] post" << o << "...";
PRIVATE->queue_mutex.lock(); PRIVATE->thread.lock();
if (!PRIVATE->obj_queue.contains(o)) if (!PRIVATE->obj_queue.contains(o)) {
PRIVATE->obj_queue << o; PRIVATE->obj_queue << o;
PRIVATE->queue_mutex.unlock();
PRIVATE->cond_var.notifyAll(); PRIVATE->cond_var.notifyAll();
posted = true; }
PRIVATE->thread.unlock();
//piCout << "[Deleter] post" << o << "done"; //piCout << "[Deleter] post" << o << "done";
} }
void PIObject::Deleter::proc() {
//piCout << "[Deleter] proc start";
while (!stopping) {
//piMSleep(1);
//piCout << "[Deleter] proc wait ...";
if (posted) {
posted = false;
started = true;
} else {
PRIVATE->cond_mutex.lock();
started = true;
PRIVATE->cond_var.wait(PRIVATE->cond_mutex);
PRIVATE->cond_mutex.unlock();
}
//piCout << "[Deleter] proc wait done";
deleteAll();
}
//piCout << "[Deleter] proc end ok";
}
void PIObject::Deleter::deleteAll() {
PIVector<PIObject*> oq;
PRIVATE->queue_mutex.lock();
oq = PRIVATE->obj_queue;
//piCout << "[Deleter] deleteAll" << oq.size_s() << "...";
PRIVATE->obj_queue.clear();
PRIVATE->queue_mutex.unlock();
piForeach (PIObject * o, oq)
deleteObject(o);
}
void PIObject::Deleter::deleteObject(PIObject * o) { void PIObject::Deleter::deleteObject(PIObject * o) {
//piCout << "[Deleter] delete" << (uintptr_t)o << "..."; //piCout << "[Deleter] delete" << (uintptr_t)o << "...";
if (o->isPIObject()) { if (o->isPIObject()) {
//piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ..."; //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic ...";
while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP); while (o->isInEvent()) piMSleep(PIP_MIN_MSLEEP);
//piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done"; //piCout << "[Deleter] delete" << (uintptr_t)o << "wait atomic done";
if (o->isPIObject()) delete o; delete o;
} }
//piCout << "[Deleter] delete" << (uintptr_t)o << "done"; //piCout << "[Deleter] delete" << (uintptr_t)o << "done";
} }

View File

@@ -503,10 +503,7 @@ private:
static Deleter * instance(); static Deleter * instance();
void post(PIObject * o); void post(PIObject * o);
private: private:
void proc();
void deleteAll();
void deleteObject(PIObject * o); void deleteObject(PIObject * o);
std::atomic_bool stopping, started, posted;
PRIVATE_DECLARATION(PIP_EXPORT) PRIVATE_DECLARATION(PIP_EXPORT)
}; };

View File

@@ -29,5 +29,6 @@
#include "piconditionvar.h" #include "piconditionvar.h"
#include "pithreadpoolexecutor.h" #include "pithreadpoolexecutor.h"
#include "pithreadpoolloop.h" #include "pithreadpoolloop.h"
#include "pithreadnotifier.h"
#endif // PITHREADMODULE_H #endif // PITHREADMODULE_H

View File

@@ -0,0 +1,39 @@
/*
PIP - Platform Independent Primitives
Class for simply notify and wait in different threads
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pithreadnotifier.h"
PIThreadNotifier::PIThreadNotifier() : cnt(0) {}
void PIThreadNotifier::wait() {
m.lock();
while (cnt == 0) v.wait(m);
cnt--;
m.unlock();
}
void PIThreadNotifier::notifyOnce() {
m.lock();
cnt++;
v.notifyAll();
m.unlock();
}

View File

@@ -0,0 +1,52 @@
/*! @file pithreadnotifier.h
* @brief Class for simply notify and wait in different threads
*/
/*
PIP - Platform Independent Primitives
Class for simply notify and wait in different threads
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREADNOTIFIER_H
#define PITHREADNOTIFIER_H
#include "piconditionvar.h"
class PIP_EXPORT PIThreadNotifier {
public:
PIThreadNotifier();
//! Start waiting, return if other thread call \a notifyOnce().
//! If \a notifyOnce() has been called before, then returns immediately.
//! If notifyOnce() has been called "n" times, then returns immediately "n" times,
//! but only if wait in one thread.
//! If many threads waiting, then If notifyOnce() has been called "n" times,
//! All threads total returns "n" times in random sequence.
void wait();
//! Notify one waiting thread, wich waiting on \a wait() function.
//! If many threads waiting, then notify randomly one.
//! If call this "n" times, then notify any waiting threads totally "n" times.
void notifyOnce();
private:
ullong cnt;
PIMutex m;
PIConditionVariable v;
};
#endif // PITHREADNOTIFIER_H

View File

@@ -1,27 +1,5 @@
#include "pip.h" #include "pip.h"
class PIThreadNotifier {
public:
PIThreadNotifier(): cnt(0) {}
void wait() {
m.lock();
while (cnt == 0) v.wait(m);
cnt--;
m.unlock();
}
void notifyOnce() {
m.lock();
cnt++;
v.notifyAll();
m.unlock();
}
private:
ullong cnt;
PIMutex m;
PIConditionVariable v;
};
int main(int argc, char * argv[]) { int main(int argc, char * argv[]) {
PIThreadNotifier n; PIThreadNotifier n;

View File

@@ -198,6 +198,6 @@ TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) {
condition = true; condition = true;
m.unlock(); m.unlock();
variable->notifyOne(); variable->notifyOne();
msleep(WAIT_THREAD_TIME_MS); piMSleep(WAIT_THREAD_TIME_MS);
ASSERT_FALSE(thread->isRunning()); ASSERT_FALSE(thread->isRunning());
} }

View File

@@ -0,0 +1,34 @@
#include "gtest/gtest.h"
#include "pithreadnotifier.h"
TEST(PIThreadNotifierTest, One) {
PIThreadNotifier n;
int cnt = 0;
PIThread t1([&n, &cnt](){n.wait(); cnt++;}, true);
piMSleep(10);
n.notifyOnce();
piMSleep(10);
ASSERT_EQ(cnt, 1);
n.notifyOnce();
piMSleep(10);
ASSERT_EQ(cnt, 2);
}
TEST(PIThreadNotifierTest, Two) {
PIThreadNotifier n;
int cnt1 = 0;
int cnt2 = 0;
int cnt3 = 0;
PIThread t1([&n, &cnt1](){n.wait(); cnt1++; piMSleep(2);}, true);
PIThread t2([&n, &cnt2](){n.wait(); cnt2++; piMSleep(2);}, true);
PIThread t3([&n, &cnt3](){n.notifyOnce(); cnt3++; piMSleep(1);}, true);
piMSleep(20);
t3.stop(true);
piMSleep(100);
t1.stop();
t2.stop();
ASSERT_EQ(cnt1+cnt2, cnt3);
}

View File

@@ -0,0 +1,51 @@
#include "gtest/gtest.h"
#include "piobject.h"
std::atomic<int> obj_cnt;
class Send: public PIObject {
PIOBJECT(Send)
public:
Send() {obj_cnt++;}
~Send() {obj_cnt--;}
EVENT1(ev, PIObject * , o)
};
class Recv: public PIObject {
PIOBJECT(Recv)
public:
Recv() {obj_cnt++;}
~Recv() {obj_cnt--;}
EVENT_HANDLER1(void, eh, PIObject * , o) {
o->deleteLater();
piMSleep(10);
}
};
TEST(Piobject, deleteLater) {
obj_cnt = 0;
Send * s = new Send();
Recv * r = new Recv();
CONNECTU(s, ev, r, eh);
s->ev(r);
r->deleteLater();
s->deleteLater();
piMSleep(100);
ASSERT_EQ(obj_cnt, 0);
PIVector<Send *> s2;
s2.resize(100, new Send());
for (auto o : s2) o->deleteLater();
piMSleep(10);
ASSERT_EQ(obj_cnt, 0);
s2.clear();
PIVector<Recv *> r2;
r2.resize(100, [](size_t i){return new Recv();});
for (auto o : r2) o->deleteLater();
piMSleep(10);
ASSERT_EQ(obj_cnt, 0);
r2.clear();
}