From 49dfc86d469fd8ae4869e12e973aa73ef0f91ac8 Mon Sep 17 00:00:00 2001 From: Stepan Fomenko Date: Fri, 28 Aug 2020 14:02:45 +0300 Subject: [PATCH] Init commit --- CMakeLists.txt | 53 +++ include/piblockingdequeue.h | 272 ++++++++++++++ include/piexecutor.h | 214 +++++++++++ readme.md | 56 +++ src/dummy.cpp | 0 test/CMakeLists.txt | 17 + test/include/testutil.h | 76 ++++ test/src/BlockingDequeueUnitTest.cpp | 510 +++++++++++++++++++++++++++ test/src/ExecutorIntegrationTest.cpp | 58 +++ test/src/ExecutorUnitTest.cpp | 132 +++++++ 10 files changed, 1388 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 include/piblockingdequeue.h create mode 100644 include/piexecutor.h create mode 100644 readme.md create mode 100644 src/dummy.cpp create mode 100644 test/CMakeLists.txt create mode 100644 test/include/testutil.h create mode 100644 test/src/BlockingDequeueUnitTest.cpp create mode 100644 test/src/ExecutorIntegrationTest.cpp create mode 100644 test/src/ExecutorUnitTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..ee353a7 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,53 @@ +cmake_minimum_required(VERSION 3.13) +project(concurrent_lib VERSION 1.0 LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 11) + +get_directory_property(IS_SUBPROJECT PARENT_DIRECTORY) + +option(CONCURRENT_TESTING "Enable build tests for concurrent lib" ON) +option(CONCURRENT_EXAMPLES "Enable build examples for concurrent lib" ON) + +add_compile_options( + -Werror + + -Wall + + -Wcast-align + -Wcast-qual + -Wconversion + -Wenum-compare + -Wfloat-equal + -Wnon-virtual-dtor + -Wold-style-cast + -Woverloaded-virtual + -Wredundant-decls + -Wsign-promo +) + +if(NOT CMAKE_CXX_EXTENSIONS) + set(CMAKE_CXX_EXTENSIONS OFF) +endif() + +file(GLOB SOURCES src/*.cpp) +add_library(concurrent ${SOURCES}) +target_include_directories(concurrent INTERFACE + $ + $ +) + +install(DIRECTORY include DESTINATION ${CMAKE_INSTALL_PREFIX}) +install(TARGETS concurrent EXPORT ConcurrentConfig) +install(EXPORT ConcurrentConfig DESTINATION lib/cmake/Concurrent) + +if(NOT CONCURRENT_TESTING) + message(STATUS "Concurrent tests is OFF") +elseif(IS_SUBPROJECT) + message(STATUS "Concurrent tests is OFF (lib is subproject)") +else() + add_subdirectory(test) + if (TARGET concurrent_test) + message(STATUS "Concurrent tests is ON") + else() + message(STATUS "Concurrent tests is OFF (GTest not found)") + endif() +endif() \ No newline at end of file diff --git a/include/piblockingdequeue.h b/include/piblockingdequeue.h new file mode 100644 index 0000000..6c24d35 --- /dev/null +++ b/include/piblockingdequeue.h @@ -0,0 +1,272 @@ +/* + PIP - Platform Independent Primitives + + Stephan Fomenko + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PIBLOCKINGDEQUEUE_H +#define PIBLOCKINGDEQUEUE_H + +#include +#include + +/** + * @brief A Queue that supports operations that wait for the queue to become non-empty when retrieving an element, and + * wait for space to become available in the queue when storing an element. + */ +template class Queue_ = std::deque, typename ConditionVariable_ = std::condition_variable> +class PIBlockingDequeue { +public: + typedef Queue_ QueueType; + + /** + * @brief Constructor + */ + explicit PIBlockingDequeue(size_t capacity = SIZE_MAX) + : cond_var_add(new ConditionVariable_()), cond_var_rem(new ConditionVariable_()), max_size(capacity) { } + + /** + * @brief Copy constructor. Initialize queue with copy of other container elements. Not thread-safe for other queue. + */ + template::value, int>::type = 0> + explicit PIBlockingDequeue(const Iterable& other): PIBlockingDequeue() { + mutex.lock(); + for (const T& t : other) data_queue.push_back(t); + mutex.unlock(); + } + + /** + * @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements. + */ + explicit PIBlockingDequeue(PIBlockingDequeue& other): PIBlockingDequeue() { + other.mutex.lock(); + mutex.lock(); + max_size = other.max_size; + data_queue = other.data_queue; + mutex.unlock(); + other.mutex.unlock(); + } + + ~PIBlockingDequeue() { + delete cond_var_add; + delete cond_var_rem; + } + + /** + * @brief Inserts the specified element into this queue, waiting if necessary for space to become available. + * + * @param v the element to add + */ + template + void put(Type && v) { + mutex.lock(); + cond_var_rem->wait(mutex, [&]() { return data_queue.size() < max_size; }); + data_queue.push_back(std::forward(v)); + mutex.unlock(); + cond_var_add->notify_one(); + } + + /** + * @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without + * exceeding the queue's capacity, returning true upon success and false if this queue is full. + * + * @param v the element to add + * @return true if the element was added to this queue, else false + */ + template + bool offer(Type && v) { + mutex.lock(); + if (data_queue.size() >= max_size) { + mutex.unlock(); + return false; + } + data_queue.push_back(std::forward(v)); + mutex.unlock(); + cond_var_add->notify_one(); + return true; + } + + /** + * @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for + * space to become available. + * + * @param v the element to add + * @param timeoutMs how long to wait before giving up, in milliseconds + * @return true if successful, or false if the specified waiting time elapses before space is available + */ + template + bool offer(Type && v, int timeoutMs) { + mutex.lock(); + bool isOk = cond_var_rem->wait_for(mutex, timeoutMs, [&]() { return data_queue.size() < max_size; } ); + if (isOk) data_queue.push_back(std::forward(v)); + mutex.unlock(); + if (isOk) cond_var_add->notify_one(); + return isOk; + } + + /** + * @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. + * + * @return the head of this queue + */ + T take() { + mutex.lock(); + cond_var_add->wait(mutex, [&]() { return data_queue.size() != 0; }); + T t = std::move(data_queue.front()); + data_queue.pop_front(); + mutex.unlock(); + cond_var_rem->notify_one(); + return t; + } + + /** + * @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an + * element to become available. + * + * @param timeoutMs how long to wait before giving up, in milliseconds + * @param defaultVal value, which returns if the specified waiting time elapses before an element is available + * @param isOk flag, which indicates result of method execution. It will be set to false if timeout, or true if + * return value is retrieved value + * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available + */ + template + T poll(int timeoutMs, Type && defaultVal = Type(), bool * isOk = nullptr) { + bool isNotEmpty; + T t; + { + std::unique_lock lc(mutex); + isNotEmpty = cond_var_add->wait_for(lc, std::chrono::milliseconds(timeoutMs), [&]() { return data_queue.size() != 0; }); + + if (isNotEmpty) { + t = std::move(data_queue.front()); + data_queue.pop_front(); + } else { + t = std::forward(defaultVal); + } + } + if (isNotEmpty) cond_var_rem->notify_one(); + if (isOk) *isOk = isNotEmpty; + return t; + } + + /** + * @brief Retrieves and removes the head of this queue and return it if queue not empty, otherwise return defaultVal. + * Do it immediately without waiting. + * + * @param defaultVal value, which returns if the specified waiting time elapses before an element is available + * @param isOk flag, which indicates result of method execution. It will be set to false if timeout, or true if + * return value is retrieved value + * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available + */ + template + T poll(Type && defaultVal = Type(), bool * isOk = nullptr) { + T t; + mutex.lock(); + bool isNotEmpty = data_queue.size() != 0; + if (isNotEmpty) { + t = std::move(data_queue.front()); + data_queue.pop_front(); + } else { + t = std::forward(defaultVal); + } + mutex.unlock(); + if (isNotEmpty) cond_var_rem->notifyOne(); + if (isOk) *isOk = isNotEmpty; + return t; + } + + /** + * @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource + * constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the capacity + */ + size_t capacity() { + size_t c; + mutex.lock(); + c = max_size; + mutex.unlock(); + return c; + } + + /** + * @brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource + * constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the remaining capacity + */ + size_t remainingCapacity() { + mutex.lock(); + size_t c = max_size - data_queue.size(); + mutex.unlock(); + return c; + } + + /** + * @brief Returns the number of elements in this collection. + */ + size_t size() { + mutex.lock(); + size_t s = data_queue.size(); + mutex.unlock(); + return s; + } + + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + template + size_t drainTo(Appendable& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + size_t count = maxCount > data_queue.size() ? data_queue.size() : maxCount; + for (size_t i = 0; i < count; ++i) { + other.push_back(std::move(data_queue.front())); + data_queue.pop_front(); + } + mutex.unlock(); + return count; + } + + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + size_t drainTo(PIBlockingDequeue& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + other.mutex.lock(); + size_t count = maxCount > data_queue.size() ? data_queue.size() : maxCount; + size_t otherRemainingCapacity = other.max_size - data_queue.size(); + if (count > otherRemainingCapacity) count = otherRemainingCapacity; + for (size_t i = 0; i < count; ++i) { + other.data_queue.push_back(std::move(data_queue.front())); + data_queue.pop_front(); + } + other.mutex.unlock(); + mutex.unlock(); + return count; + } + +protected: + std::mutex mutex; + // TODO change to type without point + ConditionVariable_ *cond_var_add, *cond_var_rem; + QueueType data_queue; + size_t max_size; + +}; + + +#endif // PIBLOCKINGDEQUEUE_H diff --git a/include/piexecutor.h b/include/piexecutor.h new file mode 100644 index 0000000..764d275 --- /dev/null +++ b/include/piexecutor.h @@ -0,0 +1,214 @@ +/* + PIP - Platform Independent Primitives + + Stephan Fomenko + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef PIEXECUTOR_H +#define PIEXECUTOR_H + +#include "piblockingdequeue.h" +#include +#include + +/** + * @brief Wrapper for custom invoke operator available function types. + * @note Source from: "Энтони Уильямс, Параллельное программирование на С++ в действии. Практика разработки многопоточных + * программ. Пер. с англ. Слинкин А. А. - M.: ДМК Пресс, 2012 - 672c.: ил." (page 387) + */ +class FunctionWrapper { + struct ImplBase { + virtual void call() = 0; + virtual ~ImplBase() = default; + }; + + std::unique_ptr impl; + + template + struct ImplType: ImplBase { + F f; + explicit ImplType(F&& f): f(std::forward(f)) {} + void call() final { f(); } + }; +public: + template::value> > + explicit FunctionWrapper(F&& f): impl(new ImplType(std::forward(f))) {} + + void operator()() { impl->call(); } + + explicit operator bool() const noexcept { return static_cast(impl); } + + FunctionWrapper() = default; + FunctionWrapper(FunctionWrapper&& other) noexcept : impl(std::move(other.impl)) {} + FunctionWrapper& operator=(FunctionWrapper&& other) noexcept { + impl = std::move(other.impl); + return *this; + } + + FunctionWrapper(const FunctionWrapper& other) = delete; + FunctionWrapper& operator=(const FunctionWrapper&) = delete; +}; + +template > +class PIThreadPoolExecutorTemplate { +protected: + enum thread_command { + run, + shutdown_c, + shutdown_now + }; + +public: + explicit PIThreadPoolExecutorTemplate(size_t corePoolSize = 1) : thread_command_(thread_command::run) { makePool(corePoolSize); } + + virtual ~PIThreadPoolExecutorTemplate() { + shutdownNow(); + awaitTermination(1000); + while (threadPool.size() > 0) { + auto thread = threadPool.back(); + threadPool.pop_back(); + delete thread; + } + } + + template + std::future::type> submit(FunctionType&& callable) { + typedef typename std::result_of::type ResultType; + + if (thread_command_ == thread_command::run) { + std::packaged_task callable_task(std::forward(callable)); + auto future = callable_task.get_future(); + FunctionWrapper functionWrapper(callable_task); + taskQueue.offer(std::move(functionWrapper)); + return future; + } else { + return std::future(); + } + } + + template + void execute(FunctionType&& runnable) { + if (thread_command_ == thread_command::run) { + FunctionWrapper function_wrapper(std::forward(runnable)); + taskQueue.offer(std::move(function_wrapper)); + } + } + + void shutdown() { + thread_command_ = thread_command::shutdown_c; + } + + void shutdownNow() { + thread_command_ = thread_command::shutdown_now; + } + + bool isShutdown() const { + return thread_command_; + } + + bool awaitTermination(int timeoutMs) { + using namespace std::chrono; + + auto start_time = high_resolution_clock::now(); + for (size_t i = 0; i < threadPool.size(); ++i) { + int dif = timeoutMs - static_cast(duration_cast(high_resolution_clock::now() - start_time).count()); + if (dif < 0) return false; + // TODO add wait with timeout + threadPool[i]->join(); +// if (!threadPool[i]->waitFinish(dif)) return false; + } + return true; + } + +protected: + std::atomic thread_command_; + Dequeue_ taskQueue; + std::vector threadPool; + + template + PIThreadPoolExecutorTemplate(size_t corePoolSize, Function&& onBeforeStart) : thread_command_(thread_command::run) { + makePool(corePoolSize, std::forward(onBeforeStart)); + } + + void makePool(size_t corePoolSize, std::function&& onBeforeStart = [](Thread_*){}) { + for (size_t i = 0; i < corePoolSize; ++i) { + auto* thread = new Thread_([&, i](){ + do { + auto runnable = taskQueue.poll(100); + if (runnable) { + runnable(); + } + } while (!thread_command_ || taskQueue.size() != 0); + }); + threadPool.push_back(thread); + onBeforeStart(thread); + } + } +}; + +typedef PIThreadPoolExecutorTemplate<> PIThreadPoolExecutor; + +#ifdef DOXYGEN +/** + * @brief Thread pools address two different problems: they usually provide improved performance when executing large + * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and + * managing the resources, including threads, consumed when executing a collection of tasks. + */ +class PIThreadPoolExecutor { +public: + explicit PIThreadPoolExecutor(size_t corePoolSize); + + virtual ~PIThreadPoolExecutor(); + + /** + * @brief Submits a Runnable task for execution and returns a Future representing that task. The Future's get method + * will return null upon successful completion. + * + * @tparam FunctionType - custom type of function with operator() and return type + * @tparam R - derived from FunctionType return type + * + * @param callable - the task to submit + * @return a future representing pending completion of the task + */ + std::future submit(FunctionType&& callable); + + /** + * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task + * cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been + * reached. + * + * @tparam FunctionType - custom type of function with operator() and return type + * + * @param runnable not empty function for thread pool execution + */ + void execute(FunctionType&& runnable); + + /** + * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be + * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously + * submitted tasks to complete execution. Use awaitTermination to do that. + */ + void shutdown(); + + void shutdownNow(); + + bool isShutdown() const; + + bool awaitTermination(int timeoutMs); +}; +#endif //DOXYGEN + +#endif //PIEXECUTOR_H diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..d1530da --- /dev/null +++ b/readme.md @@ -0,0 +1,56 @@ +# Concurrent library + +## Requirements + +[GTest v1.10.0](https://github.com/google/googletest/tree/v1.10.x) - "googletest is a testing framework developed by the +Testing Technology team with Google's specific requirements and constraints in mind". + +## Options + +- `CONCURRENT_TESTING` - enable build tests +- `CONCURRENT_EXAMPLES`- enable build examples + +## Use from another project + +1 Clone project: + +```cmd +git clone https://git.shs.tools/zzuummaa/concurrent_lib.git +``` + +2 Generate cmake files: + +```cmd +cd concurrent +mkdir build +cd build +cmake -DCMAKE_BUILD_TYPE=Release .. +``` + +3 Build and install library (use shell with administrative privileges or sudo): + +```cmd +cmake --build . --target install +``` + +## Build tests + +1 Clone GTest: + +```cmd +git clone --branch v1.10.x https://github.com/google/googletest.git +``` + +2 Generate cmake files: + +```cmd +cd concurrent +mkdir build +cd build +cmake -DCMAKE_BUILD_TYPE=Release .. +``` +3 Build and install library (use shell with administrative privileges or sudo): + +```cmd +cmake --build . --target install +``` \ No newline at end of file diff --git a/src/dummy.cpp b/src/dummy.cpp new file mode 100644 index 0000000..e69de29 diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100644 index 0000000..64196ce --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1,17 @@ +project(concurrent_test) + +find_package(GTest 1.10.0 REQUIRED) + +if (GTest_FOUND) + file(GLOB TEST_SOURCES src/*.cpp) + add_executable(concurrent_test ${TEST_SOURCES}) + + # Disable for GTest build + target_compile_options(concurrent_test PRIVATE -Wno-sign-compare) + + target_include_directories(concurrent_test PUBLIC include) + target_link_libraries(concurrent_test GTest::gtest GTest::gtest_main GTest::gmock GTest::gmock concurrent) + add_test(test-1 concurrent_test) + + add_custom_target(check ALL COMMAND concurrent_test) +endif() diff --git a/test/include/testutil.h b/test/include/testutil.h new file mode 100644 index 0000000..f335930 --- /dev/null +++ b/test/include/testutil.h @@ -0,0 +1,76 @@ +#ifndef AWRCANFLASHER_TESTUTIL_H +#define AWRCANFLASHER_TESTUTIL_H + +#include +#include + +template +void print_type_info() { + std::cout << typeid(T).name() << " is a " + << (std::is_const::type>::value ? "const " : "") + << (std::is_lvalue_reference::value ? "lvalue" : "rvalue") + << " reference" << std::endl; +} + +/** + * Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests + * write "Start thread timeout reach!" message. You can reduce it if you want increase test performance. + */ +const int WAIT_THREAD_TIME_MS = 30; + +const int THREAD_COUNT = 2; + +class TestUtil { +public: + double threadStartTime; + std::atomic_bool isRunning; + std::function adapterFunctionDefault; + + TestUtil() : isRunning(false) {} + + bool createThread(const std::function& fun = nullptr) { + std::function actualFun = fun == nullptr ? adapterFunctionDefault : fun; + + std::promise start_promise; + std::future start_future = start_promise.get_future(); + std::thread thread([this, &start_promise, actualFun](){ + isRunning = true; + start_promise.set_value(); + actualFun(); + }); + thread.detach(); + + auto status = start_future.wait_for(std::chrono::milliseconds(WAIT_THREAD_TIME_MS)); + if (status == std::future_status::timeout) { + std::cout << "Start thread timeout reach!" << std::endl; + } + start_future.get(); + return status == std::future_status::ready; + } + +// bool waitThread(PIThread* thread_, bool runningStatus = true) { +// PITimeMeasurer measurer; +// bool isTimeout = !thread_->waitForStart(WAIT_THREAD_TIME_MS); +// while (!isRunning) { +// isTimeout = WAIT_THREAD_TIME_MS <= measurer.elapsed_m(); +// if (isTimeout) break; +// piUSleep(100); +// } +// +// threadStartTime = measurer.elapsed_m(); +// +// if (isTimeout) piCout << "Start thread timeout reach!"; +// +// if (threadStartTime > 1) { +// piCout << "Start time" << threadStartTime << "ms"; +// } else if (threadStartTime > 0.001) { +// piCout << "Start time" << threadStartTime * 1000 << "mcs"; +// } else { +// piCout << "Start time" << threadStartTime * 1000 * 1000 << "ns"; +// } +// +// return !isTimeout; +// } +}; + +#endif //AWRCANFLASHER_TESTUTIL_H diff --git a/test/src/BlockingDequeueUnitTest.cpp b/test/src/BlockingDequeueUnitTest.cpp new file mode 100644 index 0000000..6a91d25 --- /dev/null +++ b/test/src/BlockingDequeueUnitTest.cpp @@ -0,0 +1,510 @@ +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "testutil.h" +#include "piblockingdequeue.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Matcher; +using ::testing::Expectation; +using ::testing::Sequence; +using ::testing::NiceMock; + +class MockConditionVar { +public: + bool isWaitCalled = false; + bool isWaitForCalled = false; + bool isTrueCondition = false; + int timeout = -1; + + MOCK_METHOD1(wait, void(std::mutex&)); + MOCK_METHOD2(wait, void(std::mutex&, const std::function&)); + MOCK_METHOD2(wait_for, bool(std::mutex&, int)); + MOCK_METHOD3(wait_for, bool(std::mutex&, int, const std::function&)); + MOCK_METHOD0(notify_one, void()); +}; + +struct QueueElement { + bool is_empty; + int value; + int copy_count; + + QueueElement(): is_empty(true), value(0), copy_count(0) { } + explicit QueueElement(int value): is_empty(false), value(value), copy_count(0) { } + + QueueElement(const QueueElement& other) { + this->is_empty = other.is_empty; + this->value = other.value; + this->copy_count = 0; + const_cast(other.copy_count)++; + } + QueueElement(QueueElement&& other) noexcept : QueueElement() { + std::swap(is_empty, other.is_empty); + std::swap(value, other.value); + std::swap(copy_count, other.copy_count); + } + + bool operator==(const QueueElement &rhs) const { + return is_empty == rhs.is_empty && + value == rhs.value; + } + + bool operator!=(const QueueElement &rhs) const { + return !(rhs == *this); + } + + friend std::ostream& operator<<(std::ostream& os, const QueueElement& el) { + return os << "{ is_empty:" << el.is_empty << ", value:" << el.value << ", copy_count:" << el.copy_count << " }"; + } +}; + +template +class MockDequeBase { +public: + MOCK_METHOD1_T(push_back_rval, void(T)); + MOCK_METHOD1_T(push_back, void(const T&)); + MOCK_METHOD0(size, size_t()); + MOCK_METHOD0_T(front, T()); + MOCK_METHOD0(pop_front, void()); + + void push_back(T&& t) { + push_back_rval(t); + } +}; + +template +class MockDeque: public NiceMock> {}; + +class PIBlockingDequeuePrepare: public PIBlockingDequeue> { +public: + typedef PIBlockingDequeue> SuperClass; + + explicit PIBlockingDequeuePrepare(size_t capacity = SIZE_MAX): SuperClass(capacity) { } + + template::value, int>::type = 0> + explicit PIBlockingDequeuePrepare(const Iterable& other): SuperClass(other) { } + + MockConditionVar* getCondVarAdd() { return this->cond_var_add; } + MockConditionVar* getCondVarRem() { return this->cond_var_rem; } + MockDeque& getQueue() { return this->data_queue; } + size_t getMaxSize() { return max_size; } +}; + +class BlockingDequeueUnitTest: public ::testing::Test { +public: + int timeout = 100; + size_t capacity; + PIBlockingDequeuePrepare dequeue; + QueueElement element; + + BlockingDequeueUnitTest(): capacity(1), dequeue(capacity), element(11) {} + + void offer2_is_wait_predicate(bool isCapacityReach); + void put_is_wait_predicate(bool isCapacityReach); + void take_is_wait_predicate(bool isEmpty); +}; + +TEST_F(BlockingDequeueUnitTest, construct_default_is_max_size_eq_size_max) { + PIBlockingDequeuePrepare dequeue; + ASSERT_EQ(dequeue.getMaxSize(), SIZE_MAX); +} + +TEST_F(BlockingDequeueUnitTest, construct_from_constant_is_max_size_eq_capacity) { + PIBlockingDequeuePrepare dequeue(2); + ASSERT_EQ(dequeue.getMaxSize(), 2); +} + +TEST_F(BlockingDequeueUnitTest, construct_from_capacity_is_max_size_eq_capacity) { + ASSERT_EQ(dequeue.getMaxSize(), capacity); +} + +TEST_F(BlockingDequeueUnitTest, construct_from_iterable) { + std::vector iterable; + iterable.emplace_back(11); + iterable.emplace_back(22); + PIBlockingDequeuePrepare dequeue(iterable); +} + +void BlockingDequeueUnitTest::put_is_wait_predicate(bool isCapacityReach) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarRem(), wait(_, _)) + .WillOnce([&](std::mutex& m, const std::function& predicate){ conditionVarPredicate = predicate; }); + dequeue.put(element); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isCapacityReach ? capacity : capacity - 1)); + ASSERT_EQ(conditionVarPredicate(), !isCapacityReach); +} + +TEST_F(BlockingDequeueUnitTest, put_is_wait_predicate_true) { + put_is_wait_predicate(false); +} + +TEST_F(BlockingDequeueUnitTest, put_is_wait_predicate_false_when_capacity_reach) { + put_is_wait_predicate(true); +} + +TEST_F(BlockingDequeueUnitTest, put_is_insert_by_copy) { + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + dequeue.put(element); +} + +TEST_F(BlockingDequeueUnitTest, put_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + dequeue.put(std::move(copyElement)); +} + +TEST_F(BlockingDequeueUnitTest, put_is_notify_about_insert) { + EXPECT_CALL(*dequeue.getCondVarAdd(), notify_one) + .WillOnce(Return()); + dequeue.put(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_insert_by_copy) { + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + dequeue.offer(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + dequeue.offer(std::move(copyElement)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_not_insert_when_capacity_reach) { + EXPECT_CALL(dequeue.getQueue(), push_back(_)) + .Times(0); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + dequeue.offer(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_true_when_insert) { + ON_CALL(dequeue.getQueue(), push_back(_)) + .WillByDefault(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + ASSERT_TRUE(dequeue.offer(element)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_false_when_capacity_reach) { + ON_CALL(dequeue.getQueue(), push_back(_)) + .WillByDefault(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + ASSERT_FALSE(dequeue.offer(element)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_notify_about_insert) { + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notify_one) + .WillOnce(Return()); + dequeue.offer(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_not_notify_about_insert_when_capacity_reach) { + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notify_one) + .Times(0); + dequeue.offer(element); +} + +void BlockingDequeueUnitTest::offer2_is_wait_predicate(bool isCapacityReach) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarRem(), wait_for(_, Eq(timeout), _)) + .WillOnce([&](std::mutex& m, int timeout_, const std::function& predicate) { + conditionVarPredicate = predicate; + return isCapacityReach; + }); + dequeue.offer(element, timeout); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isCapacityReach ? capacity : capacity - 1)); + ASSERT_EQ(conditionVarPredicate(), !isCapacityReach); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_wait_predicate_true) { + offer2_is_wait_predicate(false); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_wait_predicate_false_when_capacity_reach) { + offer2_is_wait_predicate(true); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_insert_by_copy) { + EXPECT_CALL(*dequeue.getCondVarRem(), wait_for(_, Eq(timeout), _)) + .WillOnce(Return(true)); + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(*dequeue.getCondVarRem(), wait_for(_, Eq(timeout), _)) + .WillOnce(Return(true)); + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + dequeue.offer(std::move(copyElement), timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_not_insert_when_timeout) { + EXPECT_CALL(*dequeue.getCondVarRem(), wait_for(_, Eq(timeout), _)) + .WillOnce(Return(false)); + EXPECT_CALL(dequeue.getQueue(), push_back(_)) + .Times(0); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_true_when_insert) { + ON_CALL(*dequeue.getCondVarRem(), wait_for(_, _, _)) + .WillByDefault(Return(true)); + ASSERT_TRUE(dequeue.offer(element, timeout)); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_false_when_timeout) { + ON_CALL(*dequeue.getCondVarRem(), wait_for(_, _, _)) + .WillByDefault(Return(false)); + ASSERT_FALSE(dequeue.offer(element, timeout)); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_notify_about_insert) { + ON_CALL(*dequeue.getCondVarRem(), wait_for(_, _, _)) + .WillByDefault(Return(true)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notify_one) + .WillOnce(Return()); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_not_notify_about_insert_when_timeout) { + ON_CALL(*dequeue.getCondVarRem(), wait_for(_, _, _)) + .WillByDefault(Return(false)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notify_one) + .Times(0); + dequeue.offer(element, timeout); +} + +void BlockingDequeueUnitTest::take_is_wait_predicate(bool isEmpty) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarAdd(), wait(_, _)) + .WillOnce([&](std::mutex& m, const std::function& predicate) { conditionVarPredicate = predicate; }); + dequeue.take(); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isEmpty ? 0 : 1)); + ASSERT_EQ(conditionVarPredicate(), !isEmpty); +} + +TEST_F(BlockingDequeueUnitTest, take_is_wait_predicate_true) { + take_is_wait_predicate(false); +} + +TEST_F(BlockingDequeueUnitTest, take_is_wait_predicate_false_when_queue_empty) { + take_is_wait_predicate(true); +} + +TEST_F(BlockingDequeueUnitTest, take_is_get_and_remove) { + Expectation front = EXPECT_CALL(dequeue.getQueue(), front()) + .WillOnce(Return(element)); + EXPECT_CALL(dequeue.getQueue(), pop_front()) + .After(front) + .WillOnce(Return()); + + QueueElement takenElement = dequeue.take(); + ASSERT_EQ(element, takenElement); +} + +TEST_F(BlockingDequeueUnitTest, take_is_notify_about_remove) { + EXPECT_CALL(*dequeue.getCondVarRem(), notify_one) + .WillOnce(Return()); + dequeue.take(); +} + +/* +// TODO change take_is_block_when_empty to prevent segfault +TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + // May cause segfault because take front of empty queue + dequeue.take(); + EXPECT_TRUE(dequeue.getCondVarAdd()->isWaitCalled); + ASSERT_FALSE(dequeue.getCondVarAdd()->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + dequeue.take(); + + EXPECT_TRUE(dequeue.getCondVarAdd()->isWaitCalled); + ASSERT_TRUE(dequeue.getCondVarAdd()->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + + dequeue.offer(111); + ASSERT_EQ(dequeue.take(), 111); +} + +TEST(BlockingDequeueUnitTest, take_is_last) { + size_t capacity = 10; + PIBlockingDequeuePrepare dequeue(capacity); + EXPECT_TRUE(dequeue.offer(111)); + EXPECT_TRUE(dequeue.offer(222)); + ASSERT_EQ(dequeue.take(), 111); + ASSERT_EQ(dequeue.take(), 222); +} + +TEST(BlockingDequeueUnitTest, poll_is_not_block_when_empty) { + size_t capacity = 1; + bool isOk; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.poll(111, &isOk); + EXPECT_FALSE(dequeue.getCondVarAdd()->isWaitForCalled); +} + +TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) { + size_t capacity = 1; + bool isOk; + PIBlockingDequeuePrepare dequeue(capacity); + ASSERT_EQ(dequeue.poll(111, &isOk), 111); +} + +TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) { + size_t capacity = 1; + bool isOk; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + ASSERT_EQ(dequeue.poll(-1, &isOk), 111); +} + +TEST(BlockingDequeueUnitTest, poll_timeouted_is_block_when_empty) { + size_t capacity = 1; + int timeout = 11; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.poll(timeout, 111); + EXPECT_TRUE(dequeue.getCondVarAdd()->isWaitForCalled); + EXPECT_EQ(timeout, dequeue.getCondVarAdd()->timeout); + ASSERT_FALSE(dequeue.getCondVarAdd()->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, poll_timeouted_is_default_value_when_empty) { + size_t capacity = 1; + int timeout = 11; + PIBlockingDequeuePrepare dequeue(capacity); + ASSERT_EQ(dequeue.poll(timeout, 111), 111); +} + +TEST(BlockingDequeueUnitTest, poll_timeouted_is_not_block_when_not_empty) { + size_t capacity = 1; + int timeout = 11; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + dequeue.poll(timeout, -1); + + EXPECT_TRUE(dequeue.getCondVarAdd()->isWaitForCalled); + ASSERT_TRUE(dequeue.getCondVarAdd()->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, poll_timeouted_is_offer_value_when_not_empty) { + size_t capacity = 1; + int timeout = 11; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + ASSERT_EQ(dequeue.poll(timeout, -1), 111); +} + +TEST(BlockingDequeueUnitTest, poll_timeouted_is_last) { + size_t capacity = 10; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + dequeue.offer(222); + ASSERT_EQ(dequeue.poll(10, -1), 111); + ASSERT_EQ(dequeue.poll(10, -1), 222); +} + +TEST(BlockingDequeueUnitTest, capacity_is_eq_constructor_capacity) { + size_t capacity = 10; + PIBlockingDequeuePrepare dequeue(capacity); + ASSERT_EQ(dequeue.capacity(), capacity); +} + +TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) { + size_t capacity = 2; + PIBlockingDequeuePrepare dequeue(capacity); + ASSERT_EQ(dequeue.remainingCapacity(), capacity); + dequeue.offer(111); + ASSERT_EQ(dequeue.remainingCapacity(), capacity - 1); +} + +TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + dequeue.offer(111); + ASSERT_EQ(dequeue.remainingCapacity(), 0); +} + +TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + ASSERT_EQ(dequeue.size(), 0); + dequeue.offer(111); + ASSERT_EQ(dequeue.size(), 1); +} + +TEST(BlockingDequeueUnitTest, size_is_eq_to_capacity_when_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeuePrepare dequeue(capacity); + dequeue.offer(111); + dequeue.offer(111); + ASSERT_EQ(dequeue.size(), capacity); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_elements_moved) { + size_t capacity = 10; + std::deque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeuePrepare blockingDequeue(refDeque); + PIBlockingDequeuePrepare::QueueType deque; + blockingDequeue.drainTo(deque); + ASSERT_EQ(blockingDequeue.size(), 0); + // FIXME +// ASSERT_TRUE(deque == refDeque); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_size_when_all_moved) { + size_t capacity = 10; + std::deque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeuePrepare blockingDequeue(refDeque); + PIBlockingDequeuePrepare::QueueType deque; + ASSERT_EQ(blockingDequeue.drainTo(deque), refDeque.size()); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) { + size_t capacity = 10; + std::deque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeuePrepare blockingDequeue(refDeque); + PIBlockingDequeuePrepare::QueueType deque; + ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1); +} +*/ diff --git a/test/src/ExecutorIntegrationTest.cpp b/test/src/ExecutorIntegrationTest.cpp new file mode 100644 index 0000000..8a1b783 --- /dev/null +++ b/test/src/ExecutorIntegrationTest.cpp @@ -0,0 +1,58 @@ +#include "gtest/gtest.h" +#include "testutil.h" +#include "piexecutor.h" + +using namespace std; +using namespace chrono; + +TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) { + std::mutex m; + int invokedRunnables = 0; + PIThreadPoolExecutor executorService(1); + executorService.execute([&]() { + m.lock(); + invokedRunnables++; + m.unlock(); + }); + this_thread::sleep_for(milliseconds(WAIT_THREAD_TIME_MS)); + m.lock(); + ASSERT_EQ(invokedRunnables, 1); + m.unlock(); +} + +TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) { + volatile bool isRunnableInvoke = false; + PIThreadPoolExecutor executorService(1); + executorService.shutdown(); + executorService.execute([&]() { + isRunnableInvoke = true; + }); + this_thread::sleep_for(milliseconds(WAIT_THREAD_TIME_MS)); + ASSERT_FALSE(isRunnableInvoke); +} + +TEST(ExcutorIntegrationTest, execute_is_execute_before_shutdown) { + volatile bool isRunnableInvoke = false; + PIThreadPoolExecutor executorService(1); + executorService.execute([&]() { + this_thread::sleep_for(milliseconds(WAIT_THREAD_TIME_MS)); + isRunnableInvoke = true; + }); + executorService.shutdown(); + this_thread::sleep_for(milliseconds(2 * WAIT_THREAD_TIME_MS)); + ASSERT_TRUE(isRunnableInvoke); +} + +// FIXME +TEST(DISABLED_ExcutorIntegrationTest, execute_is_awaitTermination_wait) { + PIThreadPoolExecutor executorService(1); + executorService.execute([&]() { + this_thread::sleep_for(milliseconds(2 * WAIT_THREAD_TIME_MS)); + }); + executorService.shutdown(); + auto start_time = high_resolution_clock::now(); + ASSERT_TRUE(executorService.awaitTermination(3 * WAIT_THREAD_TIME_MS)); + double wait_time = static_cast(duration_cast(high_resolution_clock::now() - start_time).count()) / 1000.; + ASSERT_GE(wait_time, WAIT_THREAD_TIME_MS); + ASSERT_LE(wait_time, 4 * WAIT_THREAD_TIME_MS); +} diff --git a/test/src/ExecutorUnitTest.cpp b/test/src/ExecutorUnitTest.cpp new file mode 100644 index 0000000..dd0a79e --- /dev/null +++ b/test/src/ExecutorUnitTest.cpp @@ -0,0 +1,132 @@ +#include + +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "testutil.h" +#include "piexecutor.h" + +using ::testing::_; +using ::testing::SetArgReferee; +using ::testing::DoAll; +using ::testing::DeleteArg; +using ::testing::Return; +using ::testing::ByMove; +using ::testing::AtLeast; +using ::testing::ByRef; +using ::testing::Eq; +using ::testing::Ge; +using ::testing::Pointee; +using ::testing::IsNull; +using ::testing::NiceMock; + +typedef std::function VoidFunc; + +namespace std { + inline bool operator ==(const VoidFunc& s, const VoidFunc& v) { + // TODO VoidFunc operator == + return true; + } +} + +class MockThread { +public: + bool is_executed; + VoidFunc runnnable; + + explicit MockThread(VoidFunc runnnable) : is_executed(true), runnnable(std::move(runnnable)) { } + +// MOCK_METHOD0(stop, void()); +// MOCK_METHOD1(waitForStart, bool(int timeout_msecs)); + MOCK_METHOD0(join, void()); +}; + +class MockDeque : public PIBlockingDequeue { +public: + MOCK_METHOD1(offer, bool(const FunctionWrapper&)); + MOCK_METHOD0(take, FunctionWrapper()); + MOCK_METHOD1(poll, FunctionWrapper(int)); + MOCK_METHOD0(capacity, size_t()); + MOCK_METHOD0(remainingCapacity, size_t()); +}; + +typedef PIThreadPoolExecutorTemplate, MockDeque> PIThreadPoolExecutorMoc_t; + +class PIThreadPoolExecutorMoc : public PIThreadPoolExecutorMoc_t { +public: + explicit PIThreadPoolExecutorMoc(size_t corePoolSize) : PIThreadPoolExecutorMoc_t(corePoolSize) { } + + template + explicit PIThreadPoolExecutorMoc(size_t corePoolSize, Function onBeforeStart) : PIThreadPoolExecutorMoc_t(corePoolSize, onBeforeStart) { } + + std::vector*>* getThreadPool() { return &threadPool; } + bool isShutdown() { return thread_command_ != thread_command::run; } + MockDeque* getTaskQueue() { return &taskQueue; } +}; + +TEST(ExecutorUnitTest, is_corePool_created) { + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + ASSERT_EQ(THREAD_COUNT, executor.getThreadPool()->size()); +} + +TEST(ExecutorUnitTest, is_corePool_started) { + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + for (auto* thread : *executor.getThreadPool()) ASSERT_TRUE(thread->is_executed); +} + +TEST(ExecutorUnitTest, submit_is_added_to_taskQueue) { + VoidFunc voidFunc = [](){}; + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + // TODO add check of offered + EXPECT_CALL(*executor.getTaskQueue(), offer) + .WillOnce(Return(true)); + executor.submit(voidFunc); +} + +TEST(ExecutorUnitTest, submit_is_return_valid_future) { + VoidFunc voidFunc = [](){}; + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + // TODO add check of offered + EXPECT_CALL(*executor.getTaskQueue(), offer) + .WillOnce(Return(true)); + auto future = executor.submit(voidFunc); + EXPECT_TRUE(future.valid()); +} + +TEST(ExecutorUnitTest, execute_is_added_to_taskQueue) { + VoidFunc voidFunc = [](){}; + PIThreadPoolExecutorMoc executor(THREAD_COUNT); + // TODO add check of offered + EXPECT_CALL(*executor.getTaskQueue(), offer) + .WillOnce(Return(true)); + executor.execute(voidFunc); +} + +// TODO fix +TEST(DISABLED_ExecutorUnitTest, is_corePool_execute_queue_elements) { + bool is_executed = false; + PIThreadPoolExecutorMoc executor(1); + EXPECT_EQ(executor.getThreadPool()->size(), 1); + EXPECT_CALL(*executor.getTaskQueue(), poll(Ge(0))) + .WillOnce([&is_executed](int){ + return FunctionWrapper([&is_executed](){ is_executed = true; }); + }); + executor.getThreadPool()->at(0)->runnnable(); + ASSERT_TRUE(is_executed); +} + +// FIXME +TEST(DISABLED_ExecutorUnitTest, shutdown_is_stop_threads) { + // Exclude stop calls when executor deleting + auto* executor = new PIThreadPoolExecutorMoc(THREAD_COUNT, [](MockThread* thread){ + testing::Mock::AllowLeak(thread); + EXPECT_CALL(*thread, join()) + .WillOnce(Return()); + }); + testing::Mock::AllowLeak(executor); + testing::Mock::AllowLeak(executor->getTaskQueue()); + + EXPECT_CALL(*executor->getTaskQueue(), poll(Ge(0))) + .WillRepeatedly([](int){ return FunctionWrapper(); }); + executor->shutdown(); + for (auto* thread : *executor->getThreadPool()) thread->runnnable(); +}