/*
    PIP - Platform Independent Primitives

    Stephan Fomenko, Ivan Pelipenko

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "pithreadpoolworker.h"

#include "piliterals_time.h"
#include "pisysteminfo.h"

//! \class PIThreadPoolWorker
//! \brief
//! Thread pools address two different problems: they usually provide improved performance when executing large
//! numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
//! managing the resources, including threads, consumed when executing a collection of tasks.
//!
PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) { if (threads_count < 0) threads_count = PISystemInfo::instance()->processorsCount; assertm(threads_count > 0, "Invalid threads count!"); for (int i = 0; i < threads_count; ++i) { Worker * w = new Worker(); w->thread.setSlot([this, w]() { threadFunc(w); }); workers << w; } } PIThreadPoolWorker::~PIThreadPoolWorker() { stop(); for (auto * w: workers) { if (!w->thread.waitForFinish(1_s)) w->thread.terminate(); } piDeleteAllAndClear(workers); } void PIThreadPoolWorker::start() { for (auto w: workers) w->thread.start(); m_running = true; } void PIThreadPoolWorker::stop() { m_running = false; for (auto w: workers) { w->thread.stop(); w->notifier.notify(); } } bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) { stop(); return waitForFinish(timeout); } bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) { PITimeMeasurer tm; for (auto w: workers) { if (timeout.isNull()) w->thread.waitForStart(); else { auto remains = timeout - tm.elapsed(); if (remains.isNegative()) return false; if (!w->thread.waitForStart(remains)) return false; } } return true; } bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) { PITimeMeasurer tm; for (auto w: workers) { if (timeout.isNull()) w->thread.waitForFinish(); else { auto remains = timeout - tm.elapsed(); if (remains.isNegative()) return false; if (!w->thread.waitForFinish(remains)) return false; } } return true; } int64_t PIThreadPoolWorker::enqueueTask(std::function func, PIObject * context) { int64_t id = ++next_task_id; Task task; task.id = id; task.func = std::move(func); task.context = context; tasks_queue.getRef()->append(std::move(task)); for (auto * w: workers) w->notifier.notify(); return id; } bool PIThreadPoolWorker::removeTask(int64_t id) { auto qref = tasks_queue.getRef(); auto prev_size = qref->size(); qref->removeWhere([id](const Task & t) { return t.id == id; }); return prev_size != qref->size(); } void PIThreadPoolWorker::clearTasks() { 
tasks_queue.getRef()->clear(); } PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const { if (id <= 0) return TaskStatus::Unknown; auto qref = tasks_queue.getRef(); for (auto w: workers) if (w->in_work == id) return TaskStatus::InProgress; for (const auto & t: *qref) if (t.id == id) return TaskStatus::Enqueued; return id <= next_task_id ? TaskStatus::Done : TaskStatus::Unknown; } void PIThreadPoolWorker::threadFunc(Worker * w) { w->notifier.wait(); if (w->thread.isStopping()) return; if (!m_running) return; Task task; { auto ref = tasks_queue.getRef(); if (ref->isEmpty()) return; task = ref->dequeue(); } if (!task.isValid()) return; w->in_work = task.id; task.func(task.id); w->in_work = -1; w->notifier.notify(); }