Files
pip/libs/main/thread/pipipelinethread.h
2026-03-12 14:46:57 +03:00

286 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! \~\file pipipelinethread.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-based pipeline for multi-stage data processing
//! \~russian Потоковый конвейер для многоэтапной обработки данных
/*
PIP - Platform Independent Primitives
Class for create multihread pipeline
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIPIPELINETHREAD_H
#define PIPIPELINETHREAD_H
#include "piconditionvar.h"
#include "piqueue.h"
#include "pithread.h"
//! \~\ingroup Thread
//! \~english Thread-based pipeline template class for multi-stage data processing
//! \~russian Шаблонный класс потокового конвейера для многоэтапной обработки данных
//! \~\details
//! \~english
//! Pipeline thread template class for processing data through multiple stages in separate threads.
//! \~Each stage processes incoming data and passes it to the next stage via event notification.
//! \~russian
//! Шаблонный класс конвейерного потока для многоэтапной обработки данных в отдельных потоках.
//! Каждый этап обрабатывает входные данные и передает их следующему этапу через event-уведомления.
template<typename Tin, typename Tout>
class PIPipelineThread: public PIThread {
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread);
public:
//! \~english Constructs pipeline thread
//! \~russian Создает конвейерный поток
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = false;
}
//! \~english Stops the stage thread and may terminate it forcibly if it does not finish in time.
//! \~russian Останавливает поток стадии и может принудительно завершить его, если он не завершится вовремя.
~PIPipelineThread() {
stop();
cv.notifyAll();
if (!waitForFinish(1000)) {
piCoutObj << "terminating self thread";
terminate();
}
}
//! \~english Connects to next pipeline stage via event notification
//! \~russian Подключает к следующему этапу конвейера через event-уведомления
template<typename T>
void connectTo(PIPipelineThread<Tout, T> * next) {
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue);
}
//! \~\handlers
//! \~\{
//! \~english Event handler for data enqueue
//! \~russian Обработчик события добавления данных в очередь
//! \~\details
//! \~english
//! For bounded queues, \a wait decides whether to block until space is available or to drop the item immediately.
//! \~If \a overload is not null, it is set to \c true when the item is rejected because the queue is full.
//! \~russian
//! Для ограниченных очередей \a wait определяет, ждать ли освобождения места или сразу отбросить элемент.
//! \~Если \a overload не равен null, он получает значение \c true, когда элемент отклонен из-за переполнения очереди.
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
mutex.lock();
// piCoutObj << "enque" << overload;
if (wait && max_size != 0) {
mutex_wait.lock();
while (in.size() >= max_size)
cv_wait.wait(mutex_wait);
mutex_wait.unlock();
}
if (max_size == 0 || in.size() < max_size) {
in.enqueue(v);
cv.notifyAll();
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
//! \~\}
//! \~\events
//! \~\{
//! \~\fn void calculated(const Tout & v, bool wait, bool * overload)
//! \~\brief
//! \~english Emitted when processing result is ready
//! \~russian Генерируется когда результат обработки готов
//! \~\details
//! \~english Raised after \a calc() succeeds.
//! \~russian Вызывается после успешного завершения \a calc().
//! \~\note
//! \~english The \a wait flag is propagated to a downstream stage and does not affect this stage's own queue.
//! \~russian Флаг \a wait передается следующей стадии и не влияет на собственную очередь этой стадии.
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload);
//! \}
//! \~english Enqueue data for processing
//! \~russian Добавляет данные в очередь на обработку
void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); }
//! \~english Returns pointer to counter
//! \~russian Возвращает указатель на счетчик
const ullong * counterPtr() const { return &cnt; }
//! \~english Returns items processed counter
//! \~russian Возвращает количество обработанных элементов
ullong counter() const { return cnt; }
//! \~english Returns whether the input queue is empty.
//! \~russian Возвращает, пуста ли входная очередь.
bool isEmpty() {
bool ret;
mutex.lock();
ret = in.isEmpty();
mutex.unlock();
return ret;
}
//! \~english Returns the current input queue size.
//! \~russian Возвращает текущий размер входной очереди.
int queSize() {
int ret;
mutex.lock();
ret = in.size();
mutex.unlock();
return ret;
}
//! \~english Clear input queue
//! \~russian Очищает входную очередь
void clear() {
mutex.lock();
mutex_wait.lock();
in.clear();
cv_wait.notifyAll();
mutex_wait.unlock();
mutex.unlock();
}
//! \~english Stops calculation and waits for thread finish
//! \~russian Останавливает вычисления и ожидает завершения потока
//! \~\note
//! \~english If the stage does not stop within \a wait_delay, it may be terminated forcibly.
//! \~russian Если стадия не остановится за \a wait_delay, она может быть принудительно завершена.
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
cv.notifyAll();
if (!waitForFinish(wait_delay)) {
mutex_last.unlock();
mutex.unlock();
terminate();
}
}
}
//! \~english Returns a copy of the last successfully calculated output.
//! \~russian Возвращает копию последнего успешно вычисленного выхода.
//! \~\note
//! \~english This snapshot is separate from the input queue.
//! \~russian Этот снимок хранится отдельно от входной очереди.
Tout getLast() {
Tout ret;
mutex_last.lock();
ret = last;
mutex_last.unlock();
return ret;
}
//! \~english Returns the configured input queue limit.
//! \~russian Возвращает настроенный предел входной очереди.
//! \~\note
//! \~english Value \c 0 means the queue is unbounded.
//! \~russian Значение \c 0 означает, что очередь не ограничена.
uint maxQueSize() { return max_size; }
//! \~english Sets the input queue limit.
//! \~russian Устанавливает предел входной очереди.
//! \~\note
//! \~english Value \c 0 removes the limit. If the queue already exceeds the new limit, it is resized immediately.
//! \~russian Значение \c 0 снимает ограничение. Если очередь уже превышает новый предел, она немедленно приводится к нему.
void setMaxQueSize(uint count) {
mutex.lock();
max_size = count;
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock();
}
//! \~english Returns if waiting for next pipeline stage
//! \~russian Возвращает ожидает ли следующий этап конвейера
bool isWaitNextPipe() { return wait_next_pipe; }
//! \~english Sets whether to wait for next pipeline stage
//! \~russian Устанавливает флаг ожидания следующего этапа конвейера
//! \~\details
//! \~english Sets whether the \a calculated() signal requests waiting in the downstream stage.
//! \~russian Устанавливает, должен ли сигнал \a calculated() запрашивать ожидание на следующей стадии.
//! \~\note
//! \~english This flag only affects downstream propagation and does not change how this stage accepts input.
//! \~russian Этот флаг влияет только на передачу вниз по конвейеру и не меняет правила приема входа этой стадией.
void setWaitNextPipe(bool wait) { wait_next_pipe = wait; }
protected:
//! \~english Calculates one output from queued input \a v.
//! \~russian Вычисляет один выход из очередного входного элемента \a v.
//! \~\note
//! \~english Leave \a ok equal to \c true to publish the result, or set it to \c false to drop this input without forwarding.
//! \~russian Оставьте \a ok равным \c true, чтобы опубликовать результат, или установите \a ok в \c false, чтобы отбросить этот вход
//! без пересылки дальше.
virtual Tout calc(Tin & v, bool & ok) = 0;
//! \~english Maximum queue size (0 means unlimited)
//! \~russian Максимальный размер очереди (0 означает без ограничений)
uint max_size;
private:
void begin() override { cnt = 0; }
void run() override {
mutex.lock();
while (in.isEmpty()) {
cv.wait(mutex);
if (terminating) {
mutex.unlock();
return;
}
}
mutex_wait.lock();
Tin t = in.dequeue();
mutex.unlock();
cv_wait.notifyAll();
mutex_wait.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_last.lock();
last = r;
mutex_last.unlock();
cnt++;
// piCoutObj << "calc ok";
calculated(r, wait_next_pipe);
}
// piCoutObj << "run ok";
}
PIMutex mutex, mutex_wait;
PIConditionVariable cv, cv_wait;
PIMutex mutex_last;
bool wait_next_pipe;
ullong cnt;
PIQueue<Tin> in;
Tout last;
};
#endif // PIPIPELINETHREAD_H