//! \~\file pipipelinethread.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-based pipeline for multi-stage data processing
//! \~russian Потоковый конвейер для многоэтапной обработки данных
/*
PIP - Platform Independent Primitives
Class for create multihread pipeline
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
#ifndef PIPIPELINETHREAD_H
#define PIPIPELINETHREAD_H
#include "piconditionvar.h"
#include "piqueue.h"
#include "pithread.h"
//! \~\ingroup Thread
//! \~english Thread-based pipeline template class for multi-stage data processing
//! \~russian Шаблонный класс потокового конвейера для многоэтапной обработки данных
//! \~\details
//! \~english
//! Pipeline thread template class for processing data through multiple stages in separate threads.
//! \~Each stage processes incoming data and passes it to the next stage via event notification.
//! \~russian
//! Шаблонный класс конвейерного потока для многоэтапной обработки данных в отдельных потоках.
//! Каждый этап обрабатывает входные данные и передает их следующему этапу через event-уведомления.
template
class PIPipelineThread: public PIThread {
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread);
public:
//! \~english Constructs pipeline thread
//! \~russian Создает конвейерный поток
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = false;
}
//! \~english Stops the stage thread and may terminate it forcibly if it does not finish in time.
//! \~russian Останавливает поток стадии и может принудительно завершить его, если он не завершится вовремя.
~PIPipelineThread() {
stop();
cv.notifyAll();
if (!waitForFinish(1000)) {
piCoutObj << "terminating self thread";
terminate();
}
}
//! \~english Connects to next pipeline stage via event notification
//! \~russian Подключает к следующему этапу конвейера через event-уведомления
template
void connectTo(PIPipelineThread * next) {
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue);
}
//! \~\handlers
//! \~\{
//! \~english Event handler for data enqueue
//! \~russian Обработчик события добавления данных в очередь
//! \~\details
//! \~english
//! For bounded queues, \a wait decides whether to block until space is available or to drop the item immediately.
//! \~If \a overload is not null, it is set to \c true when the item is rejected because the queue is full.
//! \~russian
//! Для ограниченных очередей \a wait определяет, ждать ли освобождения места или сразу отбросить элемент.
//! \~Если \a overload не равен null, он получает значение \c true, когда элемент отклонен из-за переполнения очереди.
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
mutex.lock();
// piCoutObj << "enque" << overload;
if (wait && max_size != 0) {
mutex_wait.lock();
while (in.size() >= max_size)
cv_wait.wait(mutex_wait);
mutex_wait.unlock();
}
if (max_size == 0 || in.size() < max_size) {
in.enqueue(v);
cv.notifyAll();
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
//! \~\}
//! \~\events
//! \~\{
//! \~\fn void calculated(const Tout & v, bool wait, bool * overload)
//! \~\brief
//! \~english Emitted when processing result is ready
//! \~russian Генерируется когда результат обработки готов
//! \~\details
//! \~english Raised after \a calc() succeeds.
//! \~russian Вызывается после успешного завершения \a calc().
//! \~\note
//! \~english The \a wait flag is propagated to a downstream stage and does not affect this stage's own queue.
//! \~russian Флаг \a wait передается следующей стадии и не влияет на собственную очередь этой стадии.
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload);
//! \}
//! \~english Enqueue data for processing
//! \~russian Добавляет данные в очередь на обработку
void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); }
//! \~english Returns pointer to counter
//! \~russian Возвращает указатель на счетчик
const ullong * counterPtr() const { return &cnt; }
//! \~english Returns items processed counter
//! \~russian Возвращает количество обработанных элементов
ullong counter() const { return cnt; }
//! \~english Returns whether the input queue is empty.
//! \~russian Возвращает, пуста ли входная очередь.
bool isEmpty() {
bool ret;
mutex.lock();
ret = in.isEmpty();
mutex.unlock();
return ret;
}
//! \~english Returns the current input queue size.
//! \~russian Возвращает текущий размер входной очереди.
int queSize() {
int ret;
mutex.lock();
ret = in.size();
mutex.unlock();
return ret;
}
//! \~english Clear input queue
//! \~russian Очищает входную очередь
void clear() {
mutex.lock();
mutex_wait.lock();
in.clear();
cv_wait.notifyAll();
mutex_wait.unlock();
mutex.unlock();
}
//! \~english Stops calculation and waits for thread finish
//! \~russian Останавливает вычисления и ожидает завершения потока
//! \~\note
//! \~english If the stage does not stop within \a wait_delay, it may be terminated forcibly.
//! \~russian Если стадия не остановится за \a wait_delay, она может быть принудительно завершена.
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
cv.notifyAll();
if (!waitForFinish(wait_delay)) {
mutex_last.unlock();
mutex.unlock();
terminate();
}
}
}
//! \~english Returns a copy of the last successfully calculated output.
//! \~russian Возвращает копию последнего успешно вычисленного выхода.
//! \~\note
//! \~english This snapshot is separate from the input queue.
//! \~russian Этот снимок хранится отдельно от входной очереди.
Tout getLast() {
Tout ret;
mutex_last.lock();
ret = last;
mutex_last.unlock();
return ret;
}
//! \~english Returns the configured input queue limit.
//! \~russian Возвращает настроенный предел входной очереди.
//! \~\note
//! \~english Value \c 0 means the queue is unbounded.
//! \~russian Значение \c 0 означает, что очередь не ограничена.
uint maxQueSize() { return max_size; }
//! \~english Sets the input queue limit.
//! \~russian Устанавливает предел входной очереди.
//! \~\note
//! \~english Value \c 0 removes the limit. If the queue already exceeds the new limit, it is resized immediately.
//! \~russian Значение \c 0 снимает ограничение. Если очередь уже превышает новый предел, она немедленно приводится к нему.
void setMaxQueSize(uint count) {
mutex.lock();
max_size = count;
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock();
}
//! \~english Returns if waiting for next pipeline stage
//! \~russian Возвращает ожидает ли следующий этап конвейера
bool isWaitNextPipe() { return wait_next_pipe; }
//! \~english Sets whether to wait for next pipeline stage
//! \~russian Устанавливает флаг ожидания следующего этапа конвейера
//! \~\details
//! \~english Sets whether the \a calculated() signal requests waiting in the downstream stage.
//! \~russian Устанавливает, должен ли сигнал \a calculated() запрашивать ожидание на следующей стадии.
//! \~\note
//! \~english This flag only affects downstream propagation and does not change how this stage accepts input.
//! \~russian Этот флаг влияет только на передачу вниз по конвейеру и не меняет правила приема входа этой стадией.
void setWaitNextPipe(bool wait) { wait_next_pipe = wait; }
protected:
//! \~english Calculates one output from queued input \a v.
//! \~russian Вычисляет один выход из очередного входного элемента \a v.
//! \~\note
//! \~english Leave \a ok equal to \c true to publish the result, or set it to \c false to drop this input without forwarding.
//! \~russian Оставьте \a ok равным \c true, чтобы опубликовать результат, или установите \a ok в \c false, чтобы отбросить этот вход
//! без пересылки дальше.
virtual Tout calc(Tin & v, bool & ok) = 0;
//! \~english Maximum queue size (0 means unlimited)
//! \~russian Максимальный размер очереди (0 означает без ограничений)
uint max_size;
private:
void begin() override { cnt = 0; }
void run() override {
mutex.lock();
while (in.isEmpty()) {
cv.wait(mutex);
if (terminating) {
mutex.unlock();
return;
}
}
mutex_wait.lock();
Tin t = in.dequeue();
mutex.unlock();
cv_wait.notifyAll();
mutex_wait.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_last.lock();
last = r;
mutex_last.unlock();
cnt++;
// piCoutObj << "calc ok";
calculated(r, wait_next_pipe);
}
// piCoutObj << "run ok";
}
PIMutex mutex, mutex_wait;
PIConditionVariable cv, cv_wait;
PIMutex mutex_last;
bool wait_next_pipe;
ullong cnt;
PIQueue in;
Tout last;
};
#endif // PIPIPELINETHREAD_H