From 92ac2b12cf2b3681354800601ec3dd722ac76a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Tue, 25 Feb 2020 15:58:02 +0000 Subject: [PATCH] fix concurrent git-svn-id: svn://db.shs.com.ru/pip@884 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5 --- src_concurrent/executor.cpp | 67 ++--- src_concurrent/piconditionlock.cpp | 60 +++-- src_concurrent/piconditionvar.cpp | 122 ++++----- src_main/concurrent/executor.h | 83 ++---- src_main/concurrent/piblockingdequeue.h | 340 ++++++++++++------------ src_main/concurrent/piconditionlock.h | 35 ++- src_main/concurrent/piconditionvar.h | 168 ++++++------ 7 files changed, 408 insertions(+), 467 deletions(-) diff --git a/src_concurrent/executor.cpp b/src_concurrent/executor.cpp index 6b537c32..dbd489b1 100644 --- a/src_concurrent/executor.cpp +++ b/src_concurrent/executor.cpp @@ -1,54 +1,55 @@ -// -// Created by fomenko on 23.09.2019. -// - #include "executor.h" -PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_, - PIThreadFactory *threadFactory) : isShutdown_(false), taskQueue(taskQueue_), threadFactory(threadFactory) { - for (size_t i = 0; i < corePoolSize; ++i) { - AbstractThread* thread = threadFactory->newThread([&, i](){ - auto runnable = taskQueue->poll(100, std::function()); - if (runnable) { - runnable(); - } - if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop(); - }); - threadPool.push_back(thread); - thread->start(); - } + +PIThreadPoolExecutor::PIThreadPoolExecutor(size_t corePoolSize, PIBlockingDequeue> *taskQueue_) : isShutdown_(false), taskQueue(taskQueue_) { + for (size_t i = 0; i < corePoolSize; ++i) { + PIThread * thread = new PIThread([&, i](){ + auto runnable = taskQueue->poll(100, std::function()); + if (runnable) { + runnable(); + } + if (isShutdown_ && taskQueue->size() == 0) threadPool[i]->stop(); + }); + threadPool.push_back(thread); + thread->start(); + } } + bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) { - PITimeMeasurer measurer; - for (size_t i = 0; i < threadPool.size(); ++i) { - int dif = timeoutMs - (int)measurer.elapsed_m(); - if (dif < 0) return false; - if (!threadPool[i]->waitForFinish(dif)) return false; - } - return true; + PITimeMeasurer measurer; + for (size_t i = 0; i < threadPool.size(); ++i) { + int dif = timeoutMs - (int)measurer.elapsed_m(); + if (dif < 0) return false; + if (!threadPool[i]->waitForFinish(dif)) return false; + } + return true; } + void PIThreadPoolExecutor::shutdownNow() { - isShutdown_ = true; - for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); + isShutdown_ = true; + for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop(); } + PIThreadPoolExecutor::~PIThreadPoolExecutor() { - shutdownNow(); - while (threadPool.size() > 0) delete threadPool.take_back(); - delete threadFactory; - delete taskQueue; + shutdownNow(); + while (threadPool.size() > 0) delete threadPool.take_back(); + delete taskQueue; } + void PIThreadPoolExecutor::execute(const std::function &runnable) { - if (!isShutdown_) taskQueue->offer(runnable); + if (!isShutdown_) taskQueue->offer(runnable); } + volatile bool PIThreadPoolExecutor::isShutdown() const { - return isShutdown_; + return isShutdown_; } + void PIThreadPoolExecutor::shutdown() { - isShutdown_ = true; + isShutdown_ = true; } diff --git a/src_concurrent/piconditionlock.cpp b/src_concurrent/piconditionlock.cpp index 5c1f4ec7..668326f9 100644 --- a/src_concurrent/piconditionlock.cpp +++ b/src_concurrent/piconditionlock.cpp @@ -1,7 +1,3 @@ -// -// Created by fomenko on 25.09.2019. -// - #include "piconditionlock.h" #ifdef WINDOWS #include "synchapi.h" @@ -9,70 +5,82 @@ #include "pthread.h" #endif + PRIVATE_DEFINITION_START(PIConditionLock) #ifdef WINDOWS - CRITICAL_SECTION +CRITICAL_SECTION #else - pthread_mutex_t +pthread_mutex_t #endif - nativeHandle; +nativeHandle; PRIVATE_DEFINITION_END(PIConditionLock) + #ifdef WINDOWS PIConditionLock::PIConditionLock() { - InitializeCriticalSection(&PRIVATE->nativeHandle); + InitializeCriticalSection(&PRIVATE->nativeHandle); } + PIConditionLock::~PIConditionLock() { - DeleteCriticalSection(&PRIVATE->nativeHandle); + DeleteCriticalSection(&PRIVATE->nativeHandle); } + void PIConditionLock::lock() { - EnterCriticalSection(&PRIVATE->nativeHandle); + EnterCriticalSection(&PRIVATE->nativeHandle); } + void PIConditionLock::unlock() { - LeaveCriticalSection(&PRIVATE->nativeHandle); + LeaveCriticalSection(&PRIVATE->nativeHandle); } + void *PIConditionLock::handle() { - return &PRIVATE->nativeHandle; + return &PRIVATE->nativeHandle; } + bool PIConditionLock::tryLock() { - return TryEnterCriticalSection(&PRIVATE->nativeHandle) != 0; + return TryEnterCriticalSection(&PRIVATE->nativeHandle) != 0; } - #else + PIConditionLock::PIConditionLock() { - pthread_mutexattr_t attr; - memset(&attr, 0, sizeof(attr)); - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); - pthread_mutex_init(&(PRIVATE->nativeHandle), &attr); - pthread_mutexattr_destroy(&attr); + pthread_mutexattr_t attr; + memset(&attr, 0, sizeof(attr)); + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); + pthread_mutex_init(&(PRIVATE->nativeHandle), &attr); + pthread_mutexattr_destroy(&attr); } + PIConditionLock::~PIConditionLock() { - pthread_mutex_destroy(&(PRIVATE->nativeHandle)); + pthread_mutex_destroy(&(PRIVATE->nativeHandle)); } + void PIConditionLock::lock() { - pthread_mutex_lock(&(PRIVATE->nativeHandle)); + pthread_mutex_lock(&(PRIVATE->nativeHandle)); } + void PIConditionLock::unlock() { - pthread_mutex_unlock(&(PRIVATE->nativeHandle)); + pthread_mutex_unlock(&(PRIVATE->nativeHandle)); } + void *PIConditionLock::handle() { - return &PRIVATE->nativeHandle; + return &PRIVATE->nativeHandle; } + bool PIConditionLock::tryLock() { - return (pthread_mutex_trylock(&(PRIVATE->nativeHandle)) == 0);; + return (pthread_mutex_trylock(&(PRIVATE->nativeHandle)) == 0); } #endif diff --git a/src_concurrent/piconditionvar.cpp b/src_concurrent/piconditionvar.cpp index f11f57b6..a52558e9 100644 --- a/src_concurrent/piconditionvar.cpp +++ b/src_concurrent/piconditionvar.cpp @@ -1,8 +1,7 @@ -// -// Created by fomenko on 20.09.2019. -// - #include "piplatform.h" +#include "piconditionvar.h" +#include "pithread.h" +#include "pitime.h" #ifdef WINDOWS #define _WIN32_WINNT 0x0600 @@ -11,118 +10,113 @@ #include #endif -#include "piconditionvar.h" -#include "pithread.h" -#include "pitime.h" PRIVATE_DEFINITION_START(PIConditionVariable) #ifdef WINDOWS - CONDITION_VARIABLE nativeHandle; +CONDITION_VARIABLE nativeHandle; #else - pthread_cond_t nativeHandle; - PIConditionLock* currentLock; +pthread_cond_t nativeHandle; +PIConditionLock* currentLock; #endif - bool isDestroying; +bool isDestroying; PRIVATE_DEFINITION_END(PIConditionVariable) + PIConditionVariable::PIConditionVariable() { #ifdef WINDOWS - InitializeConditionVariable(&PRIVATE->nativeHandle); + InitializeConditionVariable(&PRIVATE->nativeHandle); #else - PRIVATE->isDestroying = false; - PRIVATE->currentLock = nullptr; - memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); - pthread_cond_init(&PRIVATE->nativeHandle, NULL); + PRIVATE->isDestroying = false; + PRIVATE->currentLock = nullptr; + memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle)); + pthread_cond_init(&PRIVATE->nativeHandle, NULL); #endif } + PIConditionVariable::~PIConditionVariable() { #ifdef WINDOWS #else - pthread_cond_destroy(&PRIVATE->nativeHandle); + pthread_cond_destroy(&PRIVATE->nativeHandle); #endif - } + void PIConditionVariable::wait(PIConditionLock& lk) { #ifdef WINDOWS - SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); + SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); #else - pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); + pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); #endif } + void PIConditionVariable::wait(PIConditionLock& lk, const std::function& condition) { - bool isCondition; - while (true) { - isCondition = condition(); - if (isCondition) break; + bool isCondition; + while (true) { + isCondition = condition(); + if (isCondition) break; #ifdef WINDOWS - SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); + SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE); #else - pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); + pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle()); #endif - if (PRIVATE->isDestroying) return; - } + if (PRIVATE->isDestroying) return; + } } + bool PIConditionVariable::waitFor(PIConditionLock &lk, int timeoutMs) { - bool isNotTimeout; + bool isNotTimeout; #ifdef WINDOWS - isNotTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), timeoutMs) != 0; + isNotTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), timeoutMs) != 0; #else - timespec abstime = {.tv_sec = timeoutMs / 1000, .tv_nsec = timeoutMs * 1000 * 1000}; - isNotTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; + timespec abstime = {.tv_sec = timeoutMs / 1000, .tv_nsec = timeoutMs * 1000 * 1000}; + isNotTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; #endif - if (PRIVATE->isDestroying) return false; - return isNotTimeout; + if (PRIVATE->isDestroying) return false; + return isNotTimeout; } + bool PIConditionVariable::waitFor(PIConditionLock& lk, int timeoutMs, const std::function &condition) { - bool isCondition; - PITimeMeasurer measurer; - while (true) { - isCondition = condition(); - if (isCondition) break; + bool isCondition; + PITimeMeasurer measurer; + while (true) { + isCondition = condition(); + if (isCondition) break; #ifdef WINDOWS - WINBOOL isTimeout = SleepConditionVariableCS( - &PRIVATE->nativeHandle, - (PCRITICAL_SECTION)lk.handle(), - timeoutMs - (int)measurer.elapsed_m()); - if (isTimeout == 0) return false; + WINBOOL isTimeout = SleepConditionVariableCS( + &PRIVATE->nativeHandle, + (PCRITICAL_SECTION)lk.handle(), + timeoutMs - (int)measurer.elapsed_m()); + if (isTimeout == 0) return false; #else - int timeoutCurr = timeoutMs - (int)measurer.elapsed_m(); - timespec abstime = {.tv_sec = timeoutCurr / 1000, .tv_nsec = timeoutCurr * 1000 * 1000}; - bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; - if (isTimeout) return false; + int timeoutCurr = timeoutMs - (int)measurer.elapsed_m(); + timespec abstime = {.tv_sec = timeoutCurr / 1000, .tv_nsec = timeoutCurr * 1000 * 1000}; + bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0; + if (isTimeout) return false; #endif - if (PRIVATE->isDestroying) return false; - } - return true; + if (PRIVATE->isDestroying) return false; + } + return true; } void PIConditionVariable::notifyOne() { #ifdef WINDOWS - WakeConditionVariable(&PRIVATE->nativeHandle); + WakeConditionVariable(&PRIVATE->nativeHandle); #else - pthread_cond_signal(&PRIVATE->nativeHandle); + pthread_cond_signal(&PRIVATE->nativeHandle); #endif } + + void PIConditionVariable::notifyAll() { #ifdef WINDOWS - WakeAllConditionVariable(&PRIVATE->nativeHandle); + WakeAllConditionVariable(&PRIVATE->nativeHandle); #else - pthread_cond_broadcast(&PRIVATE->nativeHandle); + pthread_cond_broadcast(&PRIVATE->nativeHandle); #endif } -void StdFunctionThreadFuncAdapter::threadFuncStdFunctionAdapter(void *it) { - auto consumer = (StdFunctionThreadFuncAdapter*)it; - consumer->fun(); -} - -void StdFunctionThreadFuncAdapter::registerToInvoke(PIThread *thread) { - thread->setData(data()); - thread->setSlot((ThreadFunc) threadFunc()); -} diff --git a/src_main/concurrent/executor.h b/src_main/concurrent/executor.h index 7095172f..f4568af0 100644 --- a/src_main/concurrent/executor.h +++ b/src_main/concurrent/executor.h @@ -5,46 +5,8 @@ #ifndef PIP_TESTS_EXECUTOR_H #define PIP_TESTS_EXECUTOR_H -#include -#include -#include #include "piblockingdequeue.h" -class AbstractThread { -public: - virtual bool start() = 0; - virtual bool waitForStart(int timeout_msecs) = 0; - virtual bool waitForFinish(int timeout_msecs) = 0; - virtual void stop() = 0; - virtual ~AbstractThread() = default; -}; - -class Thread : public AbstractThread { -public: - explicit Thread(const std::function& fun = [](){}) : adapter(fun) { - adapter.registerToInvoke(&thread); - } - virtual ~Thread() = default; - - inline bool start() override { return thread.start(); } - inline bool waitForStart(int timeout_msecs) override { return thread.waitForStart(timeout_msecs); } - inline bool waitForFinish(int timeout_msecs) override { return thread.waitForFinish(timeout_msecs); } - inline void stop() override { thread.stop(); } - -private: - PIThread thread; - StdFunctionThreadFuncAdapter adapter; -}; - -class PIThreadFactory { -public: - inline virtual AbstractThread* newThread(const std::function& fun) { - return new Thread(fun); - } - virtual ~PIThreadFactory() = default; -}; - - /** * @brief Thread pools address two different problems: they usually provide improved performance when executing large * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and @@ -52,36 +14,35 @@ public: */ class PIThreadPoolExecutor { public: - explicit PIThreadPoolExecutor(size_t corePoolSize = 1, PIBlockingDequeue >* taskQueue_ = new PIBlockingDequeue >(), PIThreadFactory* threadFactory = new PIThreadFactory()); + explicit PIThreadPoolExecutor(size_t corePoolSize = 1, PIBlockingDequeue >* taskQueue_ = new PIBlockingDequeue >()); - virtual ~PIThreadPoolExecutor(); + virtual ~PIThreadPoolExecutor(); - /** - * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task - * cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been - * reached. - * - * @param runnable not empty function for thread pool execution - */ - void execute(const std::function& runnable); + /** + * @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task + * cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been + * reached. + * + * @param runnable not empty function for thread pool execution + */ + void execute(const std::function& runnable); - void shutdownNow(); + void shutdownNow(); - /** - * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be - * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously - * submitted tasks to complete execution. Use awaitTermination to do that. - */ - void shutdown(); + /** + * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be + * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously + * submitted tasks to complete execution. Use awaitTermination to do that. + */ + void shutdown(); - volatile bool isShutdown() const; + volatile bool isShutdown() const; - bool awaitTermination(int timeoutMs); + bool awaitTermination(int timeoutMs); private: - volatile bool isShutdown_; - PIBlockingDequeue >* taskQueue; - PIThreadFactory* threadFactory; - PIVector threadPool; + volatile bool isShutdown_; + PIBlockingDequeue >* taskQueue; + PIVector threadPool; }; #endif //PIP_TESTS_EXECUTOR_H diff --git a/src_main/concurrent/piblockingdequeue.h b/src_main/concurrent/piblockingdequeue.h index d9c30000..98abb021 100644 --- a/src_main/concurrent/piblockingdequeue.h +++ b/src_main/concurrent/piblockingdequeue.h @@ -16,190 +16,190 @@ template class PIBlockingDequeue: private PIDeque { public: - /** - * @brief Constructor - */ - explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX, - PIConditionVariable* cond_var_add = new PIConditionVariable(), - PIConditionVariable* cond_var_rem = new PIConditionVariable()) - : cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { } + /** + * @brief Constructor + */ + explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX, + PIConditionVariable* cond_var_add = new PIConditionVariable(), + PIConditionVariable* cond_var_rem = new PIConditionVariable()) + : cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { } - /** - * @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue. - */ - explicit inline PIBlockingDequeue(const PIDeque& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { - mutex.lock(); - max_size = SIZE_MAX; - PIDeque::append(other); - mutex.unlock(); - } + /** + * @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue. + */ + explicit inline PIBlockingDequeue(const PIDeque& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { + mutex.lock(); + max_size = SIZE_MAX; + PIDeque::append(other); + mutex.unlock(); + } - /** - * @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements. - */ - inline PIBlockingDequeue(PIBlockingDequeue & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { - other.mutex.lock(); - mutex.lock(); - max_size = other.max_size; - PIDeque::append(static_cast&>(other)); - mutex.unlock(); - other.mutex.unlock(); - } - virtual ~PIBlockingDequeue() { - delete cond_var_add; - delete cond_var_rem; - } + /** + * @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements. + */ + inline PIBlockingDequeue(PIBlockingDequeue & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { + other.mutex.lock(); + mutex.lock(); + max_size = other.max_size; + PIDeque::append(static_cast&>(other)); + mutex.unlock(); + other.mutex.unlock(); + } + virtual ~PIBlockingDequeue() { + delete cond_var_add; + delete cond_var_rem; + } - /** - * @brief Inserts the specified element into this queue, waiting if necessary for space to become available. - * - * @param v the element to add - */ - virtual void put(const T & v) { - mutex.lock(); - cond_var_rem->wait(mutex, [&]() { return PIDeque::size() < max_size; }); - PIDeque::push_back(v); - mutex.unlock(); - cond_var_add->notifyOne(); - } + /** + * @brief Inserts the specified element into this queue, waiting if necessary for space to become available. + * + * @param v the element to add + */ + virtual void put(const T & v) { + mutex.lock(); + cond_var_rem->wait(mutex, [&]() { return PIDeque::size() < max_size; }); + PIDeque::push_back(v); + mutex.unlock(); + cond_var_add->notifyOne(); + } - /** - * @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without - * exceeding the queue's capacity, returning true upon success and false if this queue is full. - * - * @param v the element to add - * @return true if the element was added to this queue, else false - */ - virtual bool offer(const T & v) { - mutex.lock(); - if (PIDeque::size() >= max_size) { - mutex.unlock(); - return false; - } - PIDeque::push_back(v); - mutex.unlock(); - cond_var_add->notifyOne(); - return true; - } + /** + * @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without + * exceeding the queue's capacity, returning true upon success and false if this queue is full. + * + * @param v the element to add + * @return true if the element was added to this queue, else false + */ + virtual bool offer(const T & v) { + mutex.lock(); + if (PIDeque::size() >= max_size) { + mutex.unlock(); + return false; + } + PIDeque::push_back(v); + mutex.unlock(); + cond_var_add->notifyOne(); + return true; + } - /** - * @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for - * space to become available. - * - * @param v the element to add - * @param timeoutMs how long to wait before giving up, in milliseconds - * @return true if successful, or false if the specified waiting time elapses before space is available - */ - virtual bool offer(const T & v, int timeoutMs) { - mutex.lock(); - bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque::size() < max_size; } ); - if (isOk) PIDeque::push_back(v); - mutex.unlock(); - if (isOk) cond_var_add->notifyOne(); - return isOk; - } + /** + * @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for + * space to become available. + * + * @param v the element to add + * @param timeoutMs how long to wait before giving up, in milliseconds + * @return true if successful, or false if the specified waiting time elapses before space is available + */ + virtual bool offer(const T & v, int timeoutMs) { + mutex.lock(); + bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque::size() < max_size; } ); + if (isOk) PIDeque::push_back(v); + mutex.unlock(); + if (isOk) cond_var_add->notifyOne(); + return isOk; + } - /** - * @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. - * - * @return the head of this queue - */ - virtual T take() { - T t; - mutex.lock(); - cond_var_add->wait(mutex, [&]() { return !PIDeque::isEmpty(); }); - t = T(PIDeque::take_front()); - mutex.unlock(); - cond_var_rem->notifyOne(); - return t; - } + /** + * @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. + * + * @return the head of this queue + */ + virtual T take() { + T t; + mutex.lock(); + cond_var_add->wait(mutex, [&]() { return !PIDeque::isEmpty(); }); + t = T(PIDeque::take_front()); + mutex.unlock(); + cond_var_rem->notifyOne(); + return t; + } - /** - * @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an - * element to become available. - * - * @param timeoutMs how long to wait before giving up, in milliseconds - * @param defaultVal value, which returns if the specified waiting time elapses before an element is available - * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available - */ - virtual T poll(int timeoutMs, const T & defaultVal) { - T t; - mutex.lock(); - bool isOk = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return !PIDeque::isEmpty(); }); - t = isOk ? T(PIDeque::take_front()) : defaultVal; - mutex.unlock(); - if (isOk) cond_var_rem->notifyOne(); - return t; - } + /** + * @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an + * element to become available. + * + * @param timeoutMs how long to wait before giving up, in milliseconds + * @param defaultVal value, which returns if the specified waiting time elapses before an element is available + * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available + */ + virtual T poll(int timeoutMs, const T & defaultVal) { + T t; + mutex.lock(); + bool isOk = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return !PIDeque::isEmpty(); }); + t = isOk ? T(PIDeque::take_front()) : defaultVal; + mutex.unlock(); + if (isOk) cond_var_rem->notifyOne(); + return t; + } - /** - * @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource - * constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue. - * - * @return the capacity - */ - virtual size_t capacity() { - size_t c; - mutex.lock(); - c = max_size; - mutex.unlock(); - return c; - } + /** + * @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource + * constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the capacity + */ + virtual size_t capacity() { + size_t c; + mutex.lock(); + c = max_size; + mutex.unlock(); + return c; + } - /** - * @brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource - * constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue. - * - * @return the remaining capacity - */ - virtual size_t remainingCapacity() { - mutex.lock(); - size_t c = max_size - PIDeque::size(); - mutex.unlock(); - return c; - } + /** + * @brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource + * constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the remaining capacity + */ + virtual size_t remainingCapacity() { + mutex.lock(); + size_t c = max_size - PIDeque::size(); + mutex.unlock(); + return c; + } - /** - * @brief Returns the number of elements in this collection. - */ - virtual size_t size() { - mutex.lock(); - size_t s = PIDeque::size(); - mutex.unlock(); - return s; - } + /** + * @brief Returns the number of elements in this collection. + */ + virtual size_t size() { + mutex.lock(); + size_t s = PIDeque::size(); + mutex.unlock(); + return s; + } - /** - * @brief Removes all available elements from this queue and adds them to other given queue. - */ - virtual size_t drainTo(PIDeque& other, size_t maxCount = SIZE_MAX) { - mutex.lock(); - size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; - for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); - mutex.unlock(); - return count; - } + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + virtual size_t drainTo(PIDeque& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; + for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); + mutex.unlock(); + return count; + } - /** - * @brief Removes all available elements from this queue and adds them to other given queue. - */ - virtual size_t drainTo(PIBlockingDequeue& other, size_t maxCount = SIZE_MAX) { - mutex.lock(); - other.mutex.lock(); - size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; - size_t otherRemainingCapacity = other.max_size - static_cast>(other).size(); - if (count > otherRemainingCapacity) count = otherRemainingCapacity; - for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); - other.mutex.unlock(); - mutex.unlock(); - return count; - } + /** + * @brief Removes all available elements from this queue and adds them to other given queue. + */ + virtual size_t drainTo(PIBlockingDequeue& other, size_t maxCount = SIZE_MAX) { + mutex.lock(); + other.mutex.lock(); + size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; + size_t otherRemainingCapacity = other.max_size - static_cast>(other).size(); + if (count > otherRemainingCapacity) count = otherRemainingCapacity; + for (size_t i = 0; i < count; ++i) other.push_back(PIDeque::take_front()); + other.mutex.unlock(); + mutex.unlock(); + return count; + } private: - PIConditionLock mutex; - PIConditionVariable* cond_var_add; - PIConditionVariable* cond_var_rem; - size_t max_size; + PIConditionLock mutex; + PIConditionVariable* cond_var_add; + PIConditionVariable* cond_var_rem; + size_t max_size; }; diff --git a/src_main/concurrent/piconditionlock.h b/src_main/concurrent/piconditionlock.h index b119fb0a..88452844 100644 --- a/src_main/concurrent/piconditionlock.h +++ b/src_main/concurrent/piconditionlock.h @@ -1,36 +1,31 @@ -// -// Created by fomenko on 25.09.2019. -// - #ifndef AWRCANFLASHER_PICONDITIONLOCK_H #define AWRCANFLASHER_PICONDITIONLOCK_H -#include -#include +#include "pimutex.h" + /** * @brief Continued */ class PIP_EXPORT PIConditionLock { public: - explicit PIConditionLock(); - virtual ~PIConditionLock(); + explicit PIConditionLock(); + virtual ~PIConditionLock(); - /** - * @brief lock - */ - void lock(); + //! \brief lock + void lock(); - /** - * @brief unlock - */ - void unlock(); + //! \brief unlock + void unlock(); + + //! \brief tryLock + bool tryLock(); + + void * handle(); - bool tryLock(); - void* handle(); private: - NO_COPY_CLASS(PIConditionLock) - PRIVATE_DECLARATION + NO_COPY_CLASS(PIConditionLock) + PRIVATE_DECLARATION }; diff --git a/src_main/concurrent/piconditionvar.h b/src_main/concurrent/piconditionvar.h index 99886b6c..b51124e6 100644 --- a/src_main/concurrent/piconditionvar.h +++ b/src_main/concurrent/piconditionvar.h @@ -1,7 +1,3 @@ -// -// Created by fomenko on 20.09.2019. -// - #ifndef PIP_TESTS_PICONDITIONVAR_H #define PIP_TESTS_PICONDITIONVAR_H @@ -9,6 +5,7 @@ #include "pithread.h" #include "piinit.h" + /** * @brief A condition variable is an object able to block the calling thread until notified to resume. * @@ -17,103 +14,88 @@ */ class PIP_EXPORT PIConditionVariable { public: - explicit PIConditionVariable(); - virtual ~PIConditionVariable(); + explicit PIConditionVariable(); + virtual ~PIConditionVariable(); - /** - * @brief Unblocks one of the threads currently waiting for this condition. If no threads are waiting, the function - * does nothing. If more than one, it is unspecified which of the threads is selected. - */ - virtual void notifyOne(); + /** + * @brief Unblocks one of the threads currently waiting for this condition. If no threads are waiting, the function + * does nothing. If more than one, it is unspecified which of the threads is selected. + */ + virtual void notifyOne(); - /** - * @brief Unblocks all threads currently waiting for this condition. If no threads are waiting, the function does - * nothing. - */ - virtual void notifyAll(); + /** + * @brief Unblocks all threads currently waiting for this condition. If no threads are waiting, the function does + * nothing. + */ + virtual void notifyAll(); - /** - * @brief see wait(PIConditionLock&, const std::function&) - */ - virtual void wait(PIConditionLock& lk); + /** + * @brief see wait(PIConditionLock&, const std::function&) + */ + virtual void wait(PIConditionLock& lk); - /** - * @brief Wait until notified - * - * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked - * until notified. - * - * At the moment of blocking the thread, the function automatically calls lk.unlock() (PIConditionLock::unlock()), - * allowing other locked threads to continue. - * - * Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIConditionLock::lock()), - * leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex - * locking may block again the thread before returning). - * - * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to - * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions - * being called. Therefore, users of this function shall ensure their condition for resumption is met. - * - * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock - * the thread when it becomes true (which is specially useful to check against spurious wake-up calls). - * - * @param lk lock object used by method wait for data protection - * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated - * as a bool. This is called repeatedly until it evaluates to true. - */ - virtual void wait(PIConditionLock& lk, const std::function& condition); + /** + * @brief Wait until notified + * + * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked + * until notified. + * + * At the moment of blocking the thread, the function automatically calls lk.unlock() (PIConditionLock::unlock()), + * allowing other locked threads to continue. + * + * Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIConditionLock::lock()), + * leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex + * locking may block again the thread before returning). + * + * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to + * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions + * being called. Therefore, users of this function shall ensure their condition for resumption is met. + * + * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock + * the thread when it becomes true (which is specially useful to check against spurious wake-up calls). + * + * @param lk lock object used by method wait for data protection + * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated + * as a bool. This is called repeatedly until it evaluates to true. + */ + virtual void wait(PIConditionLock& lk, const std::function& condition); - /** - * @brief see waitFor(PIConditionLock&, int, const std::function&) - */ - virtual bool waitFor(PIConditionLock& lk, int timeoutMs); + /** + * @brief see waitFor(PIConditionLock&, int, const std::function&) + */ + virtual bool waitFor(PIConditionLock& lk, int timeoutMs); + + /** + * @brief Wait for timeout or until notified + * + * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked + * during timeoutMs, or until notified (if the latter happens first). + * + * At the moment of blocking the thread, the function automatically calls lk.lock() (PIConditionLock::lock()), allowing + * other locked threads to continue. + * + * Once notified or once timeoutMs has passed, the function unblocks and calls lk.unlock() (PIConditionLock::unlock()), + * leaving lk in the same state as when the function was called. Then the function returns (notice that this last + * mutex locking may block again the thread before returning). + * + * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to + * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions + * being called. Therefore, users of this function shall ensure their condition for resumption is met. + * + * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock + * the thread when it becomes true (which is especially useful to check against spurious wake-up calls). + * + * @param lk lock object used by method wait for data protection + * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated + * as a bool. This is called repeatedly until it evaluates to true. + * @return false if timeout reached or true if wakeup condition is true + */ + virtual bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function& condition); - /** - * @brief Wait for timeout or until notified - * - * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked - * during timeoutMs, or until notified (if the latter happens first). - * - * At the moment of blocking the thread, the function automatically calls lk.lock() (PIConditionLock::lock()), allowing - * other locked threads to continue. - * - * Once notified or once timeoutMs has passed, the function unblocks and calls lk.unlock() (PIConditionLock::unlock()), - * leaving lk in the same state as when the function was called. Then the function returns (notice that this last - * mutex locking may block again the thread before returning). - * - * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to - * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions - * being called. Therefore, users of this function shall ensure their condition for resumption is met. - * - * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock - * the thread when it becomes true (which is especially useful to check against spurious wake-up calls). - * - * @param lk lock object used by method wait for data protection - * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated - * as a bool. This is called repeatedly until it evaluates to true. - * @return false if timeout reached or true if wakeup condition is true - */ - virtual bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function& condition); private: - NO_COPY_CLASS(PIConditionVariable) - - PRIVATE_DECLARATION + NO_COPY_CLASS(PIConditionVariable) + PRIVATE_DECLARATION }; - -// FIXME: remove that! -class StdFunctionThreadFuncAdapter { -public: - static void threadFuncStdFunctionAdapter(void* it); - - explicit StdFunctionThreadFuncAdapter(const std::function& fun_): fun(fun_) {} - - void registerToInvoke(PIThread* thread); - void* data() const { return (void*)this; } - ThreadFunc threadFunc() const { return threadFuncStdFunctionAdapter; } -private: - std::function fun; -}; - #endif //PIP_TESTS_PICONDITIONVAR_H