/*! \file pipipelinethread.h
* \ingroup Thread
* \~\brief
* \~english Class for create multihread pipeline
* \~russian Класс для создания многопоточного конвейера
*/
/*
PIP - Platform Independent Primitives
Class for create multihread pipeline
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
#ifndef PIPIPELINETHREAD_H
#define PIPIPELINETHREAD_H
#include "pithread.h"
#include "piqueue.h"
#include "piconditionvar.h"
template
class PIPipelineThread : public PIThread
{
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread)
public:
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = false;
}
~PIPipelineThread() {
stop();
cv.notifyAll();
if (!waitForFinish(1000)) {
piCoutObj << "terminating self thread";
terminate();
}
}
template
void connectTo(PIPipelineThread * next) {
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue)
}
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload)
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
mutex.lock();
//piCoutObj << "enque" << overload;
if (wait && max_size != 0) {
mutex_wait.lock();
while (in.size() >= max_size) cv_wait.wait(mutex_wait);
mutex_wait.unlock();
}
if (max_size == 0 || in.size() < max_size) {
in.enqueue(v);
cv.notifyAll();
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
void enqueue(const Tin &v, bool wait = false) {enqueue(v, wait, nullptr);}
const ullong * counterPtr() const {return &cnt;}
ullong counter() const {return cnt;}
bool isEmpty() {
bool ret;
mutex.lock();
ret = in.isEmpty();
mutex.unlock();
return ret;
}
int queSize() {
int ret;
mutex.lock();
ret = in.size();
mutex.unlock();
return ret;
}
void clear() {
mutex.lock();
mutex_wait.lock();
in.clear();
cv_wait.notifyAll();
mutex_wait.unlock();
mutex.unlock();
}
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
cv.notifyAll();
if (!waitForFinish(wait_delay)) {
mutex_last.unlock();
mutex.unlock();
terminate();
}
}
}
Tout getLast() {
Tout ret;
mutex_last.lock();
ret = last;
mutex_last.unlock();
return ret;
}
uint maxQueSize() {
return max_size;
}
void setMaxQueSize(uint count) {
mutex.lock();
max_size = count;
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock();
}
bool isWaitNextPipe() {return wait_next_pipe;}
void setWaitNextPipe(bool wait) {wait_next_pipe = wait;}
protected:
virtual Tout calc(Tin &v, bool &ok) = 0;
uint max_size;
private:
void begin() {cnt = 0;}
void run() {
mutex.lock();
while (in.isEmpty()) {
cv.wait(mutex);
if (terminating) {
mutex.unlock();
return;
}
}
mutex_wait.lock();
Tin t = in.dequeue();
mutex.unlock();
cv_wait.notifyAll();
mutex_wait.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_last.lock();
last = r;
mutex_last.unlock();
cnt++;
//piCoutObj << "calc ok";
calculated(r, wait_next_pipe);
}
//piCoutObj << "run ok";
}
PIMutex mutex, mutex_wait;
PIConditionVariable cv, cv_wait;
PIMutex mutex_last;
bool wait_next_pipe;
ullong cnt;
PIQueue in;
Tout last;
};
#endif // PIPIPELINETHREAD_H