/*! \file pipipelinethread.h * \ingroup Thread * \~\brief * \~english Class for create multihread pipeline * \~russian Класс для создания многопоточного конвейера */ /* PIP - Platform Independent Primitives Class for create multihread pipeline Andrey Bychkov work.a.b@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #ifndef PIPIPELINETHREAD_H #define PIPIPELINETHREAD_H #include "pithread.h" #include "piqueue.h" #include "piconditionvar.h" template class PIPipelineThread : public PIThread { PIOBJECT_SUBCLASS(PIPipelineThread, PIThread); public: PIPipelineThread() { cnt = 0; max_size = 0; wait_next_pipe = false; } ~PIPipelineThread() { stop(); cv.notifyAll(); if (!waitForFinish(1000)) { piCoutObj << "terminating self thread"; terminate(); } } template void connectTo(PIPipelineThread * next) { CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue); } EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload); EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) { mutex.lock(); //piCoutObj << "enque" << overload; if (wait && max_size != 0) { mutex_wait.lock(); while (in.size() >= max_size) cv_wait.wait(mutex_wait); mutex_wait.unlock(); } if (max_size == 0 || in.size() < max_size) { in.enqueue(v); cv.notifyAll(); if (overload) *overload = false; } else { if (overload) *overload = true; } mutex.unlock(); } void enqueue(const Tin &v, bool wait = false) {enqueue(v, wait, nullptr);} const ullong * counterPtr() const {return &cnt;} ullong counter() const {return cnt;} bool isEmpty() { bool ret; mutex.lock(); ret = in.isEmpty(); mutex.unlock(); return ret; } int queSize() { int ret; mutex.lock(); ret = in.size(); mutex.unlock(); return ret; } void clear() { mutex.lock(); mutex_wait.lock(); in.clear(); cv_wait.notifyAll(); mutex_wait.unlock(); mutex.unlock(); } void stopCalc(int wait_delay = 100) { if (isRunning()) { stop(); cv.notifyAll(); if (!waitForFinish(wait_delay)) { mutex_last.unlock(); mutex.unlock(); terminate(); } } } Tout getLast() { Tout ret; mutex_last.lock(); ret = last; mutex_last.unlock(); return ret; } uint maxQueSize() { return max_size; } void setMaxQueSize(uint count) { mutex.lock(); max_size = count; if (max_size > 0 && in.size() > max_size) in.resize(max_size); mutex.unlock(); } bool isWaitNextPipe() {return wait_next_pipe;} void setWaitNextPipe(bool wait) {wait_next_pipe = wait;} protected: virtual Tout calc(Tin &v, bool &ok) = 0; uint max_size; private: void begin() {cnt = 0;} void run() { mutex.lock(); while (in.isEmpty()) { cv.wait(mutex); if (terminating) { mutex.unlock(); return; } } mutex_wait.lock(); Tin t = in.dequeue(); mutex.unlock(); cv_wait.notifyAll(); mutex_wait.unlock(); bool ok = true; Tout r = calc(t, ok); if (ok) { mutex_last.lock(); last = r; mutex_last.unlock(); cnt++; //piCoutObj << "calc ok"; calculated(r, wait_next_pipe); } //piCoutObj << "run ok"; } PIMutex mutex, mutex_wait; PIConditionVariable cv, cv_wait; PIMutex mutex_last; bool wait_next_pipe; ullong cnt; PIQueue in; Tout last; }; #endif // PIPIPELINETHREAD_H