From 5868e0ec9d9a5bbd32800a3de6bee38301101d2b Mon Sep 17 00:00:00 2001 From: peri4 Date: Tue, 24 Mar 2026 19:56:43 +0300 Subject: [PATCH] work with PIThreadPoolWorker --- libs/main/thread/pithreadpoolexecutor.h | 8 ++- libs/main/thread/pithreadpoolworker.cpp | 89 ++++++++++++++++++++++--- libs/main/thread/pithreadpoolworker.h | 43 ++++++++++-- 3 files changed, 123 insertions(+), 17 deletions(-) diff --git a/libs/main/thread/pithreadpoolexecutor.h b/libs/main/thread/pithreadpoolexecutor.h index 135cd5db..264d7168 100644 --- a/libs/main/thread/pithreadpoolexecutor.h +++ b/libs/main/thread/pithreadpoolexecutor.h @@ -1,6 +1,10 @@ +#ifndef PITHREADPOOLEXECUTOR_H +#define PITHREADPOOLEXECUTOR_H + #include "pithreadpoolworker.h" namespace { -// [[deprecated("DeprecatedHeader.h is deprecated. Use NewHeader.h instead.")]] DEPRECATEDM("Use pithreadpoolworker.h") const bool deprecated_header_is_used = true; -} // namespace +} + +#endif diff --git a/libs/main/thread/pithreadpoolworker.cpp b/libs/main/thread/pithreadpoolworker.cpp index 03b49b32..c3505fe7 100644 --- a/libs/main/thread/pithreadpoolworker.cpp +++ b/libs/main/thread/pithreadpoolworker.cpp @@ -42,23 +42,19 @@ PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) { PIThreadPoolWorker::~PIThreadPoolWorker() { - stop(); - for (auto * w: workers) { - if (!w->thread.waitForFinish(1_s)) w->thread.terminate(); - } piDeleteAllAndClear(workers); } void PIThreadPoolWorker::start() { - for (auto w: workers) + for (auto w: workers) { w->thread.start(); - m_running = true; + w->notifier.notify(); + } } void PIThreadPoolWorker::stop() { - m_running = false; for (auto w: workers) { w->thread.stop(); w->notifier.notify(); @@ -102,13 +98,87 @@ bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) { } +bool PIThreadPoolWorker::isRunning() const { + return workers.every([](Worker * w) { return w->thread.isRunning(); }); +} + + +bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) { + if (!isRunning()) return tasks_queue.getRef()->isEmpty(); + auto checkWorking = [this] { + if (tasks_queue.getRef()->isNotEmpty()) return true; + for (auto * w: workers) + if (w->in_work > 0) return true; + return false; + }; + if (timeout.isNull()) { + for (;;) { + if (!checkWorking()) break; + piMinSleep(); + } + return true; + } + PITimeMeasurer tm; + while (tm.elapsed() < timeout) { + if (!checkWorking()) return true; + piMinSleep(); + } + return tm.elapsed() < timeout; +} + + +bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) { + if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; }); + auto checkWorking = [this, id] { + if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true; + for (auto * w: workers) + if (w->in_work == id) return true; + return false; + }; + if (timeout.isNull()) { + for (;;) { + if (!checkWorking()) break; + piMinSleep(); + } + return true; + } + PITimeMeasurer tm; + while (tm.elapsed() < timeout) { + if (!checkWorking()) return true; + piMinSleep(); + } + return tm.elapsed() < timeout; +} + + +void PIThreadPoolWorker::exec() { + start(); + waitForStart(); + waitForTasks(); + stopAndWait(); +} + + int64_t PIThreadPoolWorker::enqueueTask(std::function func, PIObject * context) { + if (context) { + if (!contexts[context]) { + contexts << context; + CONNECTL(context, deleted, ([this, context](PIObject *) { + contexts.remove(context); + auto qref = tasks_queue.getRef(); + // auto prev_size = qref->size(); + // piCout << "deleted" << (void *)context << qref->map([](const Task & t) { return t.context; }); + qref->removeWhere([context](const Task & t) { return t.context == context; }); + // piCout << prev_size << qref->size() << qref->map([](const Task & t) { return t.context; }); + })); + } + } int64_t id = ++next_task_id; Task task; task.id = id; task.func = std::move(func); task.context = context; - tasks_queue.getRef()->append(std::move(task)); + tasks_queue.getRef()->enqueue(std::move(task)); for (auto * w: workers) w->notifier.notify(); return id; @@ -142,7 +212,6 @@ PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const void PIThreadPoolWorker::threadFunc(Worker * w) { w->notifier.wait(); if (w->thread.isStopping()) return; - if (!m_running) return; Task task; { auto ref = tasks_queue.getRef(); @@ -151,7 +220,9 @@ void PIThreadPoolWorker::threadFunc(Worker * w) { } if (!task.isValid()) return; w->in_work = task.id; + taskStarted(task.id); task.func(task.id); w->in_work = -1; + taskFinished(task.id); w->notifier.notify(); } diff --git a/libs/main/thread/pithreadpoolworker.h b/libs/main/thread/pithreadpoolworker.h index 04ff1888..aa89823f 100644 --- a/libs/main/thread/pithreadpoolworker.h +++ b/libs/main/thread/pithreadpoolworker.h @@ -38,7 +38,9 @@ //! \~\brief //! \~english Fixed-size pool of worker threads for fire-and-forget tasks. //! \~russian Фиксированный пул рабочих потоков для задач без ожидания результата. -class PIP_EXPORT PIThreadPoolWorker { +class PIP_EXPORT PIThreadPoolWorker: public PIObject { + PIOBJECT(PIThreadPoolWorker) + public: //! \~english Constructs executor with \a threads_count worker threads. //! \~russian Создает исполнитель с \a threads_count рабочими потоками. @@ -78,8 +80,17 @@ public: //! \~english Returns whether the threads are currently running. //! \~russian Возвращает, выполняются ли потоки в данный момент. - bool isRunning() const { return m_running; } + bool isRunning() const; + //! \~english Waits for all tasks completion. Returns \b false if the timeout expires first. + //! \~russian Ожидает завершения всех задач. Возвращает \b false, если таймаут истек раньше. + bool waitForTasks(PISystemTime timeout = {}); + + //! \~english Waits for task with id \a id completion. Returns \b false if the timeout expires first. + //! \~russian Ожидает завершения задачи с id \a id. Возвращает \b false, если таймаут истек раньше. + bool waitForTask(int64_t id, PISystemTime timeout = {}); + + void exec(); int64_t enqueueTask(std::function func, PIObject * context = nullptr); @@ -89,18 +100,38 @@ public: template int64_t enqueueTask(O * obj, void (O::*member_func)(int64_t)) { - return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); }); + return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); }, + PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr); } template int64_t enqueueTask(O * obj, void (O::*member_func)()) { - return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); }); + return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); }, + PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr); } bool removeTask(int64_t id); + void clearTasks(); TaskStatus taskStatus(int64_t id) const; + //! \events + //! \{ + + //! \fn void taskStarted(int64_t id) + //! \brief + //! \~english Raised on start execution task with id \a id. + //! \~russian Вызывается при старте выполнения задачи с id \a id. + EVENT1(taskStarted, int64_t, id); + + //! \fn void taskFinished(int64_t id) + //! \brief + //! \~english Raised on finish execution task with id \a id. + //! \~russian Вызывается при завершении выполнения задачи с id \a id. + EVENT1(taskFinished, int64_t, id); + + //! \} + // DEPRECATED @@ -114,7 +145,7 @@ public: //! \~russian //! Это вызов по принципу best-effort без ожидания результата и без сообщения о том, была ли задача принята. //! \После запроса на завершение новые задачи игнорируются. - void execute(std::function runnable); + void execute(std::function runnable) { enqueueTask(runnable); } void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); } void shutdown() DEPRECATEDM("Use stop()") { stop(); } @@ -139,7 +170,7 @@ private: mutable PIVector workers; mutable PIProtectedVariable> tasks_queue; - std::atomic_bool m_running = {false}; + PISet contexts; std::atomic_int64_t next_task_id = {0}; };