diff --git a/src_concurrent/test/BlockingDequeueUnitTest.cpp b/src_concurrent/test/BlockingDequeueUnitTest.cpp index 3b1b9365..e62a22f9 100644 --- a/src_concurrent/test/BlockingDequeueUnitTest.cpp +++ b/src_concurrent/test/BlockingDequeueUnitTest.cpp @@ -39,6 +39,37 @@ public: } }; +TEST(BlockingDequeueUnitTest, put_is_block_when_capacity_reach) { + size_t capacity = 0; + auto conditionVarAdd = new MockConditionVar(); + auto conditionVarRem = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + dequeue.put(11); + ASSERT_TRUE(conditionVarRem->isWaitCalled); + ASSERT_FALSE(conditionVarRem->isTrueCondition); +} + +TEST(BlockingDequeueUnitTest, offer_timedout_is_false_when_capacity_reach) { + size_t capacity = 0; + int timeout = 11; + auto conditionVarAdd = new MockConditionVar(); + auto conditionVarRem = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + ASSERT_FALSE(dequeue.offer(11, timeout)); +} + +TEST(BlockingDequeueUnitTest, offer_timedout_is_block_when_capacity_reach) { + size_t capacity = 0; + int timeout = 11; + auto conditionVarAdd = new MockConditionVar(); + auto conditionVarRem = new MockConditionVar(); + PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + dequeue.offer(11, timeout); + EXPECT_TRUE(conditionVarRem->isWaitForCalled); + EXPECT_EQ(timeout, conditionVarRem->timeout); + ASSERT_FALSE(conditionVarRem->isTrueCondition); +} + TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) { size_t capacity = 1; PIBlockingDequeue dequeue(capacity); diff --git a/src_main/concurrent/piblockingdequeue.h b/src_main/concurrent/piblockingdequeue.h index e0611e2b..d9c30000 100644 --- a/src_main/concurrent/piblockingdequeue.h +++ b/src_main/concurrent/piblockingdequeue.h @@ -15,18 +15,34 @@ template class PIBlockingDequeue: private PIDeque { public: + + /** + * @brief Constructor + */ explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX, PIConditionVariable* cond_var_add = new PIConditionVariable(), PIConditionVariable* cond_var_rem = new PIConditionVariable()) : cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { } + + /** + * @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue. + */ explicit inline PIBlockingDequeue(const PIDeque& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { + mutex.lock(); max_size = SIZE_MAX; PIDeque::append(other); + mutex.unlock(); } + + /** + * @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements. + */ inline PIBlockingDequeue(PIBlockingDequeue & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { other.mutex.lock(); + mutex.lock(); max_size = other.max_size; PIDeque::append(static_cast&>(other)); + mutex.unlock(); other.mutex.unlock(); } virtual ~PIBlockingDequeue() { @@ -37,8 +53,6 @@ public: /** * @brief Inserts the specified element into this queue, waiting if necessary for space to become available. * - * @todo write tests - * * @param v the element to add */ virtual void put(const T & v) { @@ -68,6 +82,23 @@ public: return true; } + /** + * @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for + * space to become available. + * + * @param v the element to add + * @param timeoutMs how long to wait before giving up, in milliseconds + * @return true if successful, or false if the specified waiting time elapses before space is available + */ + virtual bool offer(const T & v, int timeoutMs) { + mutex.lock(); + bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque::size() < max_size; } ); + if (isOk) PIDeque::push_back(v); + mutex.unlock(); + if (isOk) cond_var_add->notifyOne(); + return isOk; + } + /** * @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available. * @@ -101,6 +132,12 @@ public: return t; } + /** + * @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource + * constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue. + * + * @return the capacity + */ virtual size_t capacity() { size_t c; mutex.lock(); @@ -122,6 +159,9 @@ public: return c; } + /** + * @brief Returns the number of elements in this collection. + */ virtual size_t size() { mutex.lock(); size_t s = PIDeque::size(); diff --git a/src_main/concurrent/piconditionlock.h b/src_main/concurrent/piconditionlock.h index 158a81cb..b119fb0a 100644 --- a/src_main/concurrent/piconditionlock.h +++ b/src_main/concurrent/piconditionlock.h @@ -16,8 +16,16 @@ public: explicit PIConditionLock(); virtual ~PIConditionLock(); + /** + * @brief lock + */ void lock(); + + /** + * @brief unlock + */ void unlock(); + bool tryLock(); void* handle(); private: diff --git a/src_main/concurrent/piconditionvar.h b/src_main/concurrent/piconditionvar.h index 58fc5e46..08767eb1 100644 --- a/src_main/concurrent/piconditionvar.h +++ b/src_main/concurrent/piconditionvar.h @@ -32,9 +32,67 @@ public: */ virtual void notifyAll(); + /** + * @brief see wait(PIConditionLock&, const std::function&) + */ virtual void wait(PIConditionLock& lk); + + /** + * @brief Wait until notified + * + * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked + * until notified. + * + * At the moment of blocking the thread, the function automatically calls lk.unlock() (PIConditionLock::unlock()), + * allowing other locked threads to continue. + * + * Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIConditionLock::lock()), + * leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex + * locking may block again the thread before returning). + * + * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to + * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions + * being called. Therefore, users of this function shall ensure their condition for resumption is met. + * + * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock + * the thread when it becomes true (which is specially useful to check against spurious wake-up calls). + * + * @param lk lock object used by method wait for data protection + * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated + * as a bool. This is called repeatedly until it evaluates to true. + */ virtual void wait(PIConditionLock& lk, const std::function& condition); + + /** + * @brief see waitFor(PIConditionLock&, int, const std::function&) + */ virtual bool waitFor(PIConditionLock& lk, int timeoutMs); + + /** + * @brief Wait for timeout or until notified + * + * The execution of the current thread (which shall have locked with lk method PIConditionLock::lock()) is blocked + * during timeoutMs, or until notified (if the latter happens first). + * + * At the moment of blocking the thread, the function automatically calls lk.lock() (PIConditionLock::lock()), allowing + * other locked threads to continue. + * + * Once notified or once timeoutMs has passed, the function unblocks and calls lk.unlock() (PIConditionLock::unlock()), + * leaving lk in the same state as when the function was called. Then the function returns (notice that this last + * mutex locking may block again the thread before returning). + * + * Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to + * member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions + * being called. Therefore, users of this function shall ensure their condition for resumption is met. + * + * If condition is specified, the function only blocks if condition returns false, and notifications can only unblock + * the thread when it becomes true (which is especially useful to check against spurious wake-up calls). + * + * @param lk lock object used by method wait for data protection + * @param condition A callable object or function that takes no arguments and returns a value that can be evaluated + * as a bool. This is called repeatedly until it evaluates to true. + * @return false if timeout reached or true if wakeup condition is true + */ virtual bool waitFor(PIConditionLock& lk, int timeoutMs, const std::function& condition); private: NO_COPY_CLASS(PIConditionVariable) diff --git a/src_main/core/piincludes.cpp b/src_main/core/piincludes.cpp index 94a54b5d..478a6bd3 100755 --- a/src_main/core/piincludes.cpp +++ b/src_main/core/piincludes.cpp @@ -117,7 +117,10 @@ PIString PIPVersion() { * * byte array (\a PIByteArray) * * string (\a PIString, \a PIStringList) * * base object (events and handlers) (\a PIObject) - * * thread (\a PIThread) + * * multithreading + * * thread (\a PIThread) + * * executor (\a PIThreadPoolExecutor) + * * blocking dequeue (\a PIBlockingDequeue) * * timer (\a PITimer) * * console (information output) (\a PIConsole) * * stand-alone