From 8c3349d84afb922878cc77b38825504af03a339a Mon Sep 17 00:00:00 2001 From: andrey Date: Tue, 11 Aug 2020 17:26:44 +0300 Subject: [PATCH] fix PIThreadPoolExecutor and PIBlockingDequeue --- lib/main/thread/piblockingdequeue.h | 60 +++++--------------- lib/main/thread/pithreadpoolexecutor.cpp | 20 ++----- lib/main/thread/pithreadpoolexecutor.h | 4 +- tests/concurrent/BlockingDequeueUnitTest.cpp | 12 ++-- 4 files changed, 27 insertions(+), 69 deletions(-) diff --git a/lib/main/thread/piblockingdequeue.h b/lib/main/thread/piblockingdequeue.h index 27e23572..4b7fc57f 100644 --- a/lib/main/thread/piblockingdequeue.h +++ b/lib/main/thread/piblockingdequeue.h @@ -86,29 +86,13 @@ public: * @param v the element to add * @return true if the element was added to this queue, else false */ - bool offer(const T & v) { + bool offer(const T & v, int timeoutMs = 0) { + bool isOk; mutex.lock(); - if (PIDeque::size() >= max_size) { - mutex.unlock(); - return false; - } - PIDeque::push_back(v); - mutex.unlock(); - cond_var_add->notifyOne(); - return true; - } - - /** - * @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for - * space to become available. - * - * @param v the element to add - * @param timeoutMs how long to wait before giving up, in milliseconds - * @return true if successful, or false if the specified waiting time elapses before space is available - */ - bool offer(const T & v, int timeoutMs) { - mutex.lock(); - bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque::size() < max_size; } ); + if (timeoutMs == 0) + isOk = PIDeque::size() < max_size; + else + isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque::size() < max_size; } ); if (isOk) PIDeque::push_back(v); mutex.unlock(); if (isOk) cond_var_add->notifyOne(); @@ -140,31 +124,15 @@ public: * return value is retrieved value * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available */ - T poll(int timeoutMs, const T & defaultVal = T(), bool * isOk = nullptr) { - T t; + T poll(int timeoutMs = 0, const T & defaultVal = T(), bool * isOk = nullptr) { + T t = defaultVal; + bool isNotEmpty; mutex.lock(); - bool isNotEmpty = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return !PIDeque::isEmpty(); }); - t = isNotEmpty ? T(PIDeque::take_front()) : defaultVal; - mutex.unlock(); - if (isNotEmpty) cond_var_rem->notifyOne(); - if (isOk) *isOk = isNotEmpty; - return t; - } - - /** - * @brief Retrieves and removes the head of this queue and return it if queue not empty, otherwise return defaultVal. - * Do it immediately without waiting. - * - * @param defaultVal value, which returns if the specified waiting time elapses before an element is available - * @param isOk flag, which indicates result of method execution. It will be set to false if timeout, or true if - * return value is retrieved value - * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available - */ - T poll(const T & defaultVal = T(), bool * isOk = nullptr) { - T t; - mutex.lock(); - bool isNotEmpty = !PIDeque::isEmpty(); - t = isNotEmpty ? PIDeque::take_front() : defaultVal; + if (timeoutMs == 0) + isNotEmpty = !PIDeque::isEmpty(); + else + isNotEmpty = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return !PIDeque::isEmpty(); }); + if (isNotEmpty) t = PIDeque::take_front(); mutex.unlock(); if (isNotEmpty) cond_var_rem->notifyOne(); if (isOk) *isOk = isNotEmpty; diff --git a/lib/main/thread/pithreadpoolexecutor.cpp b/lib/main/thread/pithreadpoolexecutor.cpp index d0feeb83..ea0eb9aa 100644 --- a/lib/main/thread/pithreadpoolexecutor.cpp +++ b/lib/main/thread/pithreadpoolexecutor.cpp @@ -18,7 +18,6 @@ */ #include "pithreadpoolexecutor.h" -#include "pisysteminfo.h" /*! \class PIThreadPoolExecutor * @brief Thread pools address two different problems: they usually provide improved performance when executing large @@ -27,21 +26,14 @@ */ -PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue > * taskQueue_) : isShutdown_(false) { - queue_own = false; - if (corePoolSize <= 0) - corePoolSize = PISystemInfo::instance()->processorsCount; - if (!taskQueue_) { - taskQueue = new PIBlockingDequeue >(); - queue_own = true; - } - for (size_t i = 0; i < corePoolSize; ++i) { +PIThreadPoolExecutor::PIThreadPoolExecutor(int corePoolSize) : isShutdown_(false) { + for (int i = 0; i < corePoolSize; ++i) { PIThread * thread = new PIThread([&, i](){ - auto runnable = taskQueue->poll(100, std::function()); + auto runnable = taskQueue.poll(100, std::function()); if (runnable) { runnable(); } - if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop(); + if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop(); }); threadPool.push_back(thread); thread->start(); @@ -69,13 +61,11 @@ void PIThreadPoolExecutor::shutdownNow() { PIThreadPoolExecutor::~PIThreadPoolExecutor() { shutdownNow(); while (threadPool.size() > 0) delete threadPool.take_back(); - if (queue_own) - delete taskQueue; } void PIThreadPoolExecutor::execute(const std::function & runnable) { - if (!isShutdown_) taskQueue->offer(runnable); + if (!isShutdown_) taskQueue.offer(runnable); } diff --git a/lib/main/thread/pithreadpoolexecutor.h b/lib/main/thread/pithreadpoolexecutor.h index 1d3bf75f..266103ea 100644 --- a/lib/main/thread/pithreadpoolexecutor.h +++ b/lib/main/thread/pithreadpoolexecutor.h @@ -26,7 +26,7 @@ class PIP_EXPORT PIThreadPoolExecutor { public: - explicit PIThreadPoolExecutor(size_t corePoolSize = -1, PIBlockingDequeue > * taskQueue_ = 0); + explicit PIThreadPoolExecutor(int corePoolSize); virtual ~PIThreadPoolExecutor(); @@ -54,7 +54,7 @@ public: private: std::atomic_bool isShutdown_; - PIBlockingDequeue > * taskQueue; + PIBlockingDequeue > taskQueue; PIVector threadPool; bool queue_own; diff --git a/tests/concurrent/BlockingDequeueUnitTest.cpp b/tests/concurrent/BlockingDequeueUnitTest.cpp index fae9f3ff..08a4b7ce 100644 --- a/tests/concurrent/BlockingDequeueUnitTest.cpp +++ b/tests/concurrent/BlockingDequeueUnitTest.cpp @@ -69,7 +69,7 @@ TEST(BlockingDequeueUnitTest, offer_timedout_is_block_when_capacity_reach) { TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) { size_t capacity = 1; PIBlockingDequeue dequeue(capacity); - ASSERT_TRUE(dequeue.offer(10)); + ASSERT_TRUE(dequeue.offer(10)); } TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) { @@ -125,7 +125,7 @@ TEST(BlockingDequeueUnitTest, poll_is_not_block_when_empty) { bool isOk; auto conditionVar = new MockConditionVar(); PIBlockingDequeue dequeue(capacity, conditionVar); - dequeue.poll(111, &isOk); + dequeue.poll(0, 111, &isOk); EXPECT_FALSE(conditionVar->isWaitForCalled); } @@ -134,7 +134,7 @@ TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) { bool isOk; auto conditionVar = new MockConditionVar(); PIBlockingDequeue dequeue(capacity, conditionVar); - ASSERT_EQ(dequeue.poll(111, &isOk), 111); + ASSERT_EQ(dequeue.poll(0, 111, &isOk), 111); } TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) { @@ -143,7 +143,7 @@ TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) { auto conditionVar = new MockConditionVar(); PIBlockingDequeue dequeue(capacity, conditionVar); dequeue.offer(111); - ASSERT_EQ(dequeue.poll(-1, &isOk), 111); + ASSERT_EQ(dequeue.poll(0, -1, &isOk), 111); } TEST(BlockingDequeueUnitTest, poll_timeouted_is_block_when_empty) { @@ -152,8 +152,8 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_block_when_empty) { auto conditionVar = new MockConditionVar(); PIBlockingDequeue dequeue(capacity, conditionVar); dequeue.poll(timeout, 111); - EXPECT_TRUE(conditionVar->isWaitForCalled); - EXPECT_EQ(timeout, conditionVar->timeout); + EXPECT_TRUE(conditionVar->isWaitForCalled); + EXPECT_EQ(timeout, conditionVar->timeout); ASSERT_FALSE(conditionVar->isTrueCondition); }