/* PIP - Platform Independent Primitives Ivan Pelipenko, Stephan Fomenko This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "pithreadpoolworker.h" #include "pisysteminfo.h" //! \addtogroup Thread //! \{ //! \class PIThreadPoolWorker pithreadpoolworker.h //! \~\details //! \~english //! \PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It //! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods //! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each //! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations. //! \~russian //! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он //! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс //! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула //! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент //! для многопоточного асинхронного выполнения операций. //! //! \} PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) { if (threads_count < 0) threads_count = PISystemInfo::instance()->processorsCount; assertm(threads_count > 0, "Invalid threads count!"); for (int i = 0; i < threads_count; ++i) { Worker * w = new Worker(); w->thread.setSlot([this, w]() { threadFunc(w); }); workers << w; } } PIThreadPoolWorker::~PIThreadPoolWorker() { piDeleteAllAndClear(workers); } void PIThreadPoolWorker::start() { for (auto w: workers) { w->thread.start(); w->notifier.notify(); } } void PIThreadPoolWorker::stop() { for (auto w: workers) { w->thread.stop(); w->notifier.notify(); } } bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) { stop(); return waitForFinish(timeout); } bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) { PITimeMeasurer tm; for (auto w: workers) { if (timeout.isNull()) w->thread.waitForStart(); else { auto remains = timeout - tm.elapsed(); if (remains.isNegative()) return false; if (!w->thread.waitForStart(remains)) return false; } } return true; } bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) { PITimeMeasurer tm; for (auto w: workers) { if (timeout.isNull()) w->thread.waitForFinish(); else { auto remains = timeout - tm.elapsed(); if (remains.isNegative()) return false; if (!w->thread.waitForFinish(remains)) return false; } } return true; } bool PIThreadPoolWorker::isRunning() const { return workers.every([](Worker * w) { return w->thread.isRunning(); }); } bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) { if (!isRunning()) return tasks_queue.getRef()->isEmpty(); auto checkWorking = [this] { if (tasks_queue.getRef()->isNotEmpty()) return true; for (auto * w: workers) if (w->in_work > 0) return true; return false; }; if (timeout.isNull()) { for (;;) { if (!checkWorking()) break; piMinSleep(); } return true; } PITimeMeasurer tm; while (tm.elapsed() < timeout) { if (!checkWorking()) return true; piMinSleep(); } return tm.elapsed() < timeout; } bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) { if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; }); auto checkWorking = [this, id] { if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true; for (auto * w: workers) if (w->in_work == id) return true; return false; }; if (timeout.isNull()) { for (;;) { if (!checkWorking()) break; piMinSleep(); } return true; } PITimeMeasurer tm; while (tm.elapsed() < timeout) { if (!checkWorking()) return true; piMinSleep(); } return tm.elapsed() < timeout; } void PIThreadPoolWorker::exec() { start(); waitForStart(); waitForTasks(); stopAndWait(); } int64_t PIThreadPoolWorker::enqueueTask(std::function func, PIObject * context) { if (context) { if (!contexts[context]) { contexts << context; CONNECTL(context, deleted, ([this, context](PIObject *) { contexts.remove(context); auto qref = tasks_queue.getRef(); // auto prev_size = qref->size(); // piCout << "deleted" << (void *)context << qref->map([](const Task & t) { return t.context; }); qref->removeWhere([context](const Task & t) { return t.context == context; }); // piCout << prev_size << qref->size() << qref->map([](const Task & t) { return t.context; }); })); } } int64_t id = ++next_task_id; Task task; task.id = id; task.func = std::move(func); task.context = context; tasks_queue.getRef()->enqueue(std::move(task)); for (auto * w: workers) w->notifier.notify(); return id; } bool PIThreadPoolWorker::removeTask(int64_t id) { auto qref = tasks_queue.getRef(); auto prev_size = qref->size(); qref->removeWhere([id](const Task & t) { return t.id == id; }); return prev_size != qref->size(); } void PIThreadPoolWorker::clearTasks() { tasks_queue.getRef()->clear(); } PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const { if (id <= 0) return TaskStatus::Unknown; auto qref = tasks_queue.getRef(); for (auto w: workers) if (w->in_work == id) return TaskStatus::InProgress; for (const auto & t: *qref) if (t.id == id) return TaskStatus::Enqueued; return id <= next_task_id ? TaskStatus::DoneOrCancelled : TaskStatus::Unknown; } void PIThreadPoolWorker::threadFunc(Worker * w) { w->notifier.wait(); if (w->thread.isStopping()) return; Task task; { auto ref = tasks_queue.getRef(); if (ref->isEmpty()) return; task = ref->dequeue(); } if (!task.isValid()) return; w->in_work = task.id; taskStarted(task.id); task.func(task.id); w->in_work = -1; taskFinished(task.id); w->notifier.notify(); }