//! \~\file pipipelinethread.h //! \~\ingroup Thread //! \~\brief //! \~english Thread-based pipeline for multi-stage data processing //! \~russian Потоковый конвейер для многоэтапной обработки данных /* PIP - Platform Independent Primitives Class for create multihread pipeline Andrey Bychkov work.a.b@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #ifndef PIPIPELINETHREAD_H #define PIPIPELINETHREAD_H #include "piconditionvar.h" #include "piqueue.h" #include "pithread.h" //! \~\ingroup Thread //! \~english Thread-based pipeline template class for multi-stage data processing //! \~russian Шаблонный класс потокового конвейера для многоэтапной обработки данных //! \~\details //! \~english //! Pipeline thread template class for processing data through multiple stages in separate threads. //! \~Each stage processes incoming data and passes it to the next stage via event notification. //! \~russian //! Шаблонный класс конвейерного потока для многоэтапной обработки данных в отдельных потоках. //! Каждый этап обрабатывает входные данные и передает их следующему этапу через event-уведомления. template class PIPipelineThread: public PIThread { PIOBJECT_SUBCLASS(PIPipelineThread, PIThread); public: //! \~english Constructs pipeline thread //! \~russian Создает конвейерный поток PIPipelineThread() { cnt = 0; max_size = 0; wait_next_pipe = false; } //! \~english Stops the stage thread and may terminate it forcibly if it does not finish in time. //! \~russian Останавливает поток стадии и может принудительно завершить его, если он не завершится вовремя. ~PIPipelineThread() { stop(); cv.notifyAll(); if (!waitForFinish(1000)) { piCoutObj << "terminating self thread"; terminate(); } } //! \~english Connects to next pipeline stage via event notification //! \~russian Подключает к следующему этапу конвейера через event-уведомления template void connectTo(PIPipelineThread * next) { CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue); } //! \~\handlers //! \~\{ //! \~english Event handler for data enqueue //! \~russian Обработчик события добавления данных в очередь //! \~\details //! \~english //! For bounded queues, \a wait decides whether to block until space is available or to drop the item immediately. //! \~If \a overload is not null, it is set to \c true when the item is rejected because the queue is full. //! \~russian //! Для ограниченных очередей \a wait определяет, ждать ли освобождения места или сразу отбросить элемент. //! \~Если \a overload не равен null, он получает значение \c true, когда элемент отклонен из-за переполнения очереди. EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) { mutex.lock(); // piCoutObj << "enque" << overload; if (wait && max_size != 0) { mutex_wait.lock(); while (in.size() >= max_size) cv_wait.wait(mutex_wait); mutex_wait.unlock(); } if (max_size == 0 || in.size() < max_size) { in.enqueue(v); cv.notifyAll(); if (overload) *overload = false; } else { if (overload) *overload = true; } mutex.unlock(); } //! \~\} //! \~\events //! \~\{ //! \~\fn void calculated(const Tout & v, bool wait, bool * overload) //! \~\brief //! \~english Emitted when processing result is ready //! \~russian Генерируется когда результат обработки готов //! \~\details //! \~english Raised after \a calc() succeeds. //! \~russian Вызывается после успешного завершения \a calc(). //! \~\note //! \~english The \a wait flag is propagated to a downstream stage and does not affect this stage's own queue. //! \~russian Флаг \a wait передается следующей стадии и не влияет на собственную очередь этой стадии. EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload); //! \} //! \~english Enqueue data for processing //! \~russian Добавляет данные в очередь на обработку void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); } //! \~english Returns pointer to counter //! \~russian Возвращает указатель на счетчик const ullong * counterPtr() const { return &cnt; } //! \~english Returns items processed counter //! \~russian Возвращает количество обработанных элементов ullong counter() const { return cnt; } //! \~english Returns whether the input queue is empty. //! \~russian Возвращает, пуста ли входная очередь. bool isEmpty() { bool ret; mutex.lock(); ret = in.isEmpty(); mutex.unlock(); return ret; } //! \~english Returns the current input queue size. //! \~russian Возвращает текущий размер входной очереди. int queSize() { int ret; mutex.lock(); ret = in.size(); mutex.unlock(); return ret; } //! \~english Clear input queue //! \~russian Очищает входную очередь void clear() { mutex.lock(); mutex_wait.lock(); in.clear(); cv_wait.notifyAll(); mutex_wait.unlock(); mutex.unlock(); } //! \~english Stops calculation and waits for thread finish //! \~russian Останавливает вычисления и ожидает завершения потока //! \~\note //! \~english If the stage does not stop within \a wait_delay, it may be terminated forcibly. //! \~russian Если стадия не остановится за \a wait_delay, она может быть принудительно завершена. void stopCalc(int wait_delay = 100) { if (isRunning()) { stop(); cv.notifyAll(); if (!waitForFinish(wait_delay)) { mutex_last.unlock(); mutex.unlock(); terminate(); } } } //! \~english Returns a copy of the last successfully calculated output. //! \~russian Возвращает копию последнего успешно вычисленного выхода. //! \~\note //! \~english This snapshot is separate from the input queue. //! \~russian Этот снимок хранится отдельно от входной очереди. Tout getLast() { Tout ret; mutex_last.lock(); ret = last; mutex_last.unlock(); return ret; } //! \~english Returns the configured input queue limit. //! \~russian Возвращает настроенный предел входной очереди. //! \~\note //! \~english Value \c 0 means the queue is unbounded. //! \~russian Значение \c 0 означает, что очередь не ограничена. uint maxQueSize() { return max_size; } //! \~english Sets the input queue limit. //! \~russian Устанавливает предел входной очереди. //! \~\note //! \~english Value \c 0 removes the limit. If the queue already exceeds the new limit, it is resized immediately. //! \~russian Значение \c 0 снимает ограничение. Если очередь уже превышает новый предел, она немедленно приводится к нему. void setMaxQueSize(uint count) { mutex.lock(); max_size = count; if (max_size > 0 && in.size() > max_size) in.resize(max_size); mutex.unlock(); } //! \~english Returns if waiting for next pipeline stage //! \~russian Возвращает ожидает ли следующий этап конвейера bool isWaitNextPipe() { return wait_next_pipe; } //! \~english Sets whether to wait for next pipeline stage //! \~russian Устанавливает флаг ожидания следующего этапа конвейера //! \~\details //! \~english Sets whether the \a calculated() signal requests waiting in the downstream stage. //! \~russian Устанавливает, должен ли сигнал \a calculated() запрашивать ожидание на следующей стадии. //! \~\note //! \~english This flag only affects downstream propagation and does not change how this stage accepts input. //! \~russian Этот флаг влияет только на передачу вниз по конвейеру и не меняет правила приема входа этой стадией. void setWaitNextPipe(bool wait) { wait_next_pipe = wait; } protected: //! \~english Calculates one output from queued input \a v. //! \~russian Вычисляет один выход из очередного входного элемента \a v. //! \~\note //! \~english Leave \a ok equal to \c true to publish the result, or set it to \c false to drop this input without forwarding. //! \~russian Оставьте \a ok равным \c true, чтобы опубликовать результат, или установите \a ok в \c false, чтобы отбросить этот вход //! без пересылки дальше. virtual Tout calc(Tin & v, bool & ok) = 0; //! \~english Maximum queue size (0 means unlimited) //! \~russian Максимальный размер очереди (0 означает без ограничений) uint max_size; private: void begin() override { cnt = 0; } void run() override { mutex.lock(); while (in.isEmpty()) { cv.wait(mutex); if (terminating) { mutex.unlock(); return; } } mutex_wait.lock(); Tin t = in.dequeue(); mutex.unlock(); cv_wait.notifyAll(); mutex_wait.unlock(); bool ok = true; Tout r = calc(t, ok); if (ok) { mutex_last.lock(); last = r; mutex_last.unlock(); cnt++; // piCoutObj << "calc ok"; calculated(r, wait_next_pipe); } // piCoutObj << "run ok"; } PIMutex mutex, mutex_wait; PIConditionVariable cv, cv_wait; PIMutex mutex_last; bool wait_next_pipe; ullong cnt; PIQueue in; Tout last; }; #endif // PIPIPELINETHREAD_H