diff --git a/libs/main/thread/pithreadpoolexecutor.h b/libs/main/thread/pithreadpoolexecutor.h index 264d7168..c5e78142 100644 --- a/libs/main/thread/pithreadpoolexecutor.h +++ b/libs/main/thread/pithreadpoolexecutor.h @@ -7,4 +7,17 @@ namespace { DEPRECATEDM("Use pithreadpoolworker.h") const bool deprecated_header_is_used = true; } +class DEPRECATEDM("Use PIThreadPoolWorker") PIThreadPoolExecutor: public PIThreadPoolWorker { +public: + PIThreadPoolExecutor(int threads_count): PIThreadPoolWorker(threads_count) { start(); } + ~PIThreadPoolExecutor() { stopAndWait(); } + + void execute(std::function runnable) DEPRECATEDM("Use enqueueTask()") { enqueueTask(runnable); } + void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); } + void shutdown() DEPRECATEDM("Use stop()") { stop(); } + bool isShutdown() const DEPRECATEDM("Use !isRunning()") { return !isRunning(); } + bool awaitTermination(PISystemTime timeout) DEPRECATEDM("Use waitForFinish()") { return waitForFinish(timeout); } +}; + + #endif diff --git a/libs/main/thread/pithreadpoolworker.cpp b/libs/main/thread/pithreadpoolworker.cpp index c3505fe7..26032f12 100644 --- a/libs/main/thread/pithreadpoolworker.cpp +++ b/libs/main/thread/pithreadpoolworker.cpp @@ -1,7 +1,7 @@ /* PIP - Platform Independent Primitives - Stephan Fomenko, Ivan Pelipenko + Ivan Pelipenko, Stephan Fomenko This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by @@ -19,15 +19,25 @@ #include "pithreadpoolworker.h" -#include "piliterals_time.h" #include "pisysteminfo.h" -//! \class PIThreadPoolWorker -//! \brief -//! Thread pools address two different problems: they usually provide improved performance when executing large -//! numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and -//! managing the resources, including threads, consumed when executing a collection of tasks. +//! \addtogroup Thread +//! \{ +//! \class PIThreadPoolWorker pithreadpoolworker.h +//! \~\details +//! \~english +//! \PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It +//! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods +//! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each +//! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations. +//! \~russian +//! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он +//! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс +//! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула +//! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент +//! для многопоточного асинхронного выполнения операций. //! +//! \} PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) { @@ -205,7 +215,7 @@ PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const if (w->in_work == id) return TaskStatus::InProgress; for (const auto & t: *qref) if (t.id == id) return TaskStatus::Enqueued; - return id <= next_task_id ? TaskStatus::Done : TaskStatus::Unknown; + return id <= next_task_id ? TaskStatus::DoneOrCancelled : TaskStatus::Unknown; } diff --git a/libs/main/thread/pithreadpoolworker.h b/libs/main/thread/pithreadpoolworker.h index aa89823f..295ea6ea 100644 --- a/libs/main/thread/pithreadpoolworker.h +++ b/libs/main/thread/pithreadpoolworker.h @@ -3,14 +3,10 @@ //! \brief //! \~english Thread pool worker //! \~russian Исполнитель пула потоков -//! -//! \details -//! \~english Executes tasks in a pool of worker threads. -//! \~russian Выполняет задачи в пуле рабочих потоков. /* PIP - Platform Independent Primitives - Stephan Fomenko, Ivan Pelipenko + Ivan Pelipenko, Stephan Fomenko This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by @@ -29,32 +25,35 @@ #ifndef PITHREADPOOLWORKER_H #define PITHREADPOOLWORKER_H -#include "piblockingqueue.h" #include "piprotectedvariable.h" #include "pithread.h" //! \~\ingroup Thread //! \~\brief -//! \~english Fixed-size pool of worker threads for fire-and-forget tasks. -//! \~russian Фиксированный пул рабочих потоков для задач без ожидания результата. +//! \~english Fixed-size pool of worker threads for generic-purpose tasks. +//! \~russian Фиксированный пул рабочих потоков для задач общего назначения. class PIP_EXPORT PIThreadPoolWorker: public PIObject { PIOBJECT(PIThreadPoolWorker) public: - //! \~english Constructs executor with \a threads_count worker threads. - //! \~russian Создает исполнитель с \a threads_count рабочими потоками. + //! \~english Constructs executor with \a threads_count worker threads. If \a threads_count < 0 processor threads count used. + //! \~russian Создает исполнитель с \a threads_count рабочими потоками. Если \a threads_count < 0 используется количество потоков + //! процессора. explicit PIThreadPoolWorker(int threads_count = -1); - //! \~english Stops worker threads and destroys executor resources. - //! \~russian Останавливает рабочие потоки и уничтожает ресурсы исполнителя. + //! \~english Destroy worker threads. Call \a stopAndWait() before. + //! \~russian Уничтожает рабочие потоки. Вызывайте перед этим \a stopAndWait(). virtual ~PIThreadPoolWorker(); + + //! \~english Task status. + //! \~russian Статус задачи. enum class TaskStatus { - Unknown, - Enqueued, - InProgress, - Done + Unknown /** \~english ID <= 0 or not queued yet \~russian ID <= 0 или не поставлена в очередь */, + Enqueued /** \~english Wait for execution \~russian Ожидает выполнения */, + InProgress /** \~english In execution now \~russian В процессе выполнения */, + DoneOrCancelled /** \~english Done or cancelled \~russian Выполнена или отменена */ }; @@ -90,31 +89,50 @@ public: //! \~russian Ожидает завершения задачи с id \a id. Возвращает \b false, если таймаут истек раньше. bool waitForTask(int64_t id, PISystemTime timeout = {}); + //! \~english Starts threads, wait for all tasks complete and threads stop. + //! \~russian Запускает потоки, ожидает завершения всех задач и остановки потоков. void exec(); + + //! \~english Queue functor to execution. Pass task ID in functor. Returns task ID. + //! \~russian Запланировать функтор на выполнение. В функтор передастся ID задачи. Возвращает ID задачи. int64_t enqueueTask(std::function func, PIObject * context = nullptr); + //! \~english Queue functor to execution. Returns task ID. + //! \~russian Запланировать функтор на выполнение. Возвращает ID задачи. int64_t enqueueTask(std::function func, PIObject * context = nullptr) { return enqueueTask([func](int64_t) { func(); }, context); } + //! \~english Queue class member method to execution. Pass task ID in method. Returns task ID. + //! \~russian Запланировать член-метод класса на выполнение. В метод передастся ID задачи. Возвращает ID задачи. template int64_t enqueueTask(O * obj, void (O::*member_func)(int64_t)) { return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); }, PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr); } + //! \~english Queue class member method to execution. Returns task ID. + //! \~russian Запланировать член-метод класса на выполнение. Возвращает ID задачи. template int64_t enqueueTask(O * obj, void (O::*member_func)()) { return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); }, PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr); } + //! \~english Remove task with id \a id from queue. Returns if task delete. + //! \~russian Удаляет задачу с id \a id из очереди. Возвращиает была ли задача удалена. bool removeTask(int64_t id); + + //! \~english Remove all queued tasks. + //! \~russian Удаляет все задачи из очереди. void clearTasks(); + //! \~english Returns task status with id \a id. + //! \~russian Возвращиает статус задачи с id \a id. TaskStatus taskStatus(int64_t id) const; + //! \events //! \{ @@ -133,25 +151,6 @@ public: //! \} - // DEPRECATED - - //! \~\brief - //! \~english Submits \a runnable for asynchronous execution by a worker thread. - //! \~russian Передает \a runnable на асинхронное выполнение рабочим потоком. - //! \details - //! \~english - //! This is a best-effort fire-and-forget call and does not report whether the task was accepted. - //! \After shutdown requests new tasks are ignored. - //! \~russian - //! Это вызов по принципу best-effort без ожидания результата и без сообщения о том, была ли задача принята. - //! \После запроса на завершение новые задачи игнорируются. - void execute(std::function runnable) { enqueueTask(runnable); } - - void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); } - void shutdown() DEPRECATEDM("Use stop()") { stop(); } - bool isShutdown() const DEPRECATEDM("Use !isRunning()") { return !isRunning(); } - bool awaitTermination(PISystemTime timeout) DEPRECATEDM("Use waitForFinish()") { return waitForFinish(timeout); } - private: struct Worker { PIThread thread; @@ -175,7 +174,4 @@ private: }; -typedef PIThreadPoolWorker PIThreadPoolExecutor DEPRECATEDM("Use PIThreadPoolWorker"); - - #endif // PITHREADPOOLWORKER_H diff --git a/main.cpp b/main.cpp index 23928d89..61a7b945 100644 --- a/main.cpp +++ b/main.cpp @@ -1,30 +1,53 @@ #include "pip.h" +#include "pithreadpoolexecutor.h" +class A: public PIObject { + PIOBJECT(A) + +public: + A(PIString n = {}) { setName(n); } + void foo() { + 100_ms .sleep(); + piCoutObj << "foo!"; + 100_ms .sleep(); + } +}; int main(int argc, char * argv[]) { + PIVector objects; + objects << new A("1") << new A("2") << new A("3"); + PITimer status_timer; - PIThreadPoolWorker pool(1); - - int64_t id = -1; - - status_timer.start(10_Hz, [&id, &pool] { piCout << "[timer] status" << (int)pool.taskStatus(id); }); - - 200_ms .sleep(); - id = pool.enqueueTask([](int64_t id) { - piCout << "[task ] start, id" << id; - 300_ms .sleep(); - piCout << "[task ] done"; - }); - piCout << "[main ]" << "enqueued, id" << id; - - 200_ms .sleep(); - piCout << pool.removeTask(id); - piCout << pool.removeTask(id); - piCout << "[main ]" << "start"; + PIThreadPoolWorker pool(2); pool.start(); + // int64_t id = -1; + // status_timer.start(10_Hz, [&id, &pool] { piCout << "[timer] status" << (int)pool.taskStatus(id); }); + + 100_ms .sleep(); + // pool.enqueueTask([](int64_t id) { + // piCout << "[task ] start, id" << id; + // // 500_ms .sleep(); + // piCout << "[task ] done"; + // }); + pool.enqueueTask(objects[0], &A::foo); + pool.enqueueTask(objects[1], &A::foo); + pool.enqueueTask(objects[1], &A::foo); + pool.enqueueTask(objects[2], &A::foo); + + 10_ms .sleep(); + delete objects[1]; + objects.remove(1); + // piCout << "[main ]" << "enqueued, id" << id; + + // 200_ms .sleep(); + piCout << "[main ]" << "wait ..."; + piCout << "[main ]" << "wait done"; + 1000_ms .sleep(); + pool.stopAndWait(); status_timer.stopAndWait(); + piDeleteAll(objects); return 0; }