remove msleep, clean PIConditionVariable, rewrite pipipelinethread, etc...

This commit is contained in:
Andrey
2021-10-29 16:52:03 +03:00
parent 21e03fc8cb
commit 6e5a5a6ade
16 changed files with 203 additions and 213 deletions

View File

@@ -25,6 +25,8 @@
#include "pithread.h"
#include "piqueue.h"
#include "piconditionvar.h"
template <typename Tin, typename Tout>
class PIPipelineThread : public PIThread
@@ -35,10 +37,10 @@ public:
cnt = 0;
max_size = 0;
wait_next_pipe = false;
next_overload = false;
}
~PIPipelineThread() {
stop();
cv.notifyAll();
if (!waitForFinish(1000)) {
piCoutObj << "terminating self thread";
terminate();
@@ -46,21 +48,27 @@ public:
}
template <typename T>
void connectTo(PIPipelineThread<Tout, T> * next) {
CONNECT2(void, Tout, bool *, this, calculated, next, enqueue)
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue)
}
EVENT2(calculated, const Tout &, v, bool *, overload)
void enqueue(const Tin &v) {enqueue(v, 0);}
EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) {
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload)
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
mutex.lock();
//piCoutObj << "enque" << overload;
if (wait && max_size != 0) {
mutex_wait.lock();
while (in.size() >= max_size) cv_wait.wait(mutex_wait);
mutex_wait.unlock();
}
if (max_size == 0 || in.size() < max_size) {
in.enqueue(v);
cv.notifyAll();
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
void enqueue(const Tin &v, bool wait = false) {enqueue(v, wait, nullptr);}
const ullong * counterPtr() const {return &cnt;}
ullong counter() const {return cnt;}
bool isEmpty() {
@@ -79,15 +87,18 @@ public:
}
void clear() {
mutex.lock();
mutex_wait.lock();
in.clear();
next_overload = false;
cv_wait.notifyAll();
mutex_wait.unlock();
mutex.unlock();
}
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
cv.notifyAll();
if (!waitForFinish(wait_delay)) {
mutex_l.unlock();
mutex_last.unlock();
mutex.unlock();
terminate();
}
@@ -95,18 +106,14 @@ public:
}
Tout getLast() {
Tout ret;
mutex_l.lock();
mutex_last.lock();
ret = last;
mutex_l.unlock();
mutex_last.unlock();
return ret;
}
uint maxQueSize() {
uint ret;
mutex.lock();
ret = max_size;
mutex.unlock();
return ret;
return max_size;
}
void setMaxQueSize(uint count) {
@@ -127,39 +134,35 @@ protected:
private:
void begin() {cnt = 0;}
void run() {
//piCoutObj << "run ...";
mutex.lock();
if (in.isEmpty()) {
mutex.unlock();
piMSleep(10);
//piCoutObj << "run in empty";
return;
}
if (next_overload && wait_next_pipe) {
mutex.unlock();
//piCoutObj << "wait" << &next_overload;
calculated(last, &next_overload);
piMSleep(10);
} else {
Tin t = in.dequeue();
mutex.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_l.lock();
last = r;
mutex_l.unlock();
cnt++;
//piCoutObj << "calc ok" << &next_overload;
calculated(r, &next_overload);
while (in.isEmpty()) {
cv.wait(mutex);
if (terminating) {
mutex.unlock();
return;
}
}
mutex_wait.lock();
Tin t = in.dequeue();
mutex.unlock();
cv_wait.notifyAll();
mutex_wait.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_last.lock();
last = r;
mutex_last.unlock();
cnt++;
//piCoutObj << "calc ok";
calculated(r, wait_next_pipe);
}
//piCoutObj << "run ok";
}
PIMutex mutex;
PIMutex mutex_l;
PIMutex mutex, mutex_wait;
PIConditionVariable cv, cv_wait;
PIMutex mutex_last;
bool wait_next_pipe;
bool next_overload;
ullong cnt;
PIQueue<Tin> in;
Tout last;