//! \file pipipelinethread.h //! \ingroup Thread //! \brief //! \~english Class for creating multithread pipeline //! \~russian Класс для создания многопоточного конвейера //! //! \details //! \~english Pipeline thread for processing data through stages in separate threads. //! \~russian Конвейерный поток для обработки данных через этапы в отдельных потоках. /* PIP - Platform Independent Primitives Class for create multihread pipeline Andrey Bychkov work.a.b@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #ifndef PIPIPELINETHREAD_H #define PIPIPELINETHREAD_H #include "piconditionvar.h" #include "piqueue.h" #include "pithread.h" //! \~english Pipeline thread template class //! \~russian Шаблонный класс конвейерного потока template class PIPipelineThread: public PIThread { PIOBJECT_SUBCLASS(PIPipelineThread, PIThread); public: //! \~english Constructs pipeline thread //! \~russian Создает конвейерный поток PIPipelineThread() { cnt = 0; max_size = 0; wait_next_pipe = false; } //! \~english Destroys pipeline thread //! \~russian Уничтожает конвейерный поток ~PIPipelineThread() { stop(); cv.notifyAll(); if (!waitForFinish(1000)) { piCoutObj << "terminating self thread"; terminate(); } } //! \~english Connect to next pipeline stage //! \~russian Подключает к следующему этапу конвейера template void connectTo(PIPipelineThread * next) { CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue); } EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload); EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) { mutex.lock(); // piCoutObj << "enque" << overload; if (wait && max_size != 0) { mutex_wait.lock(); while (in.size() >= max_size) cv_wait.wait(mutex_wait); mutex_wait.unlock(); } if (max_size == 0 || in.size() < max_size) { in.enqueue(v); cv.notifyAll(); if (overload) *overload = false; } else { if (overload) *overload = true; } mutex.unlock(); } //! \~english Enqueue data for processing //! \~russian Добавляет данные в очередь на обработку void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); } //! \~english Returns pointer to counter //! \~russian Возвращает указатель на счетчик const ullong * counterPtr() const { return &cnt; } //! \~english Returns items processed counter //! \~russian Возвращает количество обработанных элементов ullong counter() const { return cnt; } //! \~english Returns if input queue is empty //! \~russian Возвращает пустая ли входная очередь bool isEmpty() { bool ret; mutex.lock(); ret = in.isEmpty(); mutex.unlock(); return ret; } //! \~english Returns input queue size //! \~russian Возвращает размер входной очереди int queSize() { int ret; mutex.lock(); ret = in.size(); mutex.unlock(); return ret; } //! \~english Clear input queue //! \~russian Очищает входную очередь void clear() { mutex.lock(); mutex_wait.lock(); in.clear(); cv_wait.notifyAll(); mutex_wait.unlock(); mutex.unlock(); } //! \~english Stop calculation //! \~russian Останавливает вычисления void stopCalc(int wait_delay = 100) { if (isRunning()) { stop(); cv.notifyAll(); if (!waitForFinish(wait_delay)) { mutex_last.unlock(); mutex.unlock(); terminate(); } } } //! \~english Returns last processed result //! \~russian Возвращает последний обработанный результат Tout getLast() { Tout ret; mutex_last.lock(); ret = last; mutex_last.unlock(); return ret; } //! \~english Returns max queue size //! \~russian Возвращает максимальный размер очереди uint maxQueSize() { return max_size; } //! \~english Set max queue size //! \~russian Устанавливает максимальный размер очереди void setMaxQueSize(uint count) { mutex.lock(); max_size = count; if (max_size > 0 && in.size() > max_size) in.resize(max_size); mutex.unlock(); } //! \~english Returns if waiting for next pipeline //! \~russian Возвращает ожидает ли следующий конвейер bool isWaitNextPipe() { return wait_next_pipe; } //! \~english Set waiting for next pipeline //! \~russian Устанавливает ожидание следующего конвейера void setWaitNextPipe(bool wait) { wait_next_pipe = wait; } protected: //! \~english Processing function - must be implemented //! \~russian Функция обработки - должна быть реализована virtual Tout calc(Tin & v, bool & ok) = 0; uint max_size; private: void begin() override { cnt = 0; } void run() override { mutex.lock(); while (in.isEmpty()) { cv.wait(mutex); if (terminating) { mutex.unlock(); return; } } mutex_wait.lock(); Tin t = in.dequeue(); mutex.unlock(); cv_wait.notifyAll(); mutex_wait.unlock(); bool ok = true; Tout r = calc(t, ok); if (ok) { mutex_last.lock(); last = r; mutex_last.unlock(); cnt++; // piCoutObj << "calc ok"; calculated(r, wait_next_pipe); } // piCoutObj << "run ok"; } PIMutex mutex, mutex_wait; PIConditionVariable cv, cv_wait; PIMutex mutex_last; bool wait_next_pipe; ullong cnt; PIQueue in; Tout last; }; #endif // PIPIPELINETHREAD_H