diff --git a/CMakeLists.txt b/CMakeLists.txt index f0a9b22f..6f21bfe6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,7 +41,8 @@ set(PIP_SRC_USB "src_usb") set(PIP_SRC_FFTW "src_fftw") set(PIP_SRC_OPENCL "src_opencl") set(PIP_SRC_IO_UTILS "src_io_utils") -set(PIP_SRC_DIRS "src_main" "src_crypt" "src_compress" "src_usb" "src_fftw" "src_opencl" "src_io_utils") +set(PIP_SRC_CONCURRENT "src_concurrent") +set(PIP_SRC_DIRS "src_main" "src_crypt" "src_compress" "src_usb" "src_fftw" "src_opencl" "src_io_utils" "src_concurrent") set(PIP_LIBS_TARGETS pip) set(LIBS_MAIN) set(LIBS_STATUS) @@ -129,7 +130,7 @@ get_filename_component(C_COMPILER "${CMAKE_C_COMPILER}" NAME) # Sources # Main lib -set(PIP_FOLDERS "." "core" "containers" "thread" "system" "io_devices" "io_utils" "console" "math" "code" "geo" "resources" "opencl" "crypt" "introspection") +set(PIP_FOLDERS "." "core" "containers" "thread" "system" "io_devices" "io_utils" "console" "math" "code" "geo" "resources" "opencl" "crypt" "introspection" "concurrent") if(PIP_FREERTOS) #list(REMOVE_ITEM PIP_FOLDERS "console") #include_directories("${PIP_SRC_MAIN}/console") @@ -159,6 +160,8 @@ gather_src("${PIP_SRC_OPENCL}" CPP_LIB_OPENCL HDRS PHDRS) # IO Utils lib gather_src("${PIP_SRC_IO_UTILS}" CPP_LIB_IO_UTILS HDRS PHDRS) +gather_src("${PIP_SRC_CONCURRENT}" CPP_LIB_CONCURRENT HDRS PHDRS) + if(PIP_FREERTOS) add_definitions(-DPIP_FREERTOS) @@ -472,12 +475,21 @@ if (NOT PIP_FREERTOS) target_link_libraries(pip_io_utils ${IO_UTILS_LIBS}) list(APPEND PIP_LIBS_TARGETS pip_io_utils) + set(CONCURRENT_LIBS pip) + add_library(pip_concurrent ${PIP_LIB_TYPE} ${CPP_LIB_CONCURRENT}) + target_link_libraries(pip_concurrent ${CONCURRENT_LIBS}) + list(APPEND PIP_LIBS_TARGETS pip_concurrent) # Test program if(PIP_UTILS) add_executable(pip_test "main.cpp") target_link_libraries(pip_test pip) endif() + + # Enable build tests for module concurrent + if(CONCURRENT_TESTING) + add_subdirectory(src_concurrent/test) + endif() else(NOT PIP_FREERTOS) message(STATUS "Building PIP with crypt support") @@ -525,7 +537,7 @@ if(LIB) #add_custom_target(pip_pch ALL COMMAND ${CMAKE_CXX_COMPILER} -O2 -fPIC -g3 ${CMAKE_INSTALL_PREFIX}/include/pip/pip.h DEPENDS pip SOURCES ${HDRS}) #list(APPEND HDRS "pip.h.gch") file(GLOB CMAKES "*.cmake") - install(FILES ${CMAKES} DESTINATION ${CMAKE_ROOT}/Modules) +# install(FILES ${CMAKES} DESTINATION ${CMAKE_ROOT}/Modules) else() if(NOT PIP_FREERTOS) install(TARGETS ${PIP_LIBS_TARGETS} DESTINATION bin) diff --git a/src_concurrent/executor.cpp b/src_concurrent/executor.cpp new file mode 100644 index 00000000..82df5b72 --- /dev/null +++ b/src_concurrent/executor.cpp @@ -0,0 +1,50 @@ +// +// Created by fomenko on 23.09.2019. +// + +#include "executor.h" + +PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_, + PIThreadFactory *threadFactory) : isShutdown_(false), taskQueue(taskQueue_), threadFactory(threadFactory) { + for (size_t i = 0; i < corePoolSize; ++i) { + AbstractThread* thread = threadFactory->newThread([&, i](){ + auto runnable = taskQueue->poll(100, std::function()); + if (runnable) { + runnable(); + } else if (isShutdown_) threadPool[i]->stop(); + }); + threadPool.push_back(thread); + thread->start(); + } +} + +bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) { + PITimeMeasurer measurer; + for (size_t i = 0; i < threadPool.size(); ++i) { + int dif = timeoutMs - (int)measurer.elapsed_m(); + if (dif < 0) return false; + if (!threadPool[i]->waitForFinish(dif)) return false; + } + return true; +} + +void PIThreadPoolExecutor::shutdownNow() { + isShutdown_ = true; + for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); +} + +PIThreadPoolExecutor::~PIThreadPoolExecutor() { + shutdownNow(); + taskQueue->getConditionVar()->notifyAll(); + while (threadPool.size() > 0) delete threadPool.take_back(); + delete threadFactory; + delete taskQueue; +} + +void PIThreadPoolExecutor::execute(const std::function &runnable) { + if (!isShutdown_) taskQueue->offer(runnable); +} + +volatile bool PIThreadPoolExecutor::isShutdown() const { + return isShutdown_; +} diff --git a/src_concurrent/piconditionlock.cpp b/src_concurrent/piconditionlock.cpp new file mode 100644 index 00000000..5c1f4ec7 --- /dev/null +++ b/src_concurrent/piconditionlock.cpp @@ -0,0 +1,81 @@ +// +// Created by fomenko on 25.09.2019. +// + +#include "piconditionlock.h" +#ifdef WINDOWS +#include "synchapi.h" +#else +#include "pthread.h" +#endif + +PRIVATE_DEFINITION_START(PIConditionLock) +#ifdef WINDOWS + CRITICAL_SECTION +#else + pthread_mutex_t +#endif + nativeHandle; +PRIVATE_DEFINITION_END(PIConditionLock) + +#ifdef WINDOWS +PIConditionLock::PIConditionLock() { + InitializeCriticalSection(&PRIVATE->nativeHandle); +} + +PIConditionLock::~PIConditionLock() { + DeleteCriticalSection(&PRIVATE->nativeHandle); +} + +void PIConditionLock::lock() { + EnterCriticalSection(&PRIVATE->nativeHandle); +} + +void PIConditionLock::unlock() { + LeaveCriticalSection(&PRIVATE->nativeHandle); +} + +void *PIConditionLock::handle() { + return &PRIVATE->nativeHandle; +} + +bool PIConditionLock::tryLock() { + return TryEnterCriticalSection(&PRIVATE->nativeHandle) != 0; +} + +#else + +PIConditionLock::PIConditionLock() { + pthread_mutexattr_t attr; + memset(&attr, 0, sizeof(attr)); + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); + pthread_mutex_init(&(PRIVATE->nativeHandle), &attr); + pthread_mutexattr_destroy(&attr); +} + +PIConditionLock::~PIConditionLock() { + pthread_mutex_destroy(&(PRIVATE->nativeHandle)); +} + +void PIConditionLock::lock() { + pthread_mutex_lock(&(PRIVATE->nativeHandle)); +} + +void PIConditionLock::unlock() { + pthread_mutex_unlock(&(PRIVATE->nativeHandle)); +} + +void *PIConditionLock::handle() { + return &PRIVATE->nativeHandle; +} + +bool PIConditionLock::tryLock() { + return (pthread_mutex_trylock(&(PRIVATE->nativeHandle)) == 0);; +} +#endif + + + + diff --git a/src_concurrent/piconditionvar.cpp b/src_concurrent/piconditionvar.cpp new file mode 100644 index 00000000..450eb460 --- /dev/null +++ b/src_concurrent/piconditionvar.cpp @@ -0,0 +1,129 @@ +// +// Created by fomenko on 20.09.2019. +// + +#include "piplatform.h" +#ifdef WINDOWS +#define _WIN32_WINNT 0x0600 +#include "synchapi.h" +#include +#include +#else + +#endif + +#include "piconditionvar.h" +#include "pithread.h" +#include "pitime.h" + +PRIVATE_DEFINITION_START(PIConditionVariable) +#ifdef WINDOWS + CONDITION_VARIABLE nativeHandle; +#else + pthread_cond_t nativeHandle; + PIConditionLock* currentLock; +#endif + bool isDestroying; +PRIVATE_DEFINITION_END(PIConditionVariable) + +PIConditionVariable::PIConditionVariable() { +#ifdef WINDOWS + InitializeConditionVariable(&PRIVATE->nativeHandle); +#else + PRIVATE->isDestroying = false; + PRIVATE->currentLock = nullptr; + memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); + pthread_cond_init(&PRIVATE->nativeHandle, NULL); +#endif +} + +PIConditionVariable::~PIConditionVariable() { +#ifdef WINDOWS +#else + pthread_cond_destroy(&PRIVATE->nativeHandle); +#endif + +} + +void PIConditionVariable::wait(PIConditionLock& lk) { +#ifdef WINDOWS + SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); +#else + pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); +#endif +} + +void PIConditionVariable::wait(PIConditionLock& lk, const std::function& condition) { + bool isCondition; + while (true) { + isCondition = condition(); + if (isCondition) break; +#ifdef WINDOWS + SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); +#else + pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); +#endif + if (PRIVATE->isDestroying) return; + } +} + +bool PIConditionVariable::waitFor(PIConditionLock &lk, int timeoutMs) { + bool isTimeout; +#ifdef WINDOWS + isTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), timeoutMs) != 0; +#else + timespec abstime = {.tv_sec = timeoutMs / 1000, .tv_nsec = timeoutMs * 1000 * 1000}; + isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; +#endif + if (PRIVATE->isDestroying) return false; + return isTimeout; +} + +bool PIConditionVariable::waitFor(PIConditionLock& lk, int timeoutMs, const std::function &condition) { + bool isCondition; + PITimeMeasurer measurer; + while (true) { + isCondition = condition(); + if (isCondition) break; +#ifdef WINDOWS + WINBOOL isTimeout = SleepConditionVariableCS( + &PRIVATE->nativeHandle, + (PCRITICAL_SECTION)lk.handle(), + timeoutMs - (int)measurer.elapsed_m()); + if (isTimeout == 0) return false; +#else + int timeoutCurr = timeoutMs - (int)measurer.elapsed_m(); + timespec abstime = {.tv_sec = timeoutCurr / 1000, .tv_nsec = timeoutCurr * 1000 * 1000}; + bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; + if (isTimeout) return false; +#endif + if (PRIVATE->isDestroying) return false; + } + return true; +} + + +void PIConditionVariable::notifyOne() { +#ifdef WINDOWS + WakeConditionVariable(&PRIVATE->nativeHandle); +#else + pthread_cond_signal(&PRIVATE->nativeHandle); +#endif +} +void PIConditionVariable::notifyAll() { +#ifdef WINDOWS + WakeAllConditionVariable(&PRIVATE->nativeHandle); +#else + pthread_cond_broadcast(&PRIVATE->nativeHandle); +#endif +} + +void StdFunctionThreadFuncAdapter::threadFuncStdFunctionAdapter(void *it) { + auto consumer = (StdFunctionThreadFuncAdapter*)it; + consumer->fun(); +} + +void StdFunctionThreadFuncAdapter::registerToInvoke(PIThread *thread) { + thread->setData(data()); + thread->setSlot((ThreadFunc) threadFunc()); +} diff --git a/src_concurrent/test/BlockingDequeueUnitTest.cpp b/src_concurrent/test/BlockingDequeueUnitTest.cpp new file mode 100644 index 00000000..3b1b9365 --- /dev/null +++ b/src_concurrent/test/BlockingDequeueUnitTest.cpp @@ -0,0 +1,211 @@ +// +// Created by fomenko on 23.09.2019. +// + +#include "gtest/gtest.h" +#include "piblockingdequeue.h" + +class MockConditionVar: public PIConditionVariable { +public: + bool isWaitCalled = false; + bool isWaitForCalled = false; + bool isTrueCondition = false; + int timeout = -1; + + void wait(PIConditionLock& lk) override { + isWaitCalled = true; + } + + void wait(PIConditionLock& lk, const std::function& condition) override { + isWaitCalled = true; + lk.lock(); + isTrueCondition = condition(); + lk.unlock(); + } + + bool waitFor(PIConditionLock& lk, int timeoutMs) override { + isWaitForCalled = true; + timeout = timeoutMs; + return false; + } + + bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function& condition) override { + isWaitForCalled = true; + lk.lock(); + isTrueCondition = condition(); + timeout = timeoutMs; + lk.unlock(); + return isTrueCondition; + } +}; + +TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeue dequeue(capacity); + ASSERT_TRUE(dequeue.offer(10)); +} + +TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeue dequeue(capacity); + dequeue.offer(11); + ASSERT_FALSE(dequeue.offer(10)); +} + +// TODO change take_is_block_when_empty to prevent segfault +TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) { + size_t capacity = 1; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + // May cause segfault because take front of empty queue + dequeue.take(); + EXPECT_TRUE(conditionVar->isWaitCalled); + ASSERT_FALSE(conditionVar->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) { + size_t capacity = 1; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + dequeue.offer(111); + dequeue.take(); + + EXPECT_TRUE(conditionVar->isWaitCalled); + ASSERT_TRUE(conditionVar->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) { + size_t capacity = 1; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + + dequeue.offer(111); + ASSERT_EQ(dequeue.take(), 111); +} + +TEST(BlockingDequeueUnitTest, take_is_last) { + size_t capacity = 10; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + EXPECT_TRUE(dequeue.offer(111)); + EXPECT_TRUE(dequeue.offer(222)); + ASSERT_EQ(dequeue.take(), 111); + ASSERT_EQ(dequeue.take(), 222); +} + +TEST(BlockingDequeueUnitTest, poll_is_block_when_empty) { + size_t capacity = 1; + int timeout = 11; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + dequeue.poll(timeout, 111); + EXPECT_TRUE(conditionVar->isWaitForCalled); + EXPECT_EQ(timeout, conditionVar->timeout); + ASSERT_FALSE(conditionVar->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) { + size_t capacity = 1; + int timeout = 11; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + ASSERT_EQ(dequeue.poll(timeout, 111), 111); +} + +TEST(BlockingDequeueUnitTest, poll_is_not_block_when_not_empty) { + size_t capacity = 1; + int timeout = 11; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + dequeue.offer(111); + dequeue.poll(timeout, -1); + + EXPECT_TRUE(conditionVar->isWaitForCalled); + ASSERT_TRUE(conditionVar->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) { + size_t capacity = 1; + int timeout = 11; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + dequeue.offer(111); + ASSERT_EQ(dequeue.poll(timeout, -1), 111); +} + +TEST(BlockingDequeueUnitTest, poll_is_last) { + size_t capacity = 10; + auto conditionVar = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVar); + dequeue.offer(111); + dequeue.offer(222); + ASSERT_EQ(dequeue.poll(10, -1), 111); + ASSERT_EQ(dequeue.poll(10, -1), 222); +} + +TEST(BlockingDequeueUnitTest, capacity_is_eq_constructor_capacity) { + size_t capacity = 10; + PIBlockingDequeue dequeue(capacity); + ASSERT_EQ(dequeue.capacity(), capacity); +} + +TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) { + size_t capacity = 2; + PIBlockingDequeue dequeue(capacity); + ASSERT_EQ(dequeue.remainingCapacity(), capacity); + dequeue.offer(111); + ASSERT_EQ(dequeue.remainingCapacity(), capacity - 1); +} + +TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeue dequeue(capacity); + dequeue.offer(111); + dequeue.offer(111); + ASSERT_EQ(dequeue.remainingCapacity(), 0); +} + +TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) { + size_t capacity = 1; + PIBlockingDequeue dequeue(capacity); + ASSERT_EQ(dequeue.size(), 0); + dequeue.offer(111); + ASSERT_EQ(dequeue.size(), 1); +} + +TEST(BlockingDequeueUnitTest, size_is_eq_to_capacity_when_capacity_reach) { + size_t capacity = 1; + PIBlockingDequeue dequeue(capacity); + dequeue.offer(111); + dequeue.offer(111); + ASSERT_EQ(dequeue.size(), capacity); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_elements_moved) { + size_t capacity = 10; + PIDeque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeue blockingDequeue(refDeque); + PIDeque deque; + blockingDequeue.drainTo(deque); + ASSERT_EQ(blockingDequeue.size(), 0); + ASSERT_TRUE(deque == refDeque); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_size_when_all_moved) { + size_t capacity = 10; + PIDeque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeue blockingDequeue(refDeque); + PIDeque deque; + ASSERT_EQ(blockingDequeue.drainTo(deque), refDeque.size()); +} + +TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) { + size_t capacity = 10; + PIDeque refDeque; + for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); + PIBlockingDequeue blockingDequeue(refDeque); + PIDeque deque; + ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1); +} diff --git a/src_concurrent/test/CMakeLists.txt b/src_concurrent/test/CMakeLists.txt new file mode 100644 index 00000000..3a120782 --- /dev/null +++ b/src_concurrent/test/CMakeLists.txt @@ -0,0 +1,45 @@ +project(concurrent_test) +# Download and unpack googletest at configure time +configure_file(CMakeLists.txt.in googletest-download/CMakeLists.txt) +execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" . + RESULT_VARIABLE result + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download ) +if(result) + message(FATAL_ERROR "CMake step for googletest failed: ${result}") +endif() +execute_process(COMMAND ${CMAKE_COMMAND} --build . + RESULT_VARIABLE result + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download ) +if(result) + message(FATAL_ERROR "Build step for googletest failed: ${result}") +endif() + +# Prevent overriding the parent project's compiler/linker +# settings on Windows +set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + +# Add googletest directly to our build. This defines +# the gtest and gtest_main targets. +add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src + ${CMAKE_CURRENT_BINARY_DIR}/googletest-build + EXCLUDE_FROM_ALL) + +# The gtest/gtest_main targets carry header search path +# dependencies automatically when using CMake 2.8.11 or +# later. Otherwise we have to add them here ourselves. +if (CMAKE_VERSION VERSION_LESS 2.8.11) + include_directories("${gtest_SOURCE_DIR}/include") +endif() + +file(GLOB_RECURSE CPPS "*.cpp") +#find_package(GTest REQUIRED) +#message(STATUS "GTEST_INCLUDES: ${GTEST_INCLUDES}") +include_directories(${PROJECT_BINARY_DIR}) + +add_executable(${PROJECT_NAME} ${CPPS}) +target_link_libraries(${PROJECT_NAME} gtest_main gmock_main pip_concurrent) +add_test(NAME ${PROJECT_NAME}_test COMMAND tests) + +ADD_CUSTOM_COMMAND(TARGET ${PROJECT_NAME} POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy $ ${CMAKE_CURRENT_BINARY_DIR}) +add_custom_target(${PROJECT_NAME}_perform ALL COMMAND ${PROJECT_NAME}) \ No newline at end of file diff --git a/src_concurrent/test/CMakeLists.txt.in b/src_concurrent/test/CMakeLists.txt.in new file mode 100644 index 00000000..25794cc9 --- /dev/null +++ b/src_concurrent/test/CMakeLists.txt.in @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 2.8.2) + +project(googletest-download NONE) + +include(ExternalProject) +ExternalProject_Add(googletest + GIT_REPOSITORY https://github.com/google/googletest.git + GIT_TAG "dea0216d0c6bc5e63cf5f6c8651cd268668032ec" + SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/googletest-src" + BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/googletest-build" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) \ No newline at end of file diff --git a/src_concurrent/test/ConditionLockIntegrationTest.cpp b/src_concurrent/test/ConditionLockIntegrationTest.cpp new file mode 100644 index 00000000..3cbde7e3 --- /dev/null +++ b/src_concurrent/test/ConditionLockIntegrationTest.cpp @@ -0,0 +1,65 @@ +// +// Created by fomenko on 26.09.2019. +// + +#include "gtest/gtest.h" +#include "gmock/gmock.h" + +#include "piconditionvar.h" +#include +#include "testutil.h" + +class ConditionLock : public ::testing::Test, public TestUtil { +protected: + void TearDown() override { + if (adapter != nullptr) delete adapter; + } +}; + +TEST_F(ConditionLock, lock_is_protect) { + PIConditionLock m; + m.lock(); + bool isProtect = true; + + createThread([&](){ + m.lock(); + isProtect = false; + }); + ASSERT_TRUE(isProtect); +} + +TEST_F(ConditionLock, unlock_is_release) { + PIConditionLock m; + m.lock(); + volatile bool isReleased = false; + m.unlock(); + + createThread([&](){ + m.lock(); + isReleased = true; + m.unlock(); + }); + EXPECT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); + ASSERT_TRUE(isReleased); +} + +TEST_F(ConditionLock, tryLock_is_false_when_locked) { + PIConditionLock m; + + createThread([&](){ + m.lock(); + piMSleep(WAIT_THREAD_TIME_MS); + }); + ASSERT_FALSE(m.tryLock()); +} + +TEST_F(ConditionLock, tryLock_is_true_when_unlocked) { + PIConditionLock m; + ASSERT_TRUE(m.tryLock()); +} + +TEST_F(ConditionLock, tryLock_is_recursive_lock_enable) { + PIConditionLock m; + m.lock(); + ASSERT_TRUE(m.tryLock()); +} \ No newline at end of file diff --git a/src_concurrent/test/ConditionVariableIntegrationTest.cpp b/src_concurrent/test/ConditionVariableIntegrationTest.cpp new file mode 100644 index 00000000..026a7009 --- /dev/null +++ b/src_concurrent/test/ConditionVariableIntegrationTest.cpp @@ -0,0 +1,224 @@ +// +// Created by fomenko on 24.09.2019. +// + +#include "gtest/gtest.h" +#include "piconditionvar.h" +#include "pithread.h" +#include "testutil.h" + +class ConditionVariable : public ::testing::Test, public TestUtil { +public: + PIConditionLock m; + PIConditionVariable* variable; + +protected: + void SetUp() override { + isRunning = false; + variable = new PIConditionVariable(); + adapterFunctionDefault = [&](){ + isRunning = true; + m.lock(); + variable->wait(m); + m.unlock(); + }; + } + + void TearDown() override { + if (adapter != nullptr) delete adapter; + } +}; + +TEST(ThreadFuncAdapter, registerToInvoke_is_stdFun_invoke) { + bool isInvoke = false; + StdFunctionThreadFuncAdapter adapter([&](){ + isInvoke = true; + }); + adapter.threadFunc()(adapter.data()); + ASSERT_TRUE(isInvoke); +} + +TEST_F(ConditionVariable, wait_is_block) { + createThread(); + ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, wait_is_block_when_notifyOne_before_wait) { + variable->notifyOne(); + createThread(); + ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, wait_is_block_when_notifyAll_before_wait) { + variable->notifyAll(); + createThread(); + ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, wait_is_unblock_when_notifyOne_after_wait) { + createThread(); + variable->notifyOne(); + ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, wait_is_unblock_when_notifyAll_after_wait) { + PIVector adapters; + PIVector threads; + + for (int i = 0; i < THREAD_COUNT; ++i) { + adapters.push_back(new StdFunctionThreadFuncAdapter(adapterFunctionDefault)); + threads.push_back(new PIThread(adapters.back()->data(), adapters.back()->threadFunc())); + } + + piForeach(PIThread* thread, threads) thread->startOnce(); + piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT); + variable->notifyAll(); + PITimeMeasurer measurer; + piForeach(PIThread* thread, threads) { + int timeout = WAIT_THREAD_TIME_MS * THREAD_COUNT - (int)measurer.elapsed_m(); + thread->waitForFinish(timeout > 0 ? timeout : 0); + } + for (size_t i = 0; i < threads.size(); ++i) EXPECT_FALSE(threads[i]->isRunning()) << "Thread " << i << " still running"; + piForeach(PIThread* thread, threads) delete thread; + piForeach(StdFunctionThreadFuncAdapter* adapter, adapters) delete adapter; +} + +TEST_F(ConditionVariable, wait_is_one_unblock_when_notifyOne) { + PIVector adapters; + PIVector threads; + + for (int i = 0; i < THREAD_COUNT; ++i) { + adapters.push_back(new StdFunctionThreadFuncAdapter(adapterFunctionDefault)); + threads.push_back(new PIThread(adapters.back()->data(), adapters.back()->threadFunc())); + } + + piForeach(PIThread* thread, threads) thread->startOnce(); + piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT); + variable->notifyOne(); + piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT); + int runningThreadCount = 0; + piForeach(PIThread* thread, threads) if (thread->isRunning()) runningThreadCount++; + ASSERT_EQ(runningThreadCount, THREAD_COUNT - 1); +} + +TEST_F(ConditionVariable, wait_is_protected_unblock_when_notifyOne) { + createThread([&](){ + m.lock(); + variable->wait(m); + piMSleep(2 * WAIT_THREAD_TIME_MS); + // Missing unlock + }); + variable->notifyOne(); + msleep(WAIT_THREAD_TIME_MS); + ASSERT_FALSE(m.tryLock()); +} + +TEST_F(ConditionVariable, wait_condition_is_block) { + createThread([&](){ + m.lock(); + variable->wait(m, [](){ return false; }); + m.unlock(); + }); + ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, wait_condition_is_check_condition_before_block) { + bool isConditionChecked = false; + createThread([&](){ + m.lock(); + variable->wait(m, [&](){ + isConditionChecked = true; + return false; + }); + m.unlock(); + }); + m.lock(); + ASSERT_TRUE(isConditionChecked); + m.unlock(); +} + +TEST_F(ConditionVariable, wait_condition_is_check_condition_when_notifyOne) { + bool isConditionChecked; + createThread([&](){ + m.lock(); + variable->wait(m, [&](){ + isConditionChecked = true; + return false; + }); + m.unlock(); + }); + m.lock(); + isConditionChecked = false; + m.unlock(); + variable->notifyOne(); + msleep(threadStartTime + 1); + m.lock(); + ASSERT_TRUE(isConditionChecked); + m.unlock(); +} + +TEST_F(ConditionVariable, wait_condition_is_unblock_when_condition_and_notifyOne) { + bool condition = false; + createThread([&](){ + m.lock(); + variable->wait(m, [&](){ return condition; }); + m.unlock(); + }); + m.lock(); + condition = true; + m.unlock(); + variable->notifyOne(); + ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS)); +} + +TEST_F(ConditionVariable, DISABLED_waitFor_is_block_before_timeout) { + createThread([&](){ + PITimeMeasurer measurer; + m.lock(); + variable->waitFor(m, WAIT_THREAD_TIME_MS * 2); + m.unlock(); + // Not reliable because spurious wakeup may happen + ASSERT_GE(measurer.elapsed_m(), WAIT_THREAD_TIME_MS); + }); + EXPECT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS * 3)); +} + +TEST_F(ConditionVariable, waitFor_is_unblock_when_timeout) { + volatile bool isUnblock = false; + createThread([&](){ + m.lock(); + variable->waitFor(m, WAIT_THREAD_TIME_MS); + isUnblock = true; + m.unlock(); + }); + // Test failed if suspend forever + EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS)); + ASSERT_TRUE(isUnblock); +} + +TEST_F(ConditionVariable, waitFor_is_false_when_timeout) { + bool waitRet = true; + createThread([&](){ + m.lock(); + waitRet = variable->waitFor(m, WAIT_THREAD_TIME_MS); + m.unlock(); + }); + EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS)); + ASSERT_FALSE(waitRet); +} + +TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) { + bool condition = false; + createThread([&](){ + m.lock(); + variable->waitFor(m, 3 * WAIT_THREAD_TIME_MS, [&](){ return condition; }); + m.unlock(); + }); + EXPECT_TRUE(thread->isRunning()); + m.lock(); + condition = true; + m.unlock(); + variable->notifyOne(); + msleep(WAIT_THREAD_TIME_MS); + ASSERT_FALSE(thread->isRunning()); +} \ No newline at end of file diff --git a/src_concurrent/test/ExecutorIntegrationTest.cpp b/src_concurrent/test/ExecutorIntegrationTest.cpp new file mode 100644 index 00000000..843a5fe9 --- /dev/null +++ b/src_concurrent/test/ExecutorIntegrationTest.cpp @@ -0,0 +1,33 @@ +// +// Created by fomenko on 24.09.2019. +// + +#include "gtest/gtest.h" +#include "executor.h" +#include "pimutex.h" + +const int WAIT_THREAD_TIME_MS = 30; + +TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) { + PIMutex m; + int invokedRunnables = 0; + PIThreadPoolExecutor executorService(1); + executorService.execute([&]() { + m.lock(); + invokedRunnables++; + m.unlock(); + }); + piMSleep(WAIT_THREAD_TIME_MS); + ASSERT_EQ(invokedRunnables, 1); +} + +TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) { + bool isRunnableInvoke = false; + PIThreadPoolExecutor executorService(1); + executorService.shutdown(); + executorService.execute([&]() { + isRunnableInvoke = true; + }); + piMSleep(WAIT_THREAD_TIME_MS); + ASSERT_FALSE(isRunnableInvoke); +} diff --git a/src_concurrent/test/ExecutorUnitTest.cpp b/src_concurrent/test/ExecutorUnitTest.cpp new file mode 100644 index 00000000..63e6fc4b --- /dev/null +++ b/src_concurrent/test/ExecutorUnitTest.cpp @@ -0,0 +1,127 @@ +// +// Created by fomenko on 23.09.2019. +// + +#include "executor.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" + +using ::testing::_; +using ::testing::SetArgReferee; +using ::testing::DoAll; +using ::testing::DeleteArg; +using ::testing::Return; +using ::testing::AtLeast; +using ::testing::ByRef; +using ::testing::Eq; + +typedef std::function VoidFunc; + +namespace std { + inline bool operator ==(const VoidFunc& s, const VoidFunc& v) { + // TODO VoidFunc operator == + return true; + } +} + +const int THREAD_COUNT = 2; + +class MockThread : public AbstractThread { +public: + MOCK_METHOD0(start, bool()); + MOCK_METHOD0(stop, void()); + MOCK_METHOD1(waitForStart, bool(int timeout_msecs)); + MOCK_METHOD1(waitForFinish, bool(int timeout_msecs)); +}; + +class MockDeque : public PIBlockingDequeue { +public: + MOCK_METHOD1(offer, bool(const VoidFunc&)); + MOCK_METHOD0(take, VoidFunc()); + MOCK_METHOD2(poll, VoidFunc(int timeoutMs, const VoidFunc& defaultVal)); + MOCK_METHOD0(capacity, size_t()); + MOCK_METHOD0(remainingCapacity, size_t()); +}; + +class MockThreadFactory : public PIThreadFactory { +public: + int callCount = 0; + PIVector threads; + std::function checkThreadExpectations = [](MockThread* thread){ + EXPECT_CALL(*thread, start()) + .WillOnce(Return(true)); + EXPECT_CALL(*thread, stop()) + .WillOnce(Return()); + }; + + std::function checkThreadFunc = [](const VoidFunc& fun) { }; + + AbstractThread* newThread(const VoidFunc& fun) override { + callCount++; + auto* thread = new MockThread(); + threads.push_back(thread); + checkThreadExpectations(thread); + checkThreadFunc(fun); + return threads.back(); + } + +}; + +TEST(ExecutorUnitTest, is_corePool_created) { + auto* deque = new MockDeque(); + auto* threadFactory = new MockThreadFactory(); + PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory); + ASSERT_EQ(THREAD_COUNT, threadFactory->callCount); +} + +TEST(ExecutorUnitTest, is_corePool_started) { + auto* deque = new MockDeque(); + auto* threadFactory = new MockThreadFactory(); + PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory); +} + +TEST(ExecutorUnitTest, execute_is_added_to_taskQueue) { + VoidFunc voidFunc = [](){}; + auto* deque = new MockDeque(); + EXPECT_CALL(*deque, offer(Eq(voidFunc))) + .WillOnce(Return(true)); + + auto* threadFactory = new MockThreadFactory(); + PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory); + executor.execute([]() {}); +} + +TEST(ExecutorUnitTest, is_corePool_execute_queue_elements) { + auto* deque = new MockDeque(); + auto* threadFactory = new MockThreadFactory(); + threadFactory->checkThreadFunc = [](const VoidFunc& fun) { + fun(); + }; + ON_CALL(*deque, take()) + .WillByDefault(Return([](){})); + EXPECT_CALL(*deque, poll(_, _)) + .Times(THREAD_COUNT) + .WillRepeatedly(Return([](){})); + PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory); +} + +TEST(ExecutorUnitTest, shutdown_is_stop_threads) { + auto* deque = new MockDeque(); + auto* threadFactory = new MockThreadFactory(); + PIVector threadFuncs; + threadFactory->checkThreadExpectations = [](MockThread* thread) { + EXPECT_CALL(*thread, start()) + .WillOnce(Return(true)); + EXPECT_CALL(*thread, stop()) + .WillRepeatedly(Return()); + }; + threadFactory->checkThreadFunc = [&](const VoidFunc& threadFunc) { threadFuncs.push_back(threadFunc); }; + ON_CALL(*deque, take()) + .WillByDefault(Return(VoidFunc())); + EXPECT_CALL(*deque, poll(_, _)) + .Times(THREAD_COUNT) + .WillRepeatedly(Return(VoidFunc())); + PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory); + executor.shutdown(); + piForeachC(VoidFunc& threadFunc, threadFuncs) threadFunc(); +} \ No newline at end of file diff --git a/src_concurrent/test/testutil.h b/src_concurrent/test/testutil.h new file mode 100644 index 00000000..8be14302 --- /dev/null +++ b/src_concurrent/test/testutil.h @@ -0,0 +1,60 @@ +// +// Created by fomenko on 27.09.2019. +// + +#ifndef AWRCANFLASHER_TESTUTIL_H +#define AWRCANFLASHER_TESTUTIL_H + +#include "pithread.h" + +/** + * Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests + * write "Start thread timeout reach!" message. You can reduce it if you want increase test performance. + */ +const int WAIT_THREAD_TIME_MS = 40; + +const int THREAD_COUNT = 5; + +class TestUtil: public PIObject { +PIOBJECT(TestUtil) +public: + double threadStartTime; + PIThread* thread = new PIThread(); + StdFunctionThreadFuncAdapter* adapter = nullptr; + volatile bool isRunning; + std::function adapterFunctionDefault; + + bool createThread(const std::function& fun = nullptr, PIThread* thread_ = nullptr) { + adapter = new StdFunctionThreadFuncAdapter(fun == nullptr ? adapterFunctionDefault : fun); + if (thread_ == nullptr) thread_ = thread; + adapter->registerToInvoke(thread_); + thread_->startOnce(); + return waitThread(thread_); + } + + bool waitThread(PIThread* thread_, bool runningStatus = true) { + PITimeMeasurer measurer; + bool isTimeout = !thread_->waitForStart(WAIT_THREAD_TIME_MS); + while (!isRunning) { + isTimeout = WAIT_THREAD_TIME_MS <= measurer.elapsed_m(); + if (isTimeout) break; + piMSleep(1); + } + + threadStartTime = measurer.elapsed_m(); + + if (isTimeout) piCout << "Start thread timeout reach!"; + + if (threadStartTime > 1) { + piCout << "Start time" << threadStartTime << "ms"; + } else if (threadStartTime > 0.001) { + piCout << "Start time" << threadStartTime * 1000 << "mcs"; + } else { + piCout << "Start time" << threadStartTime * 1000 * 1000 << "ns"; + } + + return !isTimeout; + } +}; + +#endif //AWRCANFLASHER_TESTUTIL_H diff --git a/src_main/concurrent/executor.h b/src_main/concurrent/executor.h new file mode 100644 index 00000000..71742144 --- /dev/null +++ b/src_main/concurrent/executor.h @@ -0,0 +1,89 @@ +// +// Created by fomenko on 20.09.2019. +// + +#ifndef PIP_TESTS_EXECUTOR_H +#define PIP_TESTS_EXECUTOR_H + +#include +#include +#include +#include "piblockingdequeue.h" + +class AbstractThread { +public: + virtual bool start() = 0; + virtual bool waitForStart(int timeout_msecs) = 0; + virtual bool waitForFinish(int timeout_msecs) = 0; + virtual void stop() = 0; + virtual ~AbstractThread() = default; +}; + +class Thread : public AbstractThread { +public: + explicit Thread(const std::function& fun = [](){}) : adapter(fun) { + adapter.registerToInvoke(&thread); + } + virtual ~Thread() = default; + + inline bool start() override { return thread.start(); } + inline bool waitForStart(int timeout_msecs) override { return thread.waitForStart(timeout_msecs); } + inline bool waitForFinish(int timeout_msecs) override { return thread.waitForFinish(timeout_msecs); } + inline void stop() override { thread.stop(); } + +private: + PIThread thread; + StdFunctionThreadFuncAdapter adapter; +}; + +class PIThreadFactory { +public: + inline virtual AbstractThread* newThread(const std::function& fun) { + return new Thread(fun); + } + virtual ~PIThreadFactory() = default; +}; + + +/** + * @brief Thread pools address two different problems: they usually provide improved performance when executing large + * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and + * managing the resources, including threads, consumed when executing a collection of tasks. + */ +class PIThreadPoolExecutor { +public: + explicit PIThreadPoolExecutor(size_t corePoolSize = 1, PIBlockingDequeue >* taskQueue_ = new PIBlockingDequeue >(), PIThreadFactory* threadFactory = new PIThreadFactory()); + + virtual ~PIThreadPoolExecutor(); + + /** + * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task + * cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been + * reached. + * + * @param runnable not empty function for thread pool execution + */ + void execute(const std::function& runnable); + + void shutdownNow(); + + /** + * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be + * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously + * submitted tasks to complete execution. Use awaitTermination to do that. + */ + void shutdown() { + isShutdown_ = true; + } + + volatile bool isShutdown() const; + + bool awaitTermination(int timeoutMs); +private: + volatile bool isShutdown_; + PIBlockingDequeue >* taskQueue; + PIThreadFactory* threadFactory; + PIVector threadPool; +}; + +#endif //PIP_TESTS_EXECUTOR_H diff --git a/src_main/concurrent/piblockingdequeue.h b/src_main/concurrent/piblockingdequeue.h new file mode 100644 index 00000000..c1b50a35 --- /dev/null +++ b/src_main/concurrent/piblockingdequeue.h @@ -0,0 +1,148 @@ +// +// Created by fomenko on 20.09.2019. +// + +#ifndef PIP_TESTS_PIBLOCKINGDEQUEUE_H +#define PIP_TESTS_PIBLOCKINGDEQUEUE_H + +#include "pideque.h" +#include "piconditionvar.h" + +/** + * @brief A Queue that supports operations that wait for the queue to become non-empty when retrieving an element, and + * wait for space to become available in the queue when storing an element. + */ +template +class PIBlockingDequeue: private PIDeque { +public: + explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX, PIConditionVariable* cond_var = new PIConditionVariable()) : condition_var(cond_var), max_size(capacity) { } + explicit inline PIBlockingDequeue(const PIDeque& other) : condition_var(new PIConditionVariable()) { + max_size = SIZE_MAX; + PIDeque::append(other); + } + inline PIBlockingDequeue(PIBlockingDequeue & other) : condition_var(new PIConditionVariable()) { + other.mutex.lock(); + max_size = other.max_size; + PIDeque::append(static_cast&>(other)); + other.mutex.unlock(); + } + virtual ~PIBlockingDequeue() { + delete condition_var; + } + + /** + * @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without + * exceeding the queue's capacity, returning true upon success and false if this queue is full. + * + * @param v the element to add + * @return true if the element was added to this queue, else false + */ + virtual bool offer(const T & v) { + mutex.lock(); + if (PIDeque::size() >= max_size) { + mutex.unlock(); + return false; + } + PIDeque::push_back(v); + mutex.unlock(); + condition_var->notifyOne(); + return true; + } + + /** + * @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. + * + * @return the head of this queue + */ + virtual T take() { + T t; + mutex.lock(); + condition_var->wait(mutex, [&]() { return !PIDeque::isEmpty(); }); + t = T(PIDeque::take_front()); + mutex.unlock(); + return t; + } + + /** + * @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an + * element to become available. + * + * @param timeoutMs how long to wait before giving up, in milliseconds + * @param defaultVal value, which returns if the specified waiting time elapses before an element is available + * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available + */ + virtual T poll(int timeoutMs, const T & defaultVal) { + T t; + mutex.lock(); + bool isOk = condition_var->waitFor(mutex, timeoutMs, [&]() { return !PIDeque::isEmpty(); }); + t = isOk ? T(PIDeque::take_front()) : defaultVal; + mutex.unlock(); + return t; + } + + virtual size_t capacity() { + size_t c; + mutex.lock(); + c = max_size; + mutex.unlock(); + return c; + } + + /** + * @brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource + * constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the remaining capacity + */ + virtual size_t remainingCapacity() { + mutex.lock(); + size_t c = max_size - PIDeque::size(); + mutex.unlock(); + return c; + } + + virtual size_t size() { + mutex.lock(); + size_t s = PIDeque::size(); + mutex.unlock(); + return s; + } + + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + virtual size_t drainTo(PIDeque& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; + for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); + mutex.unlock(); + return count; + } + + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + virtual size_t drainTo(PIBlockingDequeue& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + other.mutex.lock(); + size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; + size_t otherRemainingCapacity = other.max_size - static_cast>(other).size(); + if (count > otherRemainingCapacity) count = otherRemainingCapacity; + for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); + other.mutex.unlock(); + mutex.unlock(); + return count; + } + + PIConditionVariable *getConditionVar() const { + return condition_var; + } + +private: + PIConditionLock mutex; + PIConditionVariable* condition_var; + size_t max_size; +}; + + +#endif //PIP_TESTS_PIBLOCKINGDEQUEUE_H diff --git a/src_main/concurrent/piconditionlock.h b/src_main/concurrent/piconditionlock.h new file mode 100644 index 00000000..722885d7 --- /dev/null +++ b/src_main/concurrent/piconditionlock.h @@ -0,0 +1,26 @@ +// +// Created by fomenko on 25.09.2019. +// + +#ifndef AWRCANFLASHER_PICONDITIONLOCK_H +#define AWRCANFLASHER_PICONDITIONLOCK_H + +#include +#include + +class PIP_EXPORT PIConditionLock { +public: + explicit PIConditionLock(); + ~PIConditionLock(); + + void lock(); + void unlock(); + bool tryLock(); + void* handle(); +private: + NO_COPY_CLASS(PIConditionLock) + PRIVATE_DECLARATION +}; + + +#endif //AWRCANFLASHER_PICONDITIONLOCK_H diff --git a/src_main/concurrent/piconditionvar.h b/src_main/concurrent/piconditionvar.h new file mode 100644 index 00000000..b87aacc0 --- /dev/null +++ b/src_main/concurrent/piconditionvar.h @@ -0,0 +1,48 @@ +// +// Created by fomenko on 20.09.2019. +// + +#ifndef PIP_TESTS_PICONDITIONVAR_H +#define PIP_TESTS_PICONDITIONVAR_H + +#include "piconditionlock.h" +#include +#include "piinit.h" + +#define PICONDITION_RELIABLE true + +class PIP_EXPORT PIConditionVariable { +public: + explicit PIConditionVariable(); + virtual ~PIConditionVariable(); + + virtual void notifyOne(); + virtual void notifyAll(); + virtual void wait(PIConditionLock& lk); + virtual void wait(PIConditionLock& lk, const std::function& condition); + virtual bool waitFor(PIConditionLock& lk, int timeoutMs); + virtual bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function& condition); +private: + NO_COPY_CLASS(PIConditionVariable) + + PRIVATE_DECLARATION +}; + + +class PIThread; +typedef void (*ThreadFunc)(void * ); + +class StdFunctionThreadFuncAdapter { +public: + static void threadFuncStdFunctionAdapter(void* it); + + explicit StdFunctionThreadFuncAdapter(const std::function& fun_): fun(fun_) {} + + void registerToInvoke(PIThread* thread); + void* data() const { return (void*)this; } + ThreadFunc threadFunc() const { return threadFuncStdFunctionAdapter; } +private: + std::function fun; +}; + +#endif //PIP_TESTS_PICONDITIONVAR_H