From 8efd2cf4470264062b2f12edc43f28949a905ce2 Mon Sep 17 00:00:00 2001 From: Stepan Fomenko Date: Fri, 17 Jul 2020 18:36:28 +0300 Subject: [PATCH] Rewrite executor to template & come back executor unit tests --- lib/concurrent/executor.cpp | 74 ------------- .../test/ExecutorIntegrationTest.cpp | 9 +- lib/concurrent/test/ExecutorUnitTest.cpp | 102 ++++++++++++++++++ lib/concurrent/test/testutil.h | 5 +- lib/main/concurrent/executor.h | 71 +++++++++--- 5 files changed, 169 insertions(+), 92 deletions(-) delete mode 100644 lib/concurrent/executor.cpp create mode 100644 lib/concurrent/test/ExecutorUnitTest.cpp diff --git a/lib/concurrent/executor.cpp b/lib/concurrent/executor.cpp deleted file mode 100644 index a1c80042..00000000 --- a/lib/concurrent/executor.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - PIP - Platform Independent Primitives - - Stephan Fomenko - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "executor.h" - - -PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_) : isShutdown_(false), taskQueue(taskQueue_) { - for (size_t i = 0; i < corePoolSize; ++i) { - PIThread * thread = new PIThread([&, i](){ - auto runnable = taskQueue->poll(100, std::function()); - if (runnable) { - runnable(); - } - if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop(); - }); - threadPool.push_back(thread); - thread->start(); - } -} - - -bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) { - PITimeMeasurer measurer; - for (size_t i = 0; i < threadPool.size(); ++i) { - int dif = timeoutMs - (int)measurer.elapsed_m(); - if (dif < 0) return false; - if (!threadPool[i]->waitForFinish(dif)) return false; - } - return true; -} - - -void PIThreadPoolExecutor::shutdownNow() { - isShutdown_ = true; - for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); -} - - -PIThreadPoolExecutor::~PIThreadPoolExecutor() { - shutdownNow(); - while (threadPool.size() > 0) delete threadPool.take_back(); - delete taskQueue; -} - - -void PIThreadPoolExecutor::execute(const std::function &runnable) { - if (!isShutdown_) taskQueue->offer(runnable); -} - - -bool PIThreadPoolExecutor::isShutdown() const { - return isShutdown_; -} - - -void PIThreadPoolExecutor::shutdown() { - isShutdown_ = true; -} diff --git a/lib/concurrent/test/ExecutorIntegrationTest.cpp b/lib/concurrent/test/ExecutorIntegrationTest.cpp index 099c5a8e..e2a40299 100644 --- a/lib/concurrent/test/ExecutorIntegrationTest.cpp +++ b/lib/concurrent/test/ExecutorIntegrationTest.cpp @@ -1,8 +1,7 @@ #include "gtest/gtest.h" #include "executor.h" #include "pimutex.h" - -const int WAIT_THREAD_TIME_MS = 30; +#include "testutil.h" TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) { PIMutex m; @@ -14,11 +13,13 @@ TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) { m.unlock(); }); piMSleep(WAIT_THREAD_TIME_MS); + m.lock(); ASSERT_EQ(invokedRunnables, 1); + m.unlock(); } TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) { - bool isRunnableInvoke = false; + volatile bool isRunnableInvoke = false; PIThreadPoolExecutor executorService(1); executorService.shutdown(); executorService.execute([&]() { @@ -29,7 +30,7 @@ TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) { } TEST(ExcutorIntegrationTest, execute_is_execute_before_shutdown) { - bool isRunnableInvoke = false; + volatile bool isRunnableInvoke = false; PIThreadPoolExecutor executorService(1); executorService.execute([&]() { piMSleep(WAIT_THREAD_TIME_MS); diff --git a/lib/concurrent/test/ExecutorUnitTest.cpp b/lib/concurrent/test/ExecutorUnitTest.cpp new file mode 100644 index 00000000..ac6adde3 --- /dev/null +++ b/lib/concurrent/test/ExecutorUnitTest.cpp @@ -0,0 +1,102 @@ +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "executor.h" +#include "testutil.h" + +using ::testing::_; +using ::testing::SetArgReferee; +using ::testing::DoAll; +using ::testing::DeleteArg; +using ::testing::Return; +using ::testing::AtLeast; +using ::testing::ByRef; +using ::testing::Eq; +using ::testing::Ge; +using ::testing::Pointee; +using ::testing::IsNull; +using ::testing::NiceMock; + +typedef std::function VoidFunc; + +namespace std { + inline bool operator ==(const VoidFunc& s, const VoidFunc& v) { + // TODO VoidFunc operator == + return true; + } +} + +class MockThread { +public: + std::function runnnable; + + MockThread(std::function runnnable) : runnnable(runnnable) { } + + MOCK_METHOD0(start, bool()); + MOCK_METHOD0(stop, void()); + MOCK_METHOD1(waitForStart, bool(int timeout_msecs)); + MOCK_METHOD1(waitForFinish, bool(int timeout_msecs)); +}; + +class MockDeque : public PIBlockingDequeue { +public: + MOCK_METHOD1(offer, bool(const VoidFunc&)); + MOCK_METHOD0(take, VoidFunc()); + MOCK_METHOD1(poll, VoidFunc(int)); + MOCK_METHOD0(capacity, size_t()); + MOCK_METHOD0(remainingCapacity, size_t()); +}; + +typedef PIThreadPoolExecutorTemplate, MockDeque> PIThreadPoolExecutorMoc_t; + +class PIThreadPoolExecutorMoc : public PIThreadPoolExecutorMoc_t { +public: + explicit PIThreadPoolExecutorMoc(size_t corePoolSize) : PIThreadPoolExecutorMoc_t(corePoolSize) { } + + template + explicit PIThreadPoolExecutorMoc(size_t corePoolSize, Function onBeforeStart) : PIThreadPoolExecutorMoc_t(corePoolSize, onBeforeStart) { } + + PIVector*>* getThreadPool() { return &threadPool; } + bool isShutdown() { return isShutdown_; } + MockDeque* getTaskQueue() { return &taskQueue; } +}; + +TEST(ExecutorUnitTest, is_corePool_created) { + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + ASSERT_EQ(THREAD_COUNT, executor.getThreadPool()->size()); +} + +TEST(ExecutorUnitTest, is_corePool_started) { + PIThreadPoolExecutorMoc executor(THREAD_COUNT, [](MockThread* thread){ + EXPECT_CALL(*thread, start()) + .WillOnce(Return(true)); + }); + EXPECT_EQ(THREAD_COUNT, executor.getThreadPool()->size()); + executor.getThreadPool()->forEach([](MockThread* thread){ + EXPECT_CALL(*thread, stop()) + .WillOnce(Return()); + }); +} + +TEST(ExecutorUnitTest, execute_is_added_to_taskQueue) { + VoidFunc voidFunc = [](){}; + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + EXPECT_CALL(*executor.getTaskQueue(), offer(Eq(voidFunc))) + .WillOnce(Return(true)); + executor.execute(voidFunc); +} + +TEST(ExecutorUnitTest, is_corePool_execute_queue_elements) { + bool is_executed = false; + PIThreadPoolExecutorMoc executor(1); + EXPECT_EQ(executor.getThreadPool()->size(), 1); + EXPECT_CALL(*executor.getTaskQueue(), poll(Ge(0))) + .WillOnce(Return([&](){ is_executed = true; })); + executor.getThreadPool()->at(0)->runnnable(); + ASSERT_TRUE(is_executed); +} +/* FIXME +TEST(ExecutorUnitTest, shutdown_is_stop_threads) { + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + executor.shutdown(); +} +*/ \ No newline at end of file diff --git a/lib/concurrent/test/testutil.h b/lib/concurrent/test/testutil.h index 063f6622..eb48c2c8 100644 --- a/lib/concurrent/test/testutil.h +++ b/lib/concurrent/test/testutil.h @@ -8,9 +8,9 @@ * Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests * write "Start thread timeout reach!" message. You can reduce it if you want increase test performance. */ -const int WAIT_THREAD_TIME_MS = 40; +const int WAIT_THREAD_TIME_MS = 10; -const int THREAD_COUNT = 5; +const int THREAD_COUNT = 2; class TestUtil: public PIObject { PIOBJECT(TestUtil) @@ -55,6 +55,7 @@ public: return !isTimeout; } + }; #endif //AWRCANFLASHER_TESTUTIL_H diff --git a/lib/main/concurrent/executor.h b/lib/main/concurrent/executor.h index 63a6135c..a4ccdaff 100644 --- a/lib/main/concurrent/executor.h +++ b/lib/main/concurrent/executor.h @@ -27,12 +27,18 @@ * @brief Thread pools address two different problems: they usually provide improved performance when executing large * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and * managing the resources, including threads, consumed when executing a collection of tasks. + * + * TODO adapt documentation to template */ -class PIThreadPoolExecutor { +template +class PIThreadPoolExecutorTemplate { public: - explicit PIThreadPoolExecutor(size_t corePoolSize = 1, PIBlockingDequeue >* taskQueue_ = new PIBlockingDequeue >()); + explicit PIThreadPoolExecutorTemplate(size_t corePoolSize = 1) : isShutdown_(false) { makePool(corePoolSize); } - virtual ~PIThreadPoolExecutor(); + virtual ~PIThreadPoolExecutorTemplate() { + shutdownNow(); + while (threadPool.size() > 0) delete threadPool.take_back(); + } /** * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task @@ -41,24 +47,65 @@ public: * * @param runnable not empty function for thread pool execution */ - void execute(const std::function& runnable); - - void shutdownNow(); + void execute(const std::function &runnable) { + if (!isShutdown_) taskQueue.offer(runnable); + } /** * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously * submitted tasks to complete execution. Use awaitTermination to do that. */ - void shutdown(); + void shutdown() { + isShutdown_ = true; + } - bool isShutdown() const; + void shutdownNow() { + isShutdown_ = true; + for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); + } - bool awaitTermination(int timeoutMs); -private: + bool isShutdown() const { + return isShutdown_; + } + + bool awaitTermination(int timeoutMs) { + PITimeMeasurer measurer; + for (size_t i = 0; i < threadPool.size(); ++i) { + int dif = timeoutMs - (int)measurer.elapsed_m(); + if (dif < 0) return false; + if (!threadPool[i]->waitForFinish(dif)) return false; + } + return true; + } + +protected: std::atomic_bool isShutdown_; - PIBlockingDequeue >* taskQueue; - PIVector threadPool; + Dequeue_ taskQueue; + PIVector threadPool; + + template + PIThreadPoolExecutorTemplate(size_t corePoolSize, Function onBeforeStart) : isShutdown_(false) { + makePool(corePoolSize, onBeforeStart); + } + + void makePool(size_t corePoolSize, std::function onBeforeStart = [](Thread_*){}) { + for (size_t i = 0; i < corePoolSize; ++i) { + auto* thread = new Thread_([&, i](){ + auto runnable = taskQueue.poll(100); + if (runnable) { + runnable(); + } + if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop(); + }); + threadPool.push_back(thread); + onBeforeStart(thread); + thread->start(); + } + } }; +typedef PIThreadPoolExecutorTemplate > > PIThreadPoolExecutor; + + #endif //PIP_TESTS_EXECUTOR_H