174 lines
4.0 KiB
C++
174 lines
4.0 KiB
C++
/*! \file pipipelinethread.h
|
|
* \ingroup Thread
|
|
* \~\brief
|
|
* \~english Class for create multihread pipeline
|
|
* \~russian Класс для создания многопоточного конвейера
|
|
*/
|
|
/*
|
|
PIP - Platform Independent Primitives
|
|
Class for create multihread pipeline
|
|
Andrey Bychkov work.a.b@yandex.ru
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Lesser General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#ifndef PIPIPELINETHREAD_H
|
|
#define PIPIPELINETHREAD_H
|
|
|
|
#include "piconditionvar.h"
|
|
#include "piqueue.h"
|
|
#include "pithread.h"
|
|
|
|
|
|
template<typename Tin, typename Tout>
|
|
class PIPipelineThread: public PIThread {
|
|
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread);
|
|
|
|
public:
|
|
PIPipelineThread() {
|
|
cnt = 0;
|
|
max_size = 0;
|
|
wait_next_pipe = false;
|
|
}
|
|
~PIPipelineThread() {
|
|
stop();
|
|
cv.notifyAll();
|
|
if (!waitForFinish(1000)) {
|
|
piCoutObj << "terminating self thread";
|
|
terminate();
|
|
}
|
|
}
|
|
template<typename T>
|
|
void connectTo(PIPipelineThread<Tout, T> * next) {
|
|
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue);
|
|
}
|
|
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload);
|
|
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
|
|
mutex.lock();
|
|
// piCoutObj << "enque" << overload;
|
|
if (wait && max_size != 0) {
|
|
mutex_wait.lock();
|
|
while (in.size() >= max_size)
|
|
cv_wait.wait(mutex_wait);
|
|
mutex_wait.unlock();
|
|
}
|
|
if (max_size == 0 || in.size() < max_size) {
|
|
in.enqueue(v);
|
|
cv.notifyAll();
|
|
if (overload) *overload = false;
|
|
} else {
|
|
if (overload) *overload = true;
|
|
}
|
|
mutex.unlock();
|
|
}
|
|
void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); }
|
|
const ullong * counterPtr() const { return &cnt; }
|
|
ullong counter() const { return cnt; }
|
|
bool isEmpty() {
|
|
bool ret;
|
|
mutex.lock();
|
|
ret = in.isEmpty();
|
|
mutex.unlock();
|
|
return ret;
|
|
}
|
|
int queSize() {
|
|
int ret;
|
|
mutex.lock();
|
|
ret = in.size();
|
|
mutex.unlock();
|
|
return ret;
|
|
}
|
|
void clear() {
|
|
mutex.lock();
|
|
mutex_wait.lock();
|
|
in.clear();
|
|
cv_wait.notifyAll();
|
|
mutex_wait.unlock();
|
|
mutex.unlock();
|
|
}
|
|
void stopCalc(int wait_delay = 100) {
|
|
if (isRunning()) {
|
|
stop();
|
|
cv.notifyAll();
|
|
if (!waitForFinish(wait_delay)) {
|
|
mutex_last.unlock();
|
|
mutex.unlock();
|
|
terminate();
|
|
}
|
|
}
|
|
}
|
|
Tout getLast() {
|
|
Tout ret;
|
|
mutex_last.lock();
|
|
ret = last;
|
|
mutex_last.unlock();
|
|
return ret;
|
|
}
|
|
|
|
uint maxQueSize() { return max_size; }
|
|
|
|
void setMaxQueSize(uint count) {
|
|
mutex.lock();
|
|
max_size = count;
|
|
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
|
|
mutex.unlock();
|
|
}
|
|
|
|
bool isWaitNextPipe() { return wait_next_pipe; }
|
|
void setWaitNextPipe(bool wait) { wait_next_pipe = wait; }
|
|
|
|
protected:
|
|
virtual Tout calc(Tin & v, bool & ok) = 0;
|
|
|
|
uint max_size;
|
|
|
|
private:
|
|
void begin() override { cnt = 0; }
|
|
void run() override {
|
|
mutex.lock();
|
|
while (in.isEmpty()) {
|
|
cv.wait(mutex);
|
|
if (terminating) {
|
|
mutex.unlock();
|
|
return;
|
|
}
|
|
}
|
|
mutex_wait.lock();
|
|
Tin t = in.dequeue();
|
|
mutex.unlock();
|
|
cv_wait.notifyAll();
|
|
mutex_wait.unlock();
|
|
bool ok = true;
|
|
Tout r = calc(t, ok);
|
|
if (ok) {
|
|
mutex_last.lock();
|
|
last = r;
|
|
mutex_last.unlock();
|
|
cnt++;
|
|
// piCoutObj << "calc ok";
|
|
calculated(r, wait_next_pipe);
|
|
}
|
|
// piCoutObj << "run ok";
|
|
}
|
|
PIMutex mutex, mutex_wait;
|
|
PIConditionVariable cv, cv_wait;
|
|
PIMutex mutex_last;
|
|
bool wait_next_pipe;
|
|
ullong cnt;
|
|
PIQueue<Tin> in;
|
|
Tout last;
|
|
};
|
|
|
|
#endif // PIPIPELINETHREAD_H
|