// // Created by fomenko on 23.09.2019. // #include "executor.h" PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_, PIThreadFactory *threadFactory) : isShutdown_(false), taskQueue(taskQueue_), threadFactory(threadFactory) { for (size_t i = 0; i < corePoolSize; ++i) { AbstractThread* thread = threadFactory->newThread([&, i](){ auto runnable = taskQueue->poll(100, std::function()); if (runnable) { runnable(); } else if (isShutdown_) threadPool[i]->stop(); }); threadPool.push_back(thread); thread->start(); } } bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) { PITimeMeasurer measurer; for (size_t i = 0; i < threadPool.size(); ++i) { int dif = timeoutMs - (int)measurer.elapsed_m(); if (dif < 0) return false; if (!threadPool[i]->waitForFinish(dif)) return false; } return true; } void PIThreadPoolExecutor::shutdownNow() { isShutdown_ = true; for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); } PIThreadPoolExecutor::~PIThreadPoolExecutor() { shutdownNow(); taskQueue->getConditionVar()->notifyAll(); while (threadPool.size() > 0) delete threadPool.take_back(); delete threadFactory; delete taskQueue; } void PIThreadPoolExecutor::execute(const std::function &runnable) { if (!isShutdown_) taskQueue->offer(runnable); } volatile bool PIThreadPoolExecutor::isShutdown() const { return isShutdown_; }