diff --git a/lib/concurrent/executor.cpp b/lib/concurrent/executor.cpp
deleted file mode 100644
index a1c80042..00000000
--- a/lib/concurrent/executor.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- PIP - Platform Independent Primitives
-
- Stephan Fomenko
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see .
-*/
-
-#include "executor.h"
-
-
-PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_) : isShutdown_(false), taskQueue(taskQueue_) {
- for (size_t i = 0; i < corePoolSize; ++i) {
- PIThread * thread = new PIThread([&, i](){
- auto runnable = taskQueue->poll(100, std::function());
- if (runnable) {
- runnable();
- }
- if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop();
- });
- threadPool.push_back(thread);
- thread->start();
- }
-}
-
-
-bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) {
- PITimeMeasurer measurer;
- for (size_t i = 0; i < threadPool.size(); ++i) {
- int dif = timeoutMs - (int)measurer.elapsed_m();
- if (dif < 0) return false;
- if (!threadPool[i]->waitForFinish(dif)) return false;
- }
- return true;
-}
-
-
-void PIThreadPoolExecutor::shutdownNow() {
- isShutdown_ = true;
- for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
-}
-
-
-PIThreadPoolExecutor::~PIThreadPoolExecutor() {
- shutdownNow();
- while (threadPool.size() > 0) delete threadPool.take_back();
- delete taskQueue;
-}
-
-
-void PIThreadPoolExecutor::execute(const std::function &runnable) {
- if (!isShutdown_) taskQueue->offer(runnable);
-}
-
-
-bool PIThreadPoolExecutor::isShutdown() const {
- return isShutdown_;
-}
-
-
-void PIThreadPoolExecutor::shutdown() {
- isShutdown_ = true;
-}
diff --git a/lib/concurrent/test/ExecutorIntegrationTest.cpp b/lib/concurrent/test/ExecutorIntegrationTest.cpp
index 099c5a8e..e2a40299 100644
--- a/lib/concurrent/test/ExecutorIntegrationTest.cpp
+++ b/lib/concurrent/test/ExecutorIntegrationTest.cpp
@@ -1,8 +1,7 @@
#include "gtest/gtest.h"
#include "executor.h"
#include "pimutex.h"
-
-const int WAIT_THREAD_TIME_MS = 30;
+#include "testutil.h"
TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) {
PIMutex m;
@@ -14,11 +13,13 @@ TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) {
m.unlock();
});
piMSleep(WAIT_THREAD_TIME_MS);
+ m.lock();
ASSERT_EQ(invokedRunnables, 1);
+ m.unlock();
}
TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) {
- bool isRunnableInvoke = false;
+ volatile bool isRunnableInvoke = false;
PIThreadPoolExecutor executorService(1);
executorService.shutdown();
executorService.execute([&]() {
@@ -29,7 +30,7 @@ TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) {
}
TEST(ExcutorIntegrationTest, execute_is_execute_before_shutdown) {
- bool isRunnableInvoke = false;
+ volatile bool isRunnableInvoke = false;
PIThreadPoolExecutor executorService(1);
executorService.execute([&]() {
piMSleep(WAIT_THREAD_TIME_MS);
diff --git a/lib/concurrent/test/ExecutorUnitTest.cpp b/lib/concurrent/test/ExecutorUnitTest.cpp
new file mode 100644
index 00000000..ac6adde3
--- /dev/null
+++ b/lib/concurrent/test/ExecutorUnitTest.cpp
@@ -0,0 +1,102 @@
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "executor.h"
+#include "testutil.h"
+
+using ::testing::_;
+using ::testing::SetArgReferee;
+using ::testing::DoAll;
+using ::testing::DeleteArg;
+using ::testing::Return;
+using ::testing::AtLeast;
+using ::testing::ByRef;
+using ::testing::Eq;
+using ::testing::Ge;
+using ::testing::Pointee;
+using ::testing::IsNull;
+using ::testing::NiceMock;
+
+typedef std::function VoidFunc;
+
+namespace std {
+ inline bool operator ==(const VoidFunc& s, const VoidFunc& v) {
+ // TODO VoidFunc operator ==
+ return true;
+ }
+}
+
+class MockThread {
+public:
+ std::function runnnable;
+
+ MockThread(std::function runnnable) : runnnable(runnnable) { }
+
+ MOCK_METHOD0(start, bool());
+ MOCK_METHOD0(stop, void());
+ MOCK_METHOD1(waitForStart, bool(int timeout_msecs));
+ MOCK_METHOD1(waitForFinish, bool(int timeout_msecs));
+};
+
+class MockDeque : public PIBlockingDequeue {
+public:
+ MOCK_METHOD1(offer, bool(const VoidFunc&));
+ MOCK_METHOD0(take, VoidFunc());
+ MOCK_METHOD1(poll, VoidFunc(int));
+ MOCK_METHOD0(capacity, size_t());
+ MOCK_METHOD0(remainingCapacity, size_t());
+};
+
+typedef PIThreadPoolExecutorTemplate, MockDeque> PIThreadPoolExecutorMoc_t;
+
+class PIThreadPoolExecutorMoc : public PIThreadPoolExecutorMoc_t {
+public:
+ explicit PIThreadPoolExecutorMoc(size_t corePoolSize) : PIThreadPoolExecutorMoc_t(corePoolSize) { }
+
+ template
+ explicit PIThreadPoolExecutorMoc(size_t corePoolSize, Function onBeforeStart) : PIThreadPoolExecutorMoc_t(corePoolSize, onBeforeStart) { }
+
+ PIVector*>* getThreadPool() { return &threadPool; }
+ bool isShutdown() { return isShutdown_; }
+ MockDeque* getTaskQueue() { return &taskQueue; }
+};
+
+TEST(ExecutorUnitTest, is_corePool_created) {
+ PIThreadPoolExecutorMoc executor(THREAD_COUNT);
+ ASSERT_EQ(THREAD_COUNT, executor.getThreadPool()->size());
+}
+
+TEST(ExecutorUnitTest, is_corePool_started) {
+ PIThreadPoolExecutorMoc executor(THREAD_COUNT, [](MockThread* thread){
+ EXPECT_CALL(*thread, start())
+ .WillOnce(Return(true));
+ });
+ EXPECT_EQ(THREAD_COUNT, executor.getThreadPool()->size());
+ executor.getThreadPool()->forEach([](MockThread* thread){
+ EXPECT_CALL(*thread, stop())
+ .WillOnce(Return());
+ });
+}
+
+TEST(ExecutorUnitTest, execute_is_added_to_taskQueue) {
+ VoidFunc voidFunc = [](){};
+ PIThreadPoolExecutorMoc executor(THREAD_COUNT);
+ EXPECT_CALL(*executor.getTaskQueue(), offer(Eq(voidFunc)))
+ .WillOnce(Return(true));
+ executor.execute(voidFunc);
+}
+
+TEST(ExecutorUnitTest, is_corePool_execute_queue_elements) {
+ bool is_executed = false;
+ PIThreadPoolExecutorMoc executor(1);
+ EXPECT_EQ(executor.getThreadPool()->size(), 1);
+ EXPECT_CALL(*executor.getTaskQueue(), poll(Ge(0)))
+ .WillOnce(Return([&](){ is_executed = true; }));
+ executor.getThreadPool()->at(0)->runnnable();
+ ASSERT_TRUE(is_executed);
+}
+/* FIXME
+TEST(ExecutorUnitTest, shutdown_is_stop_threads) {
+ PIThreadPoolExecutorMoc executor(THREAD_COUNT);
+ executor.shutdown();
+}
+*/
\ No newline at end of file
diff --git a/lib/concurrent/test/testutil.h b/lib/concurrent/test/testutil.h
index 063f6622..eb48c2c8 100644
--- a/lib/concurrent/test/testutil.h
+++ b/lib/concurrent/test/testutil.h
@@ -8,9 +8,9 @@
* Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests
* write "Start thread timeout reach!" message. You can reduce it if you want increase test performance.
*/
-const int WAIT_THREAD_TIME_MS = 40;
+const int WAIT_THREAD_TIME_MS = 10;
-const int THREAD_COUNT = 5;
+const int THREAD_COUNT = 2;
class TestUtil: public PIObject {
PIOBJECT(TestUtil)
@@ -55,6 +55,7 @@ public:
return !isTimeout;
}
+
};
#endif //AWRCANFLASHER_TESTUTIL_H
diff --git a/lib/main/concurrent/executor.h b/lib/main/concurrent/executor.h
index 63a6135c..a4ccdaff 100644
--- a/lib/main/concurrent/executor.h
+++ b/lib/main/concurrent/executor.h
@@ -27,12 +27,18 @@
* @brief Thread pools address two different problems: they usually provide improved performance when executing large
* numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
* managing the resources, including threads, consumed when executing a collection of tasks.
+ *
+ * TODO adapt documentation to template
*/
-class PIThreadPoolExecutor {
+template
+class PIThreadPoolExecutorTemplate {
public:
- explicit PIThreadPoolExecutor(size_t corePoolSize = 1, PIBlockingDequeue >* taskQueue_ = new PIBlockingDequeue >());
+ explicit PIThreadPoolExecutorTemplate(size_t corePoolSize = 1) : isShutdown_(false) { makePool(corePoolSize); }
- virtual ~PIThreadPoolExecutor();
+ virtual ~PIThreadPoolExecutorTemplate() {
+ shutdownNow();
+ while (threadPool.size() > 0) delete threadPool.take_back();
+ }
/**
* @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task
@@ -41,24 +47,65 @@ public:
*
* @param runnable not empty function for thread pool execution
*/
- void execute(const std::function& runnable);
-
- void shutdownNow();
+ void execute(const std::function &runnable) {
+ if (!isShutdown_) taskQueue.offer(runnable);
+ }
/**
* @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no additional effect if already shut down. This method does not wait for previously
* submitted tasks to complete execution. Use awaitTermination to do that.
*/
- void shutdown();
+ void shutdown() {
+ isShutdown_ = true;
+ }
- bool isShutdown() const;
+ void shutdownNow() {
+ isShutdown_ = true;
+ for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
+ }
- bool awaitTermination(int timeoutMs);
-private:
+ bool isShutdown() const {
+ return isShutdown_;
+ }
+
+ bool awaitTermination(int timeoutMs) {
+ PITimeMeasurer measurer;
+ for (size_t i = 0; i < threadPool.size(); ++i) {
+ int dif = timeoutMs - (int)measurer.elapsed_m();
+ if (dif < 0) return false;
+ if (!threadPool[i]->waitForFinish(dif)) return false;
+ }
+ return true;
+ }
+
+protected:
std::atomic_bool isShutdown_;
- PIBlockingDequeue >* taskQueue;
- PIVector threadPool;
+ Dequeue_ taskQueue;
+ PIVector threadPool;
+
+ template
+ PIThreadPoolExecutorTemplate(size_t corePoolSize, Function onBeforeStart) : isShutdown_(false) {
+ makePool(corePoolSize, onBeforeStart);
+ }
+
+ void makePool(size_t corePoolSize, std::function onBeforeStart = [](Thread_*){}) {
+ for (size_t i = 0; i < corePoolSize; ++i) {
+ auto* thread = new Thread_([&, i](){
+ auto runnable = taskQueue.poll(100);
+ if (runnable) {
+ runnable();
+ }
+ if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
+ });
+ threadPool.push_back(thread);
+ onBeforeStart(thread);
+ thread->start();
+ }
+ }
};
+typedef PIThreadPoolExecutorTemplate > > PIThreadPoolExecutor;
+
+
#endif //PIP_TESTS_EXECUTOR_H