239 lines
7.2 KiB
C++
239 lines
7.2 KiB
C++
/*
|
||
PIP - Platform Independent Primitives
|
||
|
||
Ivan Pelipenko, Stephan Fomenko
|
||
|
||
This program is free software: you can redistribute it and/or modify
|
||
it under the terms of the GNU Lesser General Public License as published by
|
||
the Free Software Foundation, either version 3 of the License, or
|
||
(at your option) any later version.
|
||
|
||
This program is distributed in the hope that it will be useful,
|
||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
GNU Lesser General Public License for more details.
|
||
|
||
You should have received a copy of the GNU Lesser General Public License
|
||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
*/
|
||
|
||
#include "pithreadpoolworker.h"
|
||
|
||
#include "pisysteminfo.h"
|
||
|
||
//! \addtogroup Thread
|
||
//! \{
|
||
//! \class PIThreadPoolWorker pithreadpoolworker.h
|
||
//! \~\details
|
||
//! \~english
|
||
//! PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It
|
||
//! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods
|
||
//! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each
|
||
//! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations.
|
||
//! \~russian
|
||
//! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он
|
||
//! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс
|
||
//! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула
|
||
//! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент
|
||
//! для многопоточного асинхронного выполнения операций.
|
||
//!
|
||
//! \}
|
||
|
||
|
||
// Builds the pool: allocates one Worker per requested thread and binds each
// worker's thread slot to threadFunc(). Threads are not started here.
// threads_count: number of workers; a negative value is replaced with
// PISystemInfo::instance()->processorsCount. The resulting count must be > 0.
PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) {
	if (threads_count < 0) {
		threads_count = PISystemInfo::instance()->processorsCount;
	}
	assertm(threads_count > 0, "Invalid threads count!");
	int remaining = threads_count;
	while (remaining-- > 0) {
		Worker * worker = new Worker();
		// The slot captures the worker pointer so threadFunc() knows whose state to use.
		worker->thread.setSlot([this, worker]() { threadFunc(worker); });
		workers << worker;
	}
}
|
||
|
||
|
||
// Releases the Worker objects allocated by the constructor (via piDeleteAllAndClear).
// NOTE(review): the pool is not stopped explicitly here; presumably destroying a
// Worker takes care of a still-running thread — confirm, or call stopAndWait() first.
PIThreadPoolWorker::~PIThreadPoolWorker() {
	piDeleteAllAndClear(workers);
}
|
||
|
||
|
||
void PIThreadPoolWorker::start() {
|
||
for (auto w: workers) {
|
||
w->thread.start();
|
||
w->notifier.notify();
|
||
}
|
||
}
|
||
|
||
|
||
void PIThreadPoolWorker::stop() {
|
||
for (auto w: workers) {
|
||
w->thread.stop();
|
||
w->notifier.notify();
|
||
}
|
||
}
|
||
|
||
|
||
// Convenience wrapper: stop() followed by waitForFinish(timeout).
// Returns whatever waitForFinish() reports for the given timeout.
bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) {
	stop();
	const bool all_finished = waitForFinish(timeout);
	return all_finished;
}
|
||
|
||
|
||
// Waits until every worker thread has started.
// timeout: total budget shared by all workers; a null timeout waits on each
// thread with no explicit limit.
// Returns false as soon as the budget is exhausted or a thread fails to start
// within the remaining time, true otherwise.
bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) {
	PITimeMeasurer stopwatch;
	for (Worker * worker: workers) {
		if (timeout.isNull()) {
			worker->thread.waitForStart();
			continue;
		}
		// Each worker only gets what is left of the overall timeout.
		auto budget = timeout - stopwatch.elapsed();
		if (budget.isNegative()) return false;
		if (!worker->thread.waitForStart(budget)) return false;
	}
	return true;
}
|
||
|
||
|
||
// Waits until every worker thread has finished.
// timeout: total budget shared by all workers; a null timeout waits on each
// thread with no explicit limit.
// Returns false as soon as the budget is exhausted or a thread does not finish
// within the remaining time, true otherwise.
bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) {
	PITimeMeasurer stopwatch;
	for (Worker * worker: workers) {
		if (timeout.isNull()) {
			worker->thread.waitForFinish();
			continue;
		}
		// Each worker only gets what is left of the overall timeout.
		auto budget = timeout - stopwatch.elapsed();
		if (budget.isNegative()) return false;
		if (!worker->thread.waitForFinish(budget)) return false;
	}
	return true;
}
|
||
|
||
|
||
bool PIThreadPoolWorker::isRunning() const {
|
||
return workers.every([](Worker * w) { return w->thread.isRunning(); });
|
||
}
|
||
|
||
|
||
// Blocks until the task queue is empty and no worker is executing a task.
// timeout: maximum time to wait; a null timeout waits without limit.
// Returns true when the pool became idle, false when the timeout expired while
// work was still pending. If the pool is not running, returns immediately with
// whether the queue is empty.
bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) {
	if (!isRunning()) return tasks_queue.getRef()->isEmpty();
	auto hasPendingWork = [this] {
		// Keep the queue reference alive for the whole scan, the same way
		// taskStatus() does, so the queue and the workers' in_work values are
		// observed as one snapshot. Otherwise a running task that enqueues a
		// follow-up and finishes between the two reads makes the pool look idle.
		// (Assumes getRef() guards the queue while the reference lives — as its
		// scoped use in threadFunc() suggests.)
		auto qref = tasks_queue.getRef();
		if (qref->isNotEmpty()) return true;
		for (auto * w: workers)
			if (w->in_work > 0) return true;
		return false;
	};
	if (timeout.isNull()) {
		while (hasPendingWork())
			piMinSleep();
		return true;
	}
	PITimeMeasurer tm;
	while (tm.elapsed() < timeout) {
		if (!hasPendingWork()) return true;
		piMinSleep();
	}
	// Deadline passed: take one last look so work that completed during the
	// final sleep is not reported as a timeout.
	return !hasPendingWork();
}
|
||
|
||
|
||
bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) {
|
||
if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; });
|
||
auto checkWorking = [this, id] {
|
||
if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true;
|
||
for (auto * w: workers)
|
||
if (w->in_work == id) return true;
|
||
return false;
|
||
};
|
||
if (timeout.isNull()) {
|
||
for (;;) {
|
||
if (!checkWorking()) break;
|
||
piMinSleep();
|
||
}
|
||
return true;
|
||
}
|
||
PITimeMeasurer tm;
|
||
while (tm.elapsed() < timeout) {
|
||
if (!checkWorking()) return true;
|
||
piMinSleep();
|
||
}
|
||
return tm.elapsed() < timeout;
|
||
}
|
||
|
||
|
||
// Runs the pool synchronously: starts the workers, waits until they are up,
// waits for the pending tasks to complete, then stops the workers and waits
// for them to finish. Each wait uses the default timeout declared for that method.
void PIThreadPoolWorker::exec() {
	// Bring the workers up.
	start();
	waitForStart();
	// Drain the work, then shut down.
	waitForTasks();
	stopAndWait();
}
|
||
|
||
|
||
int64_t PIThreadPoolWorker::enqueueTask(std::function<void(int64_t)> func, PIObject * context) {
|
||
if (context) {
|
||
if (!contexts[context]) {
|
||
contexts << context;
|
||
CONNECTL(context, deleted, ([this, context](PIObject *) {
|
||
contexts.remove(context);
|
||
auto qref = tasks_queue.getRef();
|
||
// auto prev_size = qref->size();
|
||
// piCout << "deleted" << (void *)context << qref->map<void *>([](const Task & t) { return t.context; });
|
||
qref->removeWhere([context](const Task & t) { return t.context == context; });
|
||
// piCout << prev_size << qref->size() << qref->map<void *>([](const Task & t) { return t.context; });
|
||
}));
|
||
}
|
||
}
|
||
int64_t id = ++next_task_id;
|
||
Task task;
|
||
task.id = id;
|
||
task.func = std::move(func);
|
||
task.context = context;
|
||
tasks_queue.getRef()->enqueue(std::move(task));
|
||
for (auto * w: workers)
|
||
w->notifier.notify();
|
||
return id;
|
||
}
|
||
|
||
|
||
bool PIThreadPoolWorker::removeTask(int64_t id) {
|
||
auto qref = tasks_queue.getRef();
|
||
auto prev_size = qref->size();
|
||
qref->removeWhere([id](const Task & t) { return t.id == id; });
|
||
return prev_size != qref->size();
|
||
}
|
||
|
||
|
||
// Discards every task that is still waiting in the queue.
// Tasks already dequeued by a worker are not touched.
void PIThreadPoolWorker::clearTasks() {
	auto queue = tasks_queue.getRef();
	queue->clear();
}
|
||
|
||
|
||
// Reports the state of the task with the given id:
//  - Unknown          for non-positive ids and ids not issued yet,
//  - InProgress       when some worker's in_work equals the id,
//  - Enqueued         when the task is still in the queue,
//  - DoneOrCancelled  for any other id up to next_task_id.
PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const {
	if (id <= 0) return TaskStatus::Unknown;
	// The queue reference is held for the whole lookup.
	auto queue = tasks_queue.getRef();
	for (Worker * worker: workers) {
		if (worker->in_work == id) return TaskStatus::InProgress;
	}
	for (const auto & queued: *queue) {
		if (queued.id == id) return TaskStatus::Enqueued;
	}
	if (id <= next_task_id) return TaskStatus::DoneOrCancelled;
	return TaskStatus::Unknown;
}
|
||
|
||
|
||
// One pass of a worker thread (bound as the thread slot in the constructor):
// waits for the worker's notifier, takes one task from the queue and runs it.
// w: the worker whose thread executes this pass.
// Emits taskStarted(id) before and taskFinished(id) after the task function.
void PIThreadPoolWorker::threadFunc(Worker * w) {
	w->notifier.wait();
	// stop() signals the notifier too; bail out without touching the queue.
	if (w->thread.isStopping()) return;
	Task task;
	{
		auto ref = tasks_queue.getRef();
		if (ref->isEmpty()) return;
		task = ref->dequeue();
		if (!task.isValid()) return;
		// Publish the task as "in progress" before the queue reference is
		// released. Otherwise there is a window in which the task is neither
		// queued nor in_work, and taskStatus()/waitForTask()/waitForTasks()
		// can report it as finished before it has even started.
		// (Assumes getRef() guards the queue while the reference lives.)
		w->in_work = task.id;
	}
	taskStarted(task.id);
	task.func(task.id);
	w->in_work = -1;
	taskFinished(task.id);
	// Re-arm the notifier so the next pass checks the queue for more tasks
	// without waiting for a new enqueueTask().
	w->notifier.notify();
}
|