diff --git a/libs/main/thread/pithread.h b/libs/main/thread/pithread.h
index 2c80e367..102b790c 100644
--- a/libs/main/thread/pithread.h
+++ b/libs/main/thread/pithread.h
@@ -217,6 +217,7 @@ public:
//! \~english Waits until the thread starts. Returns \b false if the timeout expires first.
//! \~russian Ожидает запуска потока. Возвращает \b false, если таймаут истек раньше.
bool waitForStart(PISystemTime timeout = {});
+
//! \~english Deprecated overload of \a waitForStart() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a waitForStart(), принимающая миллисекунды.
bool waitForStart(int timeout_msecs) DEPRECATEDM("use waitForStart(PISystemTime)") {
@@ -226,6 +227,7 @@ public:
//! \~english Waits for thread completion. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения потока. Возвращает \b false, если таймаут истек раньше.
bool waitForFinish(PISystemTime timeout = {});
+
//! \~english Deprecated overload of \a waitForFinish() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a waitForFinish(), принимающая миллисекунды.
bool waitForFinish(int timeout_msecs) DEPRECATEDM("use waitForFinish(PISystemTime)") {
diff --git a/libs/main/thread/pithreadmodule.h b/libs/main/thread/pithreadmodule.h
index 55ce08ab..15557d1d 100644
--- a/libs/main/thread/pithreadmodule.h
+++ b/libs/main/thread/pithreadmodule.h
@@ -71,8 +71,8 @@
#include "pispinlock.h"
#include "pithread.h"
#include "pithreadnotifier.h"
-#include "pithreadpoolexecutor.h"
#include "pithreadpoolloop.h"
+#include "pithreadpoolworker.h"
#include "pitimer.h"
#endif // PITHREADMODULE_H
diff --git a/libs/main/thread/pithreadpoolexecutor.cpp b/libs/main/thread/pithreadpoolexecutor.cpp
deleted file mode 100644
index 1973b6bd..00000000
--- a/libs/main/thread/pithreadpoolexecutor.cpp
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- PIP - Platform Independent Primitives
-
- Stephan Fomenko
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see .
-*/
-
-#include "pithreadpoolexecutor.h"
-
-#include "piliterals_time.h"
-
-/*! \class PIThreadPoolExecutor
- * \brief Thread pools address two different problems: they usually provide improved performance when executing large
- * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
- * managing the resources, including threads, consumed when executing a collection of tasks.
- */
-
-
-PIThreadPoolExecutor::PIThreadPoolExecutor(int corePoolSize): isShutdown_(false) {
- for (int i = 0; i < corePoolSize; ++i) {
- PIThread * thread = new PIThread([&, i]() {
- auto runnable = taskQueue.poll(100_ms, std::function());
- if (runnable) {
- runnable();
- }
- if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
- });
- threadPool.push_back(thread);
- thread->start();
- }
-}
-
-
-bool PIThreadPoolExecutor::awaitTermination(PISystemTime timeout) {
- PITimeMeasurer measurer;
- for (size_t i = 0; i < threadPool.size(); ++i) {
- auto dif = timeout - measurer.elapsed();
- if (dif.isNegative()) return false;
- if (!threadPool[i]->waitForFinish(dif)) return false;
- }
- return true;
-}
-
-
-void PIThreadPoolExecutor::shutdownNow() {
- isShutdown_ = true;
- for (size_t i = 0; i < threadPool.size(); ++i)
- threadPool[i]->stop();
-}
-
-
-PIThreadPoolExecutor::~PIThreadPoolExecutor() {
- shutdownNow();
- while (threadPool.size() > 0)
- delete threadPool.take_back();
-}
-
-
-void PIThreadPoolExecutor::execute(std::function runnable) {
- if (!isShutdown_) taskQueue.offer(std::move(runnable));
-}
-
-
-bool PIThreadPoolExecutor::isShutdown() const {
- return isShutdown_;
-}
-
-
-void PIThreadPoolExecutor::shutdown() {
- isShutdown_ = true;
-}
diff --git a/libs/main/thread/pithreadpoolexecutor.h b/libs/main/thread/pithreadpoolexecutor.h
index c302dbd2..c5e78142 100644
--- a/libs/main/thread/pithreadpoolexecutor.h
+++ b/libs/main/thread/pithreadpoolexecutor.h
@@ -1,95 +1,23 @@
-//! \~\file pithreadpoolexecutor.h
-//! \~\ingroup Thread
-//! \brief
-//! \~english Thread pool executor
-//! \~russian Исполнитель пула потоков
-//!
-//! \details
-//! \~english Executes tasks in a pool of worker threads.
-//! \~russian Выполняет задачи в пуле рабочих потоков.
-/*
- PIP - Platform Independent Primitives
-
- Stephan Fomenko
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see .
-*/
-
#ifndef PITHREADPOOLEXECUTOR_H
#define PITHREADPOOLEXECUTOR_H
-#include "piblockingqueue.h"
-#include "pithread.h"
+#include "pithreadpoolworker.h"
-#include
+namespace {
+DEPRECATEDM("Use pithreadpoolworker.h") const bool deprecated_header_is_used = true;
+}
-
-//! \~\ingroup Thread
-//! \~\brief
-//! \~english Fixed-size pool of worker threads for fire-and-forget tasks.
-//! \~russian Фиксированный пул рабочих потоков для задач без ожидания результата.
-class PIP_EXPORT PIThreadPoolExecutor {
+class DEPRECATEDM("Use PIThreadPoolWorker") PIThreadPoolExecutor: public PIThreadPoolWorker {
public:
- //! \~english Constructs executor with \a corePoolSize worker threads.
- //! \~russian Создает исполнитель с \a corePoolSize рабочими потоками.
- explicit PIThreadPoolExecutor(int corePoolSize);
+ PIThreadPoolExecutor(int threads_count): PIThreadPoolWorker(threads_count) { start(); }
+ ~PIThreadPoolExecutor() { stopAndWait(); }
- //! \~english Stops worker threads and destroys executor resources.
- //! \~russian Останавливает рабочие потоки и уничтожает ресурсы исполнителя.
- virtual ~PIThreadPoolExecutor();
-
- //! \~\brief
- //! \~english Submits \a runnable for asynchronous execution by a worker thread.
- //! \~russian Передает \a runnable на асинхронное выполнение рабочим потоком.
- //! \details
- //! \~english
- //! This is a best-effort fire-and-forget call and does not report whether the task was accepted.
- //! \After shutdown requests new tasks are ignored.
- //! \~russian
- //! Это вызов по принципу best-effort без ожидания результата и без сообщения о том, была ли задача принята.
- //! \После запроса на завершение новые задачи игнорируются.
- void execute(std::function runnable);
-
- //! \~english Requests immediate shutdown and stops worker threads without waiting for queued tasks to finish.
- //! \~russian Запрашивает немедленное завершение и останавливает рабочие потоки без ожидания завершения задач в очереди.
- void shutdownNow();
-
- //! \~\brief
- //! \~english Requests orderly shutdown: new tasks are rejected and workers stop after the current queue is drained.
- //! \~russian Запрашивает упорядоченное завершение: новые задачи отклоняются, а рабочие потоки останавливаются после опустошения текущей
- //! очереди.
- //! \details
- //! \~english This method does not wait for worker termination.
- //! \~russian Этот метод не ожидает завершения рабочих потоков.
- void shutdown();
-
- //! \~english Returns \c true after \a shutdown() or \a shutdownNow() has been requested.
- //! \~russian Возвращает \c true после запроса \a shutdown() или \a shutdownNow().
- bool isShutdown() const;
-
- //! \~\brief
- //! \~english Waits up to \a timeout for all worker threads to finish.
- //! \~russian Ожидает до \a timeout завершения всех рабочих потоков.
- //! \return
- //! \~english \c false if the timeout expires first.
- //! \~russian \c false, если таймаут истек раньше.
- bool awaitTermination(PISystemTime timeout);
-
-private:
- std::atomic_bool isShutdown_;
- PIBlockingQueue> taskQueue;
- PIVector threadPool;
+ void execute(std::function runnable) DEPRECATEDM("Use enqueueTask()") { enqueueTask(runnable); }
+ void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); }
+ void shutdown() DEPRECATEDM("Use stop()") { stop(); }
+ bool isShutdown() const DEPRECATEDM("Use !isRunning()") { return !isRunning(); }
+ bool awaitTermination(PISystemTime timeout) DEPRECATEDM("Use waitForFinish()") { return waitForFinish(timeout); }
};
-#endif // PITHREADPOOLEXECUTOR_H
+
+#endif
diff --git a/libs/main/thread/pithreadpoolworker.cpp b/libs/main/thread/pithreadpoolworker.cpp
new file mode 100644
index 00000000..26032f12
--- /dev/null
+++ b/libs/main/thread/pithreadpoolworker.cpp
@@ -0,0 +1,238 @@
+/*
+ PIP - Platform Independent Primitives
+
+ Ivan Pelipenko, Stephan Fomenko
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "pithreadpoolworker.h"
+
+#include "pisysteminfo.h"
+
+//! \addtogroup Thread
+//! \{
+//! \class PIThreadPoolWorker pithreadpoolworker.h
+//! \~\details
+//! \~english
+//! \PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It
+//! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods
+//! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each
+//! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations.
+//! \~russian
+//! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он
+//! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс
+//! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула
+//! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент
+//! для многопоточного асинхронного выполнения операций.
+//!
+//! \}
+
+
+PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) {
+ if (threads_count < 0) threads_count = PISystemInfo::instance()->processorsCount;
+ assertm(threads_count > 0, "Invalid threads count!");
+ for (int i = 0; i < threads_count; ++i) {
+ Worker * w = new Worker();
+ w->thread.setSlot([this, w]() { threadFunc(w); });
+ workers << w;
+ }
+}
+
+
+PIThreadPoolWorker::~PIThreadPoolWorker() {
+ piDeleteAllAndClear(workers);
+}
+
+
+void PIThreadPoolWorker::start() {
+ for (auto w: workers) {
+ w->thread.start();
+ w->notifier.notify();
+ }
+}
+
+
+void PIThreadPoolWorker::stop() {
+ for (auto w: workers) {
+ w->thread.stop();
+ w->notifier.notify();
+ }
+}
+
+
+bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) {
+ stop();
+ return waitForFinish(timeout);
+}
+
+
+bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) {
+ PITimeMeasurer tm;
+ for (auto w: workers) {
+ if (timeout.isNull())
+ w->thread.waitForStart();
+ else {
+ auto remains = timeout - tm.elapsed();
+ if (remains.isNegative()) return false;
+ if (!w->thread.waitForStart(remains)) return false;
+ }
+ }
+ return true;
+}
+
+
+bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) {
+ PITimeMeasurer tm;
+ for (auto w: workers) {
+ if (timeout.isNull())
+ w->thread.waitForFinish();
+ else {
+ auto remains = timeout - tm.elapsed();
+ if (remains.isNegative()) return false;
+ if (!w->thread.waitForFinish(remains)) return false;
+ }
+ }
+ return true;
+}
+
+
+bool PIThreadPoolWorker::isRunning() const {
+ return workers.every([](Worker * w) { return w->thread.isRunning(); });
+}
+
+
+bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) {
+ if (!isRunning()) return tasks_queue.getRef()->isEmpty();
+ auto checkWorking = [this] {
+ if (tasks_queue.getRef()->isNotEmpty()) return true;
+ for (auto * w: workers)
+ if (w->in_work > 0) return true;
+ return false;
+ };
+ if (timeout.isNull()) {
+ for (;;) {
+ if (!checkWorking()) break;
+ piMinSleep();
+ }
+ return true;
+ }
+ PITimeMeasurer tm;
+ while (tm.elapsed() < timeout) {
+ if (!checkWorking()) return true;
+ piMinSleep();
+ }
+ return tm.elapsed() < timeout;
+}
+
+
+bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) {
+ if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; });
+ auto checkWorking = [this, id] {
+ if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true;
+ for (auto * w: workers)
+ if (w->in_work == id) return true;
+ return false;
+ };
+ if (timeout.isNull()) {
+ for (;;) {
+ if (!checkWorking()) break;
+ piMinSleep();
+ }
+ return true;
+ }
+ PITimeMeasurer tm;
+ while (tm.elapsed() < timeout) {
+ if (!checkWorking()) return true;
+ piMinSleep();
+ }
+ return tm.elapsed() < timeout;
+}
+
+
+void PIThreadPoolWorker::exec() {
+ start();
+ waitForStart();
+ waitForTasks();
+ stopAndWait();
+}
+
+
+int64_t PIThreadPoolWorker::enqueueTask(std::function func, PIObject * context) {
+ if (context) {
+ if (!contexts[context]) {
+ contexts << context;
+ CONNECTL(context, deleted, ([this, context](PIObject *) {
+ contexts.remove(context);
+ auto qref = tasks_queue.getRef();
+ // auto prev_size = qref->size();
+ // piCout << "deleted" << (void *)context << qref->map([](const Task & t) { return t.context; });
+ qref->removeWhere([context](const Task & t) { return t.context == context; });
+ // piCout << prev_size << qref->size() << qref->map([](const Task & t) { return t.context; });
+ }));
+ }
+ }
+ int64_t id = ++next_task_id;
+ Task task;
+ task.id = id;
+ task.func = std::move(func);
+ task.context = context;
+ tasks_queue.getRef()->enqueue(std::move(task));
+ for (auto * w: workers)
+ w->notifier.notify();
+ return id;
+}
+
+
+bool PIThreadPoolWorker::removeTask(int64_t id) {
+ auto qref = tasks_queue.getRef();
+ auto prev_size = qref->size();
+ qref->removeWhere([id](const Task & t) { return t.id == id; });
+ return prev_size != qref->size();
+}
+
+
+void PIThreadPoolWorker::clearTasks() {
+ tasks_queue.getRef()->clear();
+}
+
+
+PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const {
+ if (id <= 0) return TaskStatus::Unknown;
+ auto qref = tasks_queue.getRef();
+ for (auto w: workers)
+ if (w->in_work == id) return TaskStatus::InProgress;
+ for (const auto & t: *qref)
+ if (t.id == id) return TaskStatus::Enqueued;
+ return id <= next_task_id ? TaskStatus::DoneOrCancelled : TaskStatus::Unknown;
+}
+
+
+void PIThreadPoolWorker::threadFunc(Worker * w) {
+ w->notifier.wait();
+ if (w->thread.isStopping()) return;
+ Task task;
+ {
+ auto ref = tasks_queue.getRef();
+ if (ref->isEmpty()) return;
+ task = ref->dequeue();
+ }
+ if (!task.isValid()) return;
+ w->in_work = task.id;
+ taskStarted(task.id);
+ task.func(task.id);
+ w->in_work = -1;
+ taskFinished(task.id);
+ w->notifier.notify();
+}
diff --git a/libs/main/thread/pithreadpoolworker.h b/libs/main/thread/pithreadpoolworker.h
new file mode 100644
index 00000000..295ea6ea
--- /dev/null
+++ b/libs/main/thread/pithreadpoolworker.h
@@ -0,0 +1,177 @@
+//! \~\file pithreadpoolworker.h
+//! \~\ingroup Thread
+//! \brief
+//! \~english Thread pool worker
+//! \~russian Исполнитель пула потоков
+/*
+ PIP - Platform Independent Primitives
+
+ Ivan Pelipenko, Stephan Fomenko
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef PITHREADPOOLWORKER_H
+#define PITHREADPOOLWORKER_H
+
+#include "piprotectedvariable.h"
+#include "pithread.h"
+
+
+//! \~\ingroup Thread
+//! \~\brief
+//! \~english Fixed-size pool of worker threads for generic-purpose tasks.
+//! \~russian Фиксированный пул рабочих потоков для задач общего назначения.
+class PIP_EXPORT PIThreadPoolWorker: public PIObject {
+ PIOBJECT(PIThreadPoolWorker)
+
+public:
+ //! \~english Constructs executor with \a threads_count worker threads. If \a threads_count < 0 processor threads count used.
+ //! \~russian Создает исполнитель с \a threads_count рабочими потоками. Если \a threads_count < 0 используется количество потоков
+ //! процессора.
+ explicit PIThreadPoolWorker(int threads_count = -1);
+
+ //! \~english Destroy worker threads. Call \a stopAndWait() before.
+ //! \~russian Уничтожает рабочие потоки. Вызывайте перед этим \a stopAndWait().
+ virtual ~PIThreadPoolWorker();
+
+
+ //! \~english Task status.
+ //! \~russian Статус задачи.
+ enum class TaskStatus {
+ Unknown /** \~english ID <= 0 or not queued yet \~russian ID <= 0 или не поставлена в очередь */,
+ Enqueued /** \~english Wait for execution \~russian Ожидает выполнения */,
+ InProgress /** \~english In execution now \~russian В процессе выполнения */,
+ DoneOrCancelled /** \~english Done or cancelled \~russian Выполнена или отменена */
+ };
+
+
+ //! \~english Starts the threads.
+ //! \~russian Запускает потоки.
+ void start();
+
+ //! \~english Requests graceful threads shutdown.
+ //! \~russian Запрашивает корректное завершение потоков.
+ void stop();
+
+ //! \~english Requests stop and waits for threads completion. Returns \b false if the timeout expires.
+ //! \~russian Запрашивает остановку и ожидает завершения потоков. Возвращает \b false, если таймаут истек.
+ bool stopAndWait(PISystemTime timeout = {});
+
+ //! \~english Waits until the threads starts. Returns \b false if the timeout expires first.
+ //! \~russian Ожидает запуска потоков. Возвращает \b false, если таймаут истек раньше.
+ bool waitForStart(PISystemTime timeout = {});
+
+ //! \~english Waits for threads completion. Returns \b false if the timeout expires first.
+ //! \~russian Ожидает завершения потоков. Возвращает \b false, если таймаут истек раньше.
+ bool waitForFinish(PISystemTime timeout = {});
+
+ //! \~english Returns whether the threads are currently running.
+ //! \~russian Возвращает, выполняются ли потоки в данный момент.
+ bool isRunning() const;
+
+ //! \~english Waits for all tasks completion. Returns \b false if the timeout expires first.
+ //! \~russian Ожидает завершения всех задач. Возвращает \b false, если таймаут истек раньше.
+ bool waitForTasks(PISystemTime timeout = {});
+
+ //! \~english Waits for task with id \a id completion. Returns \b false if the timeout expires first.
+ //! \~russian Ожидает завершения задачи с id \a id. Возвращает \b false, если таймаут истек раньше.
+ bool waitForTask(int64_t id, PISystemTime timeout = {});
+
+ //! \~english Starts threads, wait for all tasks complete and threads stop.
+ //! \~russian Запускает потоки, ожидает завершения всех задач и остановки потоков.
+ void exec();
+
+
+ //! \~english Queue functor to execution. Pass task ID in functor. Returns task ID.
+ //! \~russian Запланировать функтор на выполнение. В функтор передастся ID задачи. Возвращает ID задачи.
+ int64_t enqueueTask(std::function func, PIObject * context = nullptr);
+
+ //! \~english Queue functor to execution. Returns task ID.
+ //! \~russian Запланировать функтор на выполнение. Возвращает ID задачи.
+ int64_t enqueueTask(std::function func, PIObject * context = nullptr) {
+ return enqueueTask([func](int64_t) { func(); }, context);
+ }
+
+ //! \~english Queue class member method to execution. Pass task ID in method. Returns task ID.
+ //! \~russian Запланировать член-метод класса на выполнение. В метод передастся ID задачи. Возвращает ID задачи.
+ template
+ int64_t enqueueTask(O * obj, void (O::*member_func)(int64_t)) {
+ return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); },
+ PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr);
+ }
+
+ //! \~english Queue class member method to execution. Returns task ID.
+ //! \~russian Запланировать член-метод класса на выполнение. Возвращает ID задачи.
+ template
+ int64_t enqueueTask(O * obj, void (O::*member_func)()) {
+ return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); },
+ PIObject::isPIObject(obj) ? dynamic_cast(obj) : nullptr);
+ }
+
+ //! \~english Remove task with id \a id from queue. Returns if task delete.
+ //! \~russian Удаляет задачу с id \a id из очереди. Возвращиает была ли задача удалена.
+ bool removeTask(int64_t id);
+
+ //! \~english Remove all queued tasks.
+ //! \~russian Удаляет все задачи из очереди.
+ void clearTasks();
+
+ //! \~english Returns task status with id \a id.
+ //! \~russian Возвращиает статус задачи с id \a id.
+ TaskStatus taskStatus(int64_t id) const;
+
+
+ //! \events
+ //! \{
+
+ //! \fn void taskStarted(int64_t id)
+ //! \brief
+ //! \~english Raised on start execution task with id \a id.
+ //! \~russian Вызывается при старте выполнения задачи с id \a id.
+ EVENT1(taskStarted, int64_t, id);
+
+ //! \fn void taskFinished(int64_t id)
+ //! \brief
+ //! \~english Raised on finish execution task with id \a id.
+ //! \~russian Вызывается при завершении выполнения задачи с id \a id.
+ EVENT1(taskFinished, int64_t, id);
+
+ //! \}
+
+
+private:
+ struct Worker {
+ PIThread thread;
+ PIThreadNotifier notifier;
+ std::atomic_int64_t in_work = {-1};
+ };
+ struct Task {
+ bool isValid() const { return func != nullptr; }
+ PIObject * context = nullptr;
+ std::function func = nullptr;
+ int64_t id = -1;
+ };
+
+ void threadFunc(Worker * w);
+
+ mutable PIVector workers;
+ mutable PIProtectedVariable> tasks_queue;
+
+ PISet contexts;
+ std::atomic_int64_t next_task_id = {0};
+};
+
+
+#endif // PITHREADPOOLWORKER_H
diff --git a/libs/main/thread/pitimer.cpp b/libs/main/thread/pitimer.cpp
index 611e9852..8cb72a22 100644
--- a/libs/main/thread/pitimer.cpp
+++ b/libs/main/thread/pitimer.cpp
@@ -274,6 +274,18 @@ void PITimer::stopAndWait(PISystemTime timeout) {
}
+void PITimer::setSlot(std::function func) {
+ ret_func_delim = nullptr;
+ ret_func = std::move(func);
+}
+
+
+void PITimer::setSlot(std::function func) {
+ ret_func = nullptr;
+ ret_func_delim = std::move(func);
+}
+
+
void PITimer::addDelimiter(int delim, std::function func) {
delims << Delimiter(std::move(func), delim);
}
diff --git a/libs/main/thread/pitimer.h b/libs/main/thread/pitimer.h
index 4ed9f089..3ff367a8 100644
--- a/libs/main/thread/pitimer.h
+++ b/libs/main/thread/pitimer.h
@@ -110,11 +110,6 @@ public:
//! \~english Deprecated overload of \a start() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a start(), принимающая миллисекунды.
bool start(double interval_ms) DEPRECATEDM("use start(PISystemTime)") { return start(PISystemTime::fromMilliseconds(interval_ms)); }
- EVENT_HANDLER0(bool, start);
-
- EVENT_HANDLER0(bool, restart);
-
- EVENT_HANDLER0(void, stop);
//! \~english Deprecated overload of \a stopAndWait() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a stopAndWait(), принимающая миллисекунды.
@@ -126,23 +121,15 @@ public:
//! \~english Sets a tick callback that ignores the delimiter value.
//! \~russian Устанавливает обратный вызов тика, игнорирующий значение делителя.
- void setSlot(std::function func) {
- ret_func_delim = nullptr;
- ret_func = std::move(func);
- }
+ void setSlot(std::function func);
//! \~english Sets a tick callback that receives the current delimiter value.
//! \~russian Устанавливает обратный вызов тика, принимающий текущее значение делителя.
- void setSlot(std::function func) {
- ret_func = nullptr;
- ret_func_delim = std::move(func);
- }
+ void setSlot(std::function func);
//! \~english Enables locking of the internal mutex around tick processing.
//! \~russian Включает блокировку внутреннего мьютекса вокруг обработки тиков.
void needLockRun(bool need) { lockRun = need; }
- EVENT_HANDLER0(void, lock) { mutex_.lock(); }
- EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
//! \~english Returns whether the timer drains queued delivery for itself as performer on each main tick. By default \b true.
//! \~russian Возвращает, должен ли таймер обрабатывать отложенную доставку для себя как исполнителя на каждом основном тике. По
@@ -169,9 +156,6 @@ public:
//! \~russian Удаляет все делители со значением \a delim.
void removeDelimiter(int delim);
- EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
-
- EVENT1(tickEvent, int, delimiter);
//! \handlers
//! \{
@@ -180,31 +164,37 @@ public:
//! \brief
//! \~english Starts the timer with the current \a interval().
//! \~russian Запускает таймер с текущим значением \a interval().
+ EVENT_HANDLER0(bool, start);
//! \fn bool restart()
//! \brief
//! \~english Stops the timer, then starts it again with the current \a interval().
//! \~russian Останавливает таймер, затем снова запускает его с текущим значением \a interval().
+ EVENT_HANDLER0(bool, restart);
//! \fn bool stop()
//! \brief
//! \~english Requests stop and wakes the timer thread, but does not wait for completion.
//! \~russian Запрашивает остановку и пробуждает поток таймера, но не ожидает завершения.
+ EVENT_HANDLER0(void, stop);
//! \fn void clearDelimiters()
//! \brief
//! \~english Remove all frequency delimiters
//! \~russian Удаляет все делители частоты
+ EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
//! \fn void lock()
//! \brief
//! \~english Locks the internal timer mutex.
//! \~russian Блокирует внутренний мьютекс таймера.
+ EVENT_HANDLER0(void, lock) { mutex_.lock(); }
//! \fn void unlock()
//! \brief
//! \~english Unlocks the internal timer mutex.
//! \~russian Разблокирует внутренний мьютекс таймера.
+ EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
//! \}
//! \events
@@ -219,7 +209,7 @@ public:
//! "delimiter" is the frequency delimiter, 1 for the main loop.
//! \~russian
//! "delimiter" - делитель частоты, 1 для основного цикла.
-
+ EVENT1(tickEvent, int, delimiter);
//! \}
diff --git a/main.cpp b/main.cpp
index cf2f6235..61a7b945 100644
--- a/main.cpp
+++ b/main.cpp
@@ -1,143 +1,53 @@
-#include "libs/http_client/curl_thread_pool_p.h"
-#include "picodeparser.h"
-#include "pidigest.h"
-#include "pihttpclient.h"
-#include "piliterals.h"
#include "pip.h"
-#include "piunits.h"
-#include "pivaluetree_conversions.h"
+#include "pithreadpoolexecutor.h"
-using namespace PICoutManipulators;
-using namespace PIHTTP;
-using namespace PIUnits::Class;
-
-int rcnt = 0, scnt = 0;
-
-inline PIByteArray SMBusTypeInfo_genHash(PIString n) {
- PICrypt c;
- return piSerialize(c.shorthash(n.removeAll(" "), PIString("SMBusDataHashKey").toByteArray()));
-}
+class A: public PIObject {
+ PIOBJECT(A)
+public:
+ A(PIString n = {}) { setName(n); }
+ void foo() {
+ 100_ms .sleep();
+ piCoutObj << "foo!";
+ 100_ms .sleep();
+ }
+};
int main(int argc, char * argv[]) {
- PICrypt _crypt;
- // auto ba = PIFile::readAll("logo.png");
- PIString str = "hello!"_a;
- PIByteArray ba = str.toAscii();
- PIByteArray key = PIString("SMBusDataHashKey").toByteArray();
+ PIVector objects;
+ objects << new A("1") << new A("2") << new A("3");
- const int times = 1000000;
- PITimeMeasurer tm;
- PISystemTime el;
+ PITimer status_timer;
+ PIThreadPoolWorker pool(2);
+ pool.start();
- tm.reset();
- piForTimes(times) {
- PIDigest::calculateWithKey(ba, key, PIDigest::Type::SipHash_2_4_128);
- }
- el = tm.elapsed();
- piCout << "PIDigest" << el.toString();
+ // int64_t id = -1;
+ // status_timer.start(10_Hz, [&id, &pool] { piCout << "[timer] status" << (int)pool.taskStatus(id); });
- tm.reset();
- piForTimes(times) {
- _crypt.shorthash(str, key);
- }
- el = tm.elapsed();
- piCout << " sodium" << el.toString();
+ 100_ms .sleep();
+ // pool.enqueueTask([](int64_t id) {
+ // piCout << "[task ] start, id" << id;
+ // // 500_ms .sleep();
+ // piCout << "[task ] done";
+ // });
+ pool.enqueueTask(objects[0], &A::foo);
+ pool.enqueueTask(objects[1], &A::foo);
+ pool.enqueueTask(objects[1], &A::foo);
+ pool.enqueueTask(objects[2], &A::foo);
- tm.reset();
- piForTimes(times) {
- PIDigest::calculateWithKey(ba, key, PIDigest::Type::BLAKE2b_128);
- }
- el = tm.elapsed();
- piCout << " blake" << el.toString();
+ 10_ms .sleep();
+ delete objects[1];
+ objects.remove(1);
+ // piCout << "[main ]" << "enqueued, id" << id;
- return 0;
+ // 200_ms .sleep();
+ piCout << "[main ]" << "wait ...";
+ piCout << "[main ]" << "wait done";
+ 1000_ms .sleep();
- PIEthernet *eth_r, *eth_s;
- eth_r = PIIODevice::createFromFullPath("eth://udp: 192.168.1.25 :10000")->cast();
- eth_s = PIIODevice::createFromFullPath("eth://udp: : : 192.168.1.25:10000")->cast();
-
- eth_r->setReadBufferSize(1_MiB);
- CONNECTL(eth_r, threadedReadEvent, [](const uchar * readed, ssize_t size) {
- // piCout << "rec";
- piMSleep(1);
- ++rcnt;
- });
- eth_r->startThreadedRead();
-
- PIByteArray _ba(1400);
- for (int i = 0; i < 100; ++i) {
- eth_s->write(_ba);
- ++scnt;
- }
-
- 0.2_s .sleep();
-
- piCout << "snd" << scnt;
- piCout << "rec" << rcnt;
-
- piDeleteSafety(eth_r);
- piDeleteSafety(eth_s);
- return 0;
-
- PITranslator::loadLang("ru");
- /*auto ucl = PIUnits::allClasses();
- for (auto c: ucl) {
- piCout << (c->className() + ":");
- for (auto t: c->allTypes()) {
- piCout << " " << c->name(t) << "->" << c->unit(t);
- }
- }*/
-
- // PIUnits::Value(1);
- // piCout << PIUnits::name(PIUnits::Class::Information::Bit);
- // piCout << PIUnits::name(PIUnits::Class::Information::Byte);
- // piCout << PIUnits::name(PIUnits::Class::Information::_LastType);
- // piCout << PIUnits::name((int)PIUnits::Class::Angle::Degree);
-
- // piCout << PIUnits::unit(PIUnits::Class::Information::Bit);
- // piCout << PIUnits::unit(PIUnits::Class::Information::Byte);
- // piCout << PIUnits::unit(PIUnits::Class::Information::_LastType);
- // piCout << PIUnits::unit((int)PIUnits::Class::Angle::Degree);
-
- // for (int i = -10; i < 10; ++i)
- // piCout << PIUnits::Value(pow10(i * 0.99), PIUnits::Class::Distance::Meter).toString();
-
- auto v = PIUnits::Value(M_PI, Angle::Radian);
- piCout << v << "=" << v.converted(Angle::Degree);
-
- v = PIUnits::Value(45, Angle::Degree);
- piCout << v << "=" << v.converted(Angle::Radian);
-
- piCout << PIUnits::Value(5E-5, Time::Second);
- piCout << PIUnits::Value(3E-3, Time::Second);
- piCout << PIUnits::Value(0.8, Time::Second);
- piCout << PIUnits::Value(1.2, Time::Second);
- piCout << PIUnits::Value(1001, Time::Second);
- piCout << PIUnits::Value(1000001, Time::Second);
-
- piCout << PIUnits::Value(1_KB, Information::Byte);
- piCout << PIUnits::Value(1_MB, Information::Byte);
- piCout << PIUnits::Value(1_MiB, Information::Byte);
- piCout << PIUnits::Value(1_MB, Information::Byte).converted(Information::Bit);
- piCout << PIUnits::Value(1_MiB, Information::Byte).converted(Information::Bit);
-
- piCout << PIUnits::Value(0., Temperature::Celsius).converted(Temperature::Kelvin);
- piCout << PIUnits::Value(0., Temperature::Celsius).converted(Temperature::Fahrenheit);
- piCout << PIUnits::Value(100., Temperature::Celsius).converted(Temperature::Fahrenheit);
-
- piCout << PIUnits::Value(1., Pressure::Atmosphere).converted(Pressure::Pascal);
- piCout << PIUnits::Value(1., Pressure::Atmosphere).converted(Pressure::MillimetreOfMercury);
- piCout << PIUnits::Value(766., Pressure::MillimetreOfMercury).converted(Pressure::Atmosphere);
-
- piCout << PIUnits::Value(5E-5, Time::Second).converted(Time::Hertz);
- piCout << PIUnits::Value(3E-3, Time::Second).converted(Time::Hertz);
- piCout << PIUnits::Value(0.8, Time::Second).converted(Time::Hertz);
- piCout << PIUnits::Value(1.2, Time::Second).converted(Time::Hertz);
- piCout << PIUnits::Value(1001, Time::Second).converted(Time::Hertz);
- piCout << PIUnits::Value(1000001, Time::Second).converted(Time::Hertz);
- // piCout << PIUnits::Value(0.2, Time::Second).converted(Time::Hertz);
- // piCout << PIUnits::Value(5E-5, Time::Second).converted(Time::Hertz);
+ pool.stopAndWait();
+ status_timer.stopAndWait();
+ piDeleteAll(objects);
return 0;
}