fix concurrent

git-svn-id: svn://db.shs.com.ru/pip@884 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2020-02-25 15:58:02 +00:00
parent f8f627360a
commit 92ac2b12cf
7 changed files with 408 additions and 467 deletions

View File

@@ -1,54 +1,55 @@
//
// Created by fomenko on 23.09.2019.
//
#include "executor.h"
PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue<std::function<void()>> *taskQueue_,
PIThreadFactory *threadFactory) : isShutdown_(false), taskQueue(taskQueue_), threadFactory(threadFactory) {
for (size_t i = 0; i < corePoolSize; ++i) {
AbstractThread* thread = threadFactory->newThread([&, i](){
auto runnable = taskQueue->poll(100, std::function<void()>());
if (runnable) {
runnable();
}
if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop();
});
threadPool.push_back(thread);
thread->start();
}
PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue<std::function<void()>> *taskQueue_) : isShutdown_(false), taskQueue(taskQueue_) {
for (size_t i = 0; i < corePoolSize; ++i) {
PIThread * thread = new PIThread([&, i](){
auto runnable = taskQueue->poll(100, std::function<void()>());
if (runnable) {
runnable();
}
if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop();
});
threadPool.push_back(thread);
thread->start();
}
}
bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) {
PITimeMeasurer measurer;
for (size_t i = 0; i < threadPool.size(); ++i) {
int dif = timeoutMs - (int)measurer.elapsed_m();
if (dif < 0) return false;
if (!threadPool[i]->waitForFinish(dif)) return false;
}
return true;
PITimeMeasurer measurer;
for (size_t i = 0; i < threadPool.size(); ++i) {
int dif = timeoutMs - (int)measurer.elapsed_m();
if (dif < 0) return false;
if (!threadPool[i]->waitForFinish(dif)) return false;
}
return true;
}
void PIThreadPoolExecutor::shutdownNow() {
isShutdown_ = true;
for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
isShutdown_ = true;
for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
}
PIThreadPoolExecutor::~PIThreadPoolExecutor() {
shutdownNow();
while (threadPool.size() > 0) delete threadPool.take_back();
delete threadFactory;
delete taskQueue;
shutdownNow();
while (threadPool.size() > 0) delete threadPool.take_back();
delete taskQueue;
}
void PIThreadPoolExecutor::execute(const std::function<void()> &runnable) {
if (!isShutdown_) taskQueue->offer(runnable);
if (!isShutdown_) taskQueue->offer(runnable);
}
volatile bool PIThreadPoolExecutor::isShutdown() const {
return isShutdown_;
return isShutdown_;
}
void PIThreadPoolExecutor::shutdown() {
isShutdown_ = true;
isShutdown_ = true;
}