/* PIP - Platform Independent Primitives Stephan Fomenko This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #ifndef PIP_EXECUTOR_H #define PIP_EXECUTOR_H #include "piblockingdequeue.h" #include /** * @brief Thread pools address two different problems: they usually provide improved performance when executing large * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and * managing the resources, including threads, consumed when executing a collection of tasks. * * TODO adapt documentation to template */ template class PIP_EXPORT PIThreadPoolExecutorTemplate { public: explicit PIThreadPoolExecutorTemplate(size_t corePoolSize = 1) : isShutdown_(false) { makePool(corePoolSize); } virtual ~PIThreadPoolExecutorTemplate() { shutdownNow(); while (threadPool.size() > 0) delete threadPool.take_back(); } /** * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task * cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been * reached. * * @param runnable not empty function for thread pool execution */ void execute(const std::function & runnable) { if (!isShutdown_) taskQueue.offer(runnable); } /** * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously * submitted tasks to complete execution. Use awaitTermination to do that. */ void shutdown() { isShutdown_ = true; } void shutdownNow() { isShutdown_ = true; for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); } bool isShutdown() const { return isShutdown_; } bool awaitTermination(int timeoutMs) { PITimeMeasurer measurer; for (size_t i = 0; i < threadPool.size(); ++i) { int dif = timeoutMs - (int)measurer.elapsed_m(); if (dif < 0) return false; if (!threadPool[i]->waitForFinish(dif)) return false; } return true; } protected: std::atomic_bool isShutdown_; Dequeue_ taskQueue; PIVector threadPool; template PIThreadPoolExecutorTemplate(size_t corePoolSize, Function onBeforeStart) : isShutdown_(false) { makePool(corePoolSize, onBeforeStart); } void makePool(size_t corePoolSize, std::function onBeforeStart = [](Thread_*){}) { for (size_t i = 0; i < corePoolSize; ++i) { auto* thread = new Thread_([&, i](){ auto runnable = taskQueue.poll(100); if (runnable) { runnable(); } if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop(); }); threadPool.push_back(thread); onBeforeStart(thread); thread->start(); } } }; typedef PIThreadPoolExecutorTemplate > > PIThreadPoolExecutor; #endif //PIP_EXECUTOR_H