Added inter thread communication lib

git-svn-id: svn://db.shs.com.ru/pip@858 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
16 changed files with 1366 additions and 3 deletions

View File

@@ -0,0 +1,50 @@
//
// Created by fomenko on 23.09.2019.
//
#include "executor.h"
PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue<std::function<void()>> *taskQueue_,
PIThreadFactory *threadFactory) : isShutdown_(false), taskQueue(taskQueue_), threadFactory(threadFactory) {
for (size_t i = 0; i < corePoolSize; ++i) {
AbstractThread* thread = threadFactory->newThread([&, i](){
auto runnable = taskQueue->poll(100, std::function<void()>());
if (runnable) {
runnable();
} else if (isShutdown_) threadPool[i]->stop();
});
threadPool.push_back(thread);
thread->start();
}
}
bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) {
PITimeMeasurer measurer;
for (size_t i = 0; i < threadPool.size(); ++i) {
int dif = timeoutMs - (int)measurer.elapsed_m();
if (dif < 0) return false;
if (!threadPool[i]->waitForFinish(dif)) return false;
}
return true;
}
void PIThreadPoolExecutor::shutdownNow() {
isShutdown_ = true;
for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
}
PIThreadPoolExecutor::~PIThreadPoolExecutor() {
shutdownNow();
taskQueue->getConditionVar()->notifyAll();
while (threadPool.size() > 0) delete threadPool.take_back();
delete threadFactory;
delete taskQueue;
}
void PIThreadPoolExecutor::execute(const std::function<void()> &runnable) {
if (!isShutdown_) taskQueue->offer(runnable);
}
volatile bool PIThreadPoolExecutor::isShutdown() const {
return isShutdown_;
}

View File

@@ -0,0 +1,81 @@
//
// Created by fomenko on 25.09.2019.
//
#include "piconditionlock.h"
#ifdef WINDOWS
#include "synchapi.h"
#else
#include "pthread.h"
#endif
PRIVATE_DEFINITION_START(PIConditionLock)
#ifdef WINDOWS
CRITICAL_SECTION
#else
pthread_mutex_t
#endif
nativeHandle;
PRIVATE_DEFINITION_END(PIConditionLock)
#ifdef WINDOWS
PIConditionLock::PIConditionLock() {
InitializeCriticalSection(&PRIVATE->nativeHandle);
}
PIConditionLock::~PIConditionLock() {
DeleteCriticalSection(&PRIVATE->nativeHandle);
}
void PIConditionLock::lock() {
EnterCriticalSection(&PRIVATE->nativeHandle);
}
void PIConditionLock::unlock() {
LeaveCriticalSection(&PRIVATE->nativeHandle);
}
void *PIConditionLock::handle() {
return &PRIVATE->nativeHandle;
}
bool PIConditionLock::tryLock() {
return TryEnterCriticalSection(&PRIVATE->nativeHandle) != 0;
}
#else
PIConditionLock::PIConditionLock() {
pthread_mutexattr_t attr;
memset(&attr, 0, sizeof(attr));
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle));
pthread_mutex_init(&(PRIVATE->nativeHandle), &attr);
pthread_mutexattr_destroy(&attr);
}
PIConditionLock::~PIConditionLock() {
pthread_mutex_destroy(&(PRIVATE->nativeHandle));
}
void PIConditionLock::lock() {
pthread_mutex_lock(&(PRIVATE->nativeHandle));
}
void PIConditionLock::unlock() {
pthread_mutex_unlock(&(PRIVATE->nativeHandle));
}
void *PIConditionLock::handle() {
return &PRIVATE->nativeHandle;
}
bool PIConditionLock::tryLock() {
return (pthread_mutex_trylock(&(PRIVATE->nativeHandle)) == 0);;
}
#endif

View File

@@ -0,0 +1,129 @@
//
// Created by fomenko on 20.09.2019.
//
#include "piplatform.h"
#ifdef WINDOWS
#define _WIN32_WINNT 0x0600
#include "synchapi.h"
#include <windef.h>
#include <winbase.h>
#else
#endif
#include "piconditionvar.h"
#include "pithread.h"
#include "pitime.h"
PRIVATE_DEFINITION_START(PIConditionVariable)
#ifdef WINDOWS
CONDITION_VARIABLE nativeHandle;
#else
pthread_cond_t nativeHandle;
PIConditionLock* currentLock;
#endif
bool isDestroying;
PRIVATE_DEFINITION_END(PIConditionVariable)
PIConditionVariable::PIConditionVariable() {
#ifdef WINDOWS
InitializeConditionVariable(&PRIVATE->nativeHandle);
#else
PRIVATE->isDestroying = false;
PRIVATE->currentLock = nullptr;
memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle));
pthread_cond_init(&PRIVATE->nativeHandle, NULL);
#endif
}
PIConditionVariable::~PIConditionVariable() {
#ifdef WINDOWS
#else
pthread_cond_destroy(&PRIVATE->nativeHandle);
#endif
}
void PIConditionVariable::wait(PIConditionLock& lk) {
#ifdef WINDOWS
SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE);
#else
pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle());
#endif
}
void PIConditionVariable::wait(PIConditionLock& lk, const std::function<bool()>& condition) {
bool isCondition;
while (true) {
isCondition = condition();
if (isCondition) break;
#ifdef WINDOWS
SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE);
#else
pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle());
#endif
if (PRIVATE->isDestroying) return;
}
}
bool PIConditionVariable::waitFor(PIConditionLock &lk, int timeoutMs) {
bool isTimeout;
#ifdef WINDOWS
isTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), timeoutMs) != 0;
#else
timespec abstime = {.tv_sec = timeoutMs / 1000, .tv_nsec = timeoutMs * 1000 * 1000};
isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0;
#endif
if (PRIVATE->isDestroying) return false;
return isTimeout;
}
bool PIConditionVariable::waitFor(PIConditionLock& lk, int timeoutMs, const std::function<bool()> &condition) {
bool isCondition;
PITimeMeasurer measurer;
while (true) {
isCondition = condition();
if (isCondition) break;
#ifdef WINDOWS
WINBOOL isTimeout = SleepConditionVariableCS(
&PRIVATE->nativeHandle,
(PCRITICAL_SECTION)lk.handle(),
timeoutMs - (int)measurer.elapsed_m());
if (isTimeout == 0) return false;
#else
int timeoutCurr = timeoutMs - (int)measurer.elapsed_m();
timespec abstime = {.tv_sec = timeoutCurr / 1000, .tv_nsec = timeoutCurr * 1000 * 1000};
bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0;
if (isTimeout) return false;
#endif
if (PRIVATE->isDestroying) return false;
}
return true;
}
void PIConditionVariable::notifyOne() {
#ifdef WINDOWS
WakeConditionVariable(&PRIVATE->nativeHandle);
#else
pthread_cond_signal(&PRIVATE->nativeHandle);
#endif
}
void PIConditionVariable::notifyAll() {
#ifdef WINDOWS
WakeAllConditionVariable(&PRIVATE->nativeHandle);
#else
pthread_cond_broadcast(&PRIVATE->nativeHandle);
#endif
}
void StdFunctionThreadFuncAdapter::threadFuncStdFunctionAdapter(void *it) {
auto consumer = (StdFunctionThreadFuncAdapter*)it;
consumer->fun();
}
void StdFunctionThreadFuncAdapter::registerToInvoke(PIThread *thread) {
thread->setData(data());
thread->setSlot((ThreadFunc) threadFunc());
}

View File

@@ -0,0 +1,211 @@
//
// Created by fomenko on 23.09.2019.
//
#include "gtest/gtest.h"
#include "piblockingdequeue.h"
class MockConditionVar: public PIConditionVariable {
public:
bool isWaitCalled = false;
bool isWaitForCalled = false;
bool isTrueCondition = false;
int timeout = -1;
void wait(PIConditionLock& lk) override {
isWaitCalled = true;
}
void wait(PIConditionLock& lk, const std::function<bool()>& condition) override {
isWaitCalled = true;
lk.lock();
isTrueCondition = condition();
lk.unlock();
}
bool waitFor(PIConditionLock& lk, int timeoutMs) override {
isWaitForCalled = true;
timeout = timeoutMs;
return false;
}
bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function<bool()>& condition) override {
isWaitForCalled = true;
lk.lock();
isTrueCondition = condition();
timeout = timeoutMs;
lk.unlock();
return isTrueCondition;
}
};
TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) {
size_t capacity = 1;
PIBlockingDequeue<int> dequeue(capacity);
ASSERT_TRUE(dequeue.offer(10));
}
TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) {
size_t capacity = 1;
PIBlockingDequeue<int> dequeue(capacity);
dequeue.offer(11);
ASSERT_FALSE(dequeue.offer(10));
}
// TODO change take_is_block_when_empty to prevent segfault
TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) {
size_t capacity = 1;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
// May cause segfault because take front of empty queue
dequeue.take();
EXPECT_TRUE(conditionVar->isWaitCalled);
ASSERT_FALSE(conditionVar->isTrueCondition);
}
TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) {
size_t capacity = 1;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.offer(111);
dequeue.take();
EXPECT_TRUE(conditionVar->isWaitCalled);
ASSERT_TRUE(conditionVar->isTrueCondition);
}
TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) {
size_t capacity = 1;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.offer(111);
ASSERT_EQ(dequeue.take(), 111);
}
TEST(BlockingDequeueUnitTest, take_is_last) {
size_t capacity = 10;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
EXPECT_TRUE(dequeue.offer(111));
EXPECT_TRUE(dequeue.offer(222));
ASSERT_EQ(dequeue.take(), 111);
ASSERT_EQ(dequeue.take(), 222);
}
TEST(BlockingDequeueUnitTest, poll_is_block_when_empty) {
size_t capacity = 1;
int timeout = 11;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.poll(timeout, 111);
EXPECT_TRUE(conditionVar->isWaitForCalled);
EXPECT_EQ(timeout, conditionVar->timeout);
ASSERT_FALSE(conditionVar->isTrueCondition);
}
TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) {
size_t capacity = 1;
int timeout = 11;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
ASSERT_EQ(dequeue.poll(timeout, 111), 111);
}
TEST(BlockingDequeueUnitTest, poll_is_not_block_when_not_empty) {
size_t capacity = 1;
int timeout = 11;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.offer(111);
dequeue.poll(timeout, -1);
EXPECT_TRUE(conditionVar->isWaitForCalled);
ASSERT_TRUE(conditionVar->isTrueCondition);
}
TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) {
size_t capacity = 1;
int timeout = 11;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.offer(111);
ASSERT_EQ(dequeue.poll(timeout, -1), 111);
}
TEST(BlockingDequeueUnitTest, poll_is_last) {
size_t capacity = 10;
auto conditionVar = new MockConditionVar();
PIBlockingDequeue<int> dequeue(capacity, conditionVar);
dequeue.offer(111);
dequeue.offer(222);
ASSERT_EQ(dequeue.poll(10, -1), 111);
ASSERT_EQ(dequeue.poll(10, -1), 222);
}
TEST(BlockingDequeueUnitTest, capacity_is_eq_constructor_capacity) {
size_t capacity = 10;
PIBlockingDequeue<int> dequeue(capacity);
ASSERT_EQ(dequeue.capacity(), capacity);
}
TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) {
size_t capacity = 2;
PIBlockingDequeue<int> dequeue(capacity);
ASSERT_EQ(dequeue.remainingCapacity(), capacity);
dequeue.offer(111);
ASSERT_EQ(dequeue.remainingCapacity(), capacity - 1);
}
TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) {
size_t capacity = 1;
PIBlockingDequeue<int> dequeue(capacity);
dequeue.offer(111);
dequeue.offer(111);
ASSERT_EQ(dequeue.remainingCapacity(), 0);
}
TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) {
size_t capacity = 1;
PIBlockingDequeue<int> dequeue(capacity);
ASSERT_EQ(dequeue.size(), 0);
dequeue.offer(111);
ASSERT_EQ(dequeue.size(), 1);
}
TEST(BlockingDequeueUnitTest, size_is_eq_to_capacity_when_capacity_reach) {
size_t capacity = 1;
PIBlockingDequeue<int> dequeue(capacity);
dequeue.offer(111);
dequeue.offer(111);
ASSERT_EQ(dequeue.size(), capacity);
}
TEST(BlockingDequeueUnitTest, drainTo_is_elements_moved) {
size_t capacity = 10;
PIDeque<int> refDeque;
for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10);
PIBlockingDequeue<int> blockingDequeue(refDeque);
PIDeque<int> deque;
blockingDequeue.drainTo(deque);
ASSERT_EQ(blockingDequeue.size(), 0);
ASSERT_TRUE(deque == refDeque);
}
TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_size_when_all_moved) {
size_t capacity = 10;
PIDeque<int> refDeque;
for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10);
PIBlockingDequeue<int> blockingDequeue(refDeque);
PIDeque<int> deque;
ASSERT_EQ(blockingDequeue.drainTo(deque), refDeque.size());
}
TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) {
size_t capacity = 10;
PIDeque<int> refDeque;
for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10);
PIBlockingDequeue<int> blockingDequeue(refDeque);
PIDeque<int> deque;
ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1);
}

View File

@@ -0,0 +1,45 @@
project(concurrent_test)
# Download and unpack googletest at configure time
configure_file(CMakeLists.txt.in googletest-download/CMakeLists.txt)
execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE result
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download )
if(result)
message(FATAL_ERROR "CMake step for googletest failed: ${result}")
endif()
execute_process(COMMAND ${CMAKE_COMMAND} --build .
RESULT_VARIABLE result
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download )
if(result)
message(FATAL_ERROR "Build step for googletest failed: ${result}")
endif()
# Prevent overriding the parent project's compiler/linker
# settings on Windows
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
# Add googletest directly to our build. This defines
# the gtest and gtest_main targets.
add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src
${CMAKE_CURRENT_BINARY_DIR}/googletest-build
EXCLUDE_FROM_ALL)
# The gtest/gtest_main targets carry header search path
# dependencies automatically when using CMake 2.8.11 or
# later. Otherwise we have to add them here ourselves.
if (CMAKE_VERSION VERSION_LESS 2.8.11)
include_directories("${gtest_SOURCE_DIR}/include")
endif()
file(GLOB_RECURSE CPPS "*.cpp")
#find_package(GTest REQUIRED)
#message(STATUS "GTEST_INCLUDES: ${GTEST_INCLUDES}")
include_directories(${PROJECT_BINARY_DIR})
add_executable(${PROJECT_NAME} ${CPPS})
target_link_libraries(${PROJECT_NAME} gtest_main gmock_main pip_concurrent)
add_test(NAME ${PROJECT_NAME}_test COMMAND tests)
ADD_CUSTOM_COMMAND(TARGET ${PROJECT_NAME} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy $<TARGET_FILE:pip_concurrent> ${CMAKE_CURRENT_BINARY_DIR})
add_custom_target(${PROJECT_NAME}_perform ALL COMMAND ${PROJECT_NAME})

View File

@@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 2.8.2)
project(googletest-download NONE)
include(ExternalProject)
ExternalProject_Add(googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG "dea0216d0c6bc5e63cf5f6c8651cd268668032ec"
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/googletest-src"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/googletest-build"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@@ -0,0 +1,65 @@
//
// Created by fomenko on 26.09.2019.
//
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "piconditionvar.h"
#include <pithread.h>
#include "testutil.h"
class ConditionLock : public ::testing::Test, public TestUtil {
protected:
void TearDown() override {
if (adapter != nullptr) delete adapter;
}
};
TEST_F(ConditionLock, lock_is_protect) {
PIConditionLock m;
m.lock();
bool isProtect = true;
createThread([&](){
m.lock();
isProtect = false;
});
ASSERT_TRUE(isProtect);
}
TEST_F(ConditionLock, unlock_is_release) {
PIConditionLock m;
m.lock();
volatile bool isReleased = false;
m.unlock();
createThread([&](){
m.lock();
isReleased = true;
m.unlock();
});
EXPECT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
ASSERT_TRUE(isReleased);
}
TEST_F(ConditionLock, tryLock_is_false_when_locked) {
PIConditionLock m;
createThread([&](){
m.lock();
piMSleep(WAIT_THREAD_TIME_MS);
});
ASSERT_FALSE(m.tryLock());
}
TEST_F(ConditionLock, tryLock_is_true_when_unlocked) {
PIConditionLock m;
ASSERT_TRUE(m.tryLock());
}
TEST_F(ConditionLock, tryLock_is_recursive_lock_enable) {
PIConditionLock m;
m.lock();
ASSERT_TRUE(m.tryLock());
}

View File

@@ -0,0 +1,224 @@
//
// Created by fomenko on 24.09.2019.
//
#include "gtest/gtest.h"
#include "piconditionvar.h"
#include "pithread.h"
#include "testutil.h"
class ConditionVariable : public ::testing::Test, public TestUtil {
public:
PIConditionLock m;
PIConditionVariable* variable;
protected:
void SetUp() override {
isRunning = false;
variable = new PIConditionVariable();
adapterFunctionDefault = [&](){
isRunning = true;
m.lock();
variable->wait(m);
m.unlock();
};
}
void TearDown() override {
if (adapter != nullptr) delete adapter;
}
};
TEST(ThreadFuncAdapter, registerToInvoke_is_stdFun_invoke) {
bool isInvoke = false;
StdFunctionThreadFuncAdapter adapter([&](){
isInvoke = true;
});
adapter.threadFunc()(adapter.data());
ASSERT_TRUE(isInvoke);
}
TEST_F(ConditionVariable, wait_is_block) {
createThread();
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, wait_is_block_when_notifyOne_before_wait) {
variable->notifyOne();
createThread();
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, wait_is_block_when_notifyAll_before_wait) {
variable->notifyAll();
createThread();
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, wait_is_unblock_when_notifyOne_after_wait) {
createThread();
variable->notifyOne();
ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, wait_is_unblock_when_notifyAll_after_wait) {
PIVector<StdFunctionThreadFuncAdapter*> adapters;
PIVector<PIThread*> threads;
for (int i = 0; i < THREAD_COUNT; ++i) {
adapters.push_back(new StdFunctionThreadFuncAdapter(adapterFunctionDefault));
threads.push_back(new PIThread(adapters.back()->data(), adapters.back()->threadFunc()));
}
piForeach(PIThread* thread, threads) thread->startOnce();
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
variable->notifyAll();
PITimeMeasurer measurer;
piForeach(PIThread* thread, threads) {
int timeout = WAIT_THREAD_TIME_MS * THREAD_COUNT - (int)measurer.elapsed_m();
thread->waitForFinish(timeout > 0 ? timeout : 0);
}
for (size_t i = 0; i < threads.size(); ++i) EXPECT_FALSE(threads[i]->isRunning()) << "Thread " << i << " still running";
piForeach(PIThread* thread, threads) delete thread;
piForeach(StdFunctionThreadFuncAdapter* adapter, adapters) delete adapter;
}
TEST_F(ConditionVariable, wait_is_one_unblock_when_notifyOne) {
PIVector<StdFunctionThreadFuncAdapter*> adapters;
PIVector<PIThread*> threads;
for (int i = 0; i < THREAD_COUNT; ++i) {
adapters.push_back(new StdFunctionThreadFuncAdapter(adapterFunctionDefault));
threads.push_back(new PIThread(adapters.back()->data(), adapters.back()->threadFunc()));
}
piForeach(PIThread* thread, threads) thread->startOnce();
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
variable->notifyOne();
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
int runningThreadCount = 0;
piForeach(PIThread* thread, threads) if (thread->isRunning()) runningThreadCount++;
ASSERT_EQ(runningThreadCount, THREAD_COUNT - 1);
}
TEST_F(ConditionVariable, wait_is_protected_unblock_when_notifyOne) {
createThread([&](){
m.lock();
variable->wait(m);
piMSleep(2 * WAIT_THREAD_TIME_MS);
// Missing unlock
});
variable->notifyOne();
msleep(WAIT_THREAD_TIME_MS);
ASSERT_FALSE(m.tryLock());
}
TEST_F(ConditionVariable, wait_condition_is_block) {
createThread([&](){
m.lock();
variable->wait(m, [](){ return false; });
m.unlock();
});
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, wait_condition_is_check_condition_before_block) {
bool isConditionChecked = false;
createThread([&](){
m.lock();
variable->wait(m, [&](){
isConditionChecked = true;
return false;
});
m.unlock();
});
m.lock();
ASSERT_TRUE(isConditionChecked);
m.unlock();
}
TEST_F(ConditionVariable, wait_condition_is_check_condition_when_notifyOne) {
bool isConditionChecked;
createThread([&](){
m.lock();
variable->wait(m, [&](){
isConditionChecked = true;
return false;
});
m.unlock();
});
m.lock();
isConditionChecked = false;
m.unlock();
variable->notifyOne();
msleep(threadStartTime + 1);
m.lock();
ASSERT_TRUE(isConditionChecked);
m.unlock();
}
TEST_F(ConditionVariable, wait_condition_is_unblock_when_condition_and_notifyOne) {
bool condition = false;
createThread([&](){
m.lock();
variable->wait(m, [&](){ return condition; });
m.unlock();
});
m.lock();
condition = true;
m.unlock();
variable->notifyOne();
ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
}
TEST_F(ConditionVariable, DISABLED_waitFor_is_block_before_timeout) {
createThread([&](){
PITimeMeasurer measurer;
m.lock();
variable->waitFor(m, WAIT_THREAD_TIME_MS * 2);
m.unlock();
// Not reliable because spurious wakeup may happen
ASSERT_GE(measurer.elapsed_m(), WAIT_THREAD_TIME_MS);
});
EXPECT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS * 3));
}
TEST_F(ConditionVariable, waitFor_is_unblock_when_timeout) {
volatile bool isUnblock = false;
createThread([&](){
m.lock();
variable->waitFor(m, WAIT_THREAD_TIME_MS);
isUnblock = true;
m.unlock();
});
// Test failed if suspend forever
EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS));
ASSERT_TRUE(isUnblock);
}
TEST_F(ConditionVariable, waitFor_is_false_when_timeout) {
bool waitRet = true;
createThread([&](){
m.lock();
waitRet = variable->waitFor(m, WAIT_THREAD_TIME_MS);
m.unlock();
});
EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS));
ASSERT_FALSE(waitRet);
}
TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) {
bool condition = false;
createThread([&](){
m.lock();
variable->waitFor(m, 3 * WAIT_THREAD_TIME_MS, [&](){ return condition; });
m.unlock();
});
EXPECT_TRUE(thread->isRunning());
m.lock();
condition = true;
m.unlock();
variable->notifyOne();
msleep(WAIT_THREAD_TIME_MS);
ASSERT_FALSE(thread->isRunning());
}

View File

@@ -0,0 +1,33 @@
//
// Created by fomenko on 24.09.2019.
//
#include "gtest/gtest.h"
#include "executor.h"
#include "pimutex.h"
const int WAIT_THREAD_TIME_MS = 30;
TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) {
PIMutex m;
int invokedRunnables = 0;
PIThreadPoolExecutor executorService(1);
executorService.execute([&]() {
m.lock();
invokedRunnables++;
m.unlock();
});
piMSleep(WAIT_THREAD_TIME_MS);
ASSERT_EQ(invokedRunnables, 1);
}
TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) {
bool isRunnableInvoke = false;
PIThreadPoolExecutor executorService(1);
executorService.shutdown();
executorService.execute([&]() {
isRunnableInvoke = true;
});
piMSleep(WAIT_THREAD_TIME_MS);
ASSERT_FALSE(isRunnableInvoke);
}

View File

@@ -0,0 +1,127 @@
//
// Created by fomenko on 23.09.2019.
//
#include "executor.h"
#include "gtest/gtest.h"
#include "gmock/gmock.h"
using ::testing::_;
using ::testing::SetArgReferee;
using ::testing::DoAll;
using ::testing::DeleteArg;
using ::testing::Return;
using ::testing::AtLeast;
using ::testing::ByRef;
using ::testing::Eq;
typedef std::function<void()> VoidFunc;
namespace std {
inline bool operator ==(const VoidFunc& s, const VoidFunc& v) {
// TODO VoidFunc operator ==
return true;
}
}
const int THREAD_COUNT = 2;
class MockThread : public AbstractThread {
public:
MOCK_METHOD0(start, bool());
MOCK_METHOD0(stop, void());
MOCK_METHOD1(waitForStart, bool(int timeout_msecs));
MOCK_METHOD1(waitForFinish, bool(int timeout_msecs));
};
class MockDeque : public PIBlockingDequeue<VoidFunc> {
public:
MOCK_METHOD1(offer, bool(const VoidFunc&));
MOCK_METHOD0(take, VoidFunc());
MOCK_METHOD2(poll, VoidFunc(int timeoutMs, const VoidFunc& defaultVal));
MOCK_METHOD0(capacity, size_t());
MOCK_METHOD0(remainingCapacity, size_t());
};
class MockThreadFactory : public PIThreadFactory {
public:
int callCount = 0;
PIVector<MockThread*> threads;
std::function<void(MockThread*)> checkThreadExpectations = [](MockThread* thread){
EXPECT_CALL(*thread, start())
.WillOnce(Return(true));
EXPECT_CALL(*thread, stop())
.WillOnce(Return());
};
std::function<void(const VoidFunc& fun)> checkThreadFunc = [](const VoidFunc& fun) { };
AbstractThread* newThread(const VoidFunc& fun) override {
callCount++;
auto* thread = new MockThread();
threads.push_back(thread);
checkThreadExpectations(thread);
checkThreadFunc(fun);
return threads.back();
}
};
TEST(ExecutorUnitTest, is_corePool_created) {
auto* deque = new MockDeque();
auto* threadFactory = new MockThreadFactory();
PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory);
ASSERT_EQ(THREAD_COUNT, threadFactory->callCount);
}
TEST(ExecutorUnitTest, is_corePool_started) {
auto* deque = new MockDeque();
auto* threadFactory = new MockThreadFactory();
PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory);
}
TEST(ExecutorUnitTest, execute_is_added_to_taskQueue) {
VoidFunc voidFunc = [](){};
auto* deque = new MockDeque();
EXPECT_CALL(*deque, offer(Eq(voidFunc)))
.WillOnce(Return(true));
auto* threadFactory = new MockThreadFactory();
PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory);
executor.execute([]() {});
}
TEST(ExecutorUnitTest, is_corePool_execute_queue_elements) {
auto* deque = new MockDeque();
auto* threadFactory = new MockThreadFactory();
threadFactory->checkThreadFunc = [](const VoidFunc& fun) {
fun();
};
ON_CALL(*deque, take())
.WillByDefault(Return([](){}));
EXPECT_CALL(*deque, poll(_, _))
.Times(THREAD_COUNT)
.WillRepeatedly(Return([](){}));
PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory);
}
TEST(ExecutorUnitTest, shutdown_is_stop_threads) {
auto* deque = new MockDeque();
auto* threadFactory = new MockThreadFactory();
PIVector<VoidFunc> threadFuncs;
threadFactory->checkThreadExpectations = [](MockThread* thread) {
EXPECT_CALL(*thread, start())
.WillOnce(Return(true));
EXPECT_CALL(*thread, stop())
.WillRepeatedly(Return());
};
threadFactory->checkThreadFunc = [&](const VoidFunc& threadFunc) { threadFuncs.push_back(threadFunc); };
ON_CALL(*deque, take())
.WillByDefault(Return(VoidFunc()));
EXPECT_CALL(*deque, poll(_, _))
.Times(THREAD_COUNT)
.WillRepeatedly(Return(VoidFunc()));
PIThreadPoolExecutor executor(THREAD_COUNT, deque, threadFactory);
executor.shutdown();
piForeachC(VoidFunc& threadFunc, threadFuncs) threadFunc();
}

View File

@@ -0,0 +1,60 @@
//
// Created by fomenko on 27.09.2019.
//
#ifndef AWRCANFLASHER_TESTUTIL_H
#define AWRCANFLASHER_TESTUTIL_H
#include "pithread.h"
/**
* Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests
* write "Start thread timeout reach!" message. You can reduce it if you want increase test performance.
*/
const int WAIT_THREAD_TIME_MS = 40;
const int THREAD_COUNT = 5;
class TestUtil: public PIObject {
PIOBJECT(TestUtil)
public:
double threadStartTime;
PIThread* thread = new PIThread();
StdFunctionThreadFuncAdapter* adapter = nullptr;
volatile bool isRunning;
std::function<void()> adapterFunctionDefault;
bool createThread(const std::function<void()>& fun = nullptr, PIThread* thread_ = nullptr) {
adapter = new StdFunctionThreadFuncAdapter(fun == nullptr ? adapterFunctionDefault : fun);
if (thread_ == nullptr) thread_ = thread;
adapter->registerToInvoke(thread_);
thread_->startOnce();
return waitThread(thread_);
}
bool waitThread(PIThread* thread_, bool runningStatus = true) {
PITimeMeasurer measurer;
bool isTimeout = !thread_->waitForStart(WAIT_THREAD_TIME_MS);
while (!isRunning) {
isTimeout = WAIT_THREAD_TIME_MS <= measurer.elapsed_m();
if (isTimeout) break;
piMSleep(1);
}
threadStartTime = measurer.elapsed_m();
if (isTimeout) piCout << "Start thread timeout reach!";
if (threadStartTime > 1) {
piCout << "Start time" << threadStartTime << "ms";
} else if (threadStartTime > 0.001) {
piCout << "Start time" << threadStartTime * 1000 << "mcs";
} else {
piCout << "Start time" << threadStartTime * 1000 * 1000 << "ns";
}
return !isTimeout;
}
};
#endif //AWRCANFLASHER_TESTUTIL_H