From 3ba6a7b0e84f2b3b1d765830a00ae77c138461b5 Mon Sep 17 00:00:00 2001 From: andrey Date: Tue, 11 Aug 2020 19:05:47 +0300 Subject: [PATCH] rename PIBlockingDequeue -> PIBlockingQueue --- ...{piblockingdequeue.h => piblockingqueue.h} | 25 +++++---- lib/main/thread/pithreadpoolexecutor.h | 4 +- tests/concurrent/BlockingDequeueUnitTest.cpp | 52 +++++++++---------- 3 files changed, 43 insertions(+), 38 deletions(-) rename lib/main/thread/{piblockingdequeue.h => piblockingqueue.h} (89%) diff --git a/lib/main/thread/piblockingdequeue.h b/lib/main/thread/piblockingqueue.h similarity index 89% rename from lib/main/thread/piblockingdequeue.h rename to lib/main/thread/piblockingqueue.h index 4b7fc57f..6021e4d9 100644 --- a/lib/main/thread/piblockingdequeue.h +++ b/lib/main/thread/piblockingqueue.h @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef PIBLOCKINGDEQUEUE_H -#define PIBLOCKINGDEQUEUE_H +#ifndef PIBLOCKINGQUEUE_H +#define PIBLOCKINGQUEUE_H #include "pideque.h" #include "piconditionvar.h" @@ -28,13 +28,13 @@ * wait for space to become available in the queue when storing an element. */ template -class PIBlockingDequeue: private PIDeque { +class PIBlockingQueue: private PIQueue { public: /** * @brief Constructor */ - explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX, + explicit inline PIBlockingQueue(size_t capacity = SIZE_MAX, PIConditionVariable* cond_var_add = new PIConditionVariable(), PIConditionVariable* cond_var_rem = new PIConditionVariable()) : cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { } @@ -42,7 +42,7 @@ public: /** * @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue. */ - explicit inline PIBlockingDequeue(const PIDeque& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { + explicit inline PIBlockingQueue(const PIDeque& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { mutex.lock(); max_size = SIZE_MAX; PIDeque::append(other); @@ -52,7 +52,7 @@ public: /** * @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements. */ - inline PIBlockingDequeue(PIBlockingDequeue & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { + inline PIBlockingQueue(PIBlockingQueue & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) { other.mutex.lock(); mutex.lock(); max_size = other.max_size; @@ -61,7 +61,7 @@ public: other.mutex.unlock(); } - ~PIBlockingDequeue() { + ~PIBlockingQueue() { delete cond_var_add; delete cond_var_rem; } @@ -71,14 +71,17 @@ public: * * @param v the element to add */ - void put(const T & v) { + PIBlockingQueue & put(const T & v) { mutex.lock(); cond_var_rem->wait(mutex, [&]() { return PIDeque::size() < max_size; }); PIDeque::push_back(v); mutex.unlock(); cond_var_add->notifyOne(); + return *this; } + PIBlockingQueue & enqueue(const T & v) {return put(v);} + /** * @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without * exceeding the queue's capacity, returning true upon success and false if this queue is full. @@ -114,6 +117,8 @@ public: return t; } + T dequeue() {return take();} + /** * @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an * element to become available. @@ -190,7 +195,7 @@ public: /** * @brief Removes all available elements from this queue and adds them to other given queue. */ - size_t drainTo(PIBlockingDequeue& other, size_t maxCount = SIZE_MAX) { + size_t drainTo(PIBlockingQueue& other, size_t maxCount = SIZE_MAX) { mutex.lock(); other.mutex.lock(); size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount; @@ -210,4 +215,4 @@ private: }; -#endif // PIBLOCKINGDEQUEUE_H +#endif // PIBLOCKINGQUEUE_H diff --git a/lib/main/thread/pithreadpoolexecutor.h b/lib/main/thread/pithreadpoolexecutor.h index 266103ea..1fa93fc6 100644 --- a/lib/main/thread/pithreadpoolexecutor.h +++ b/lib/main/thread/pithreadpoolexecutor.h @@ -20,7 +20,7 @@ #ifndef PITHREADPOOLEXECUTOR_H #define PITHREADPOOLEXECUTOR_H -#include "piblockingdequeue.h" +#include "piblockingqueue.h" #include @@ -54,7 +54,7 @@ public: private: std::atomic_bool isShutdown_; - PIBlockingDequeue > taskQueue; + PIBlockingQueue > taskQueue; PIVector threadPool; bool queue_own; diff --git a/tests/concurrent/BlockingDequeueUnitTest.cpp b/tests/concurrent/BlockingDequeueUnitTest.cpp index 08a4b7ce..993326dd 100644 --- a/tests/concurrent/BlockingDequeueUnitTest.cpp +++ b/tests/concurrent/BlockingDequeueUnitTest.cpp @@ -1,5 +1,5 @@ #include "gtest/gtest.h" -#include "piblockingdequeue.h" +#include "piblockingqueue.h" class MockConditionVar: public PIConditionVariable { public: @@ -39,7 +39,7 @@ TEST(BlockingDequeueUnitTest, put_is_block_when_capacity_reach) { size_t capacity = 0; auto conditionVarAdd = new MockConditionVar(); auto conditionVarRem = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + PIBlockingQueue dequeue(capacity, conditionVarAdd, conditionVarRem); dequeue.put(11); ASSERT_TRUE(conditionVarRem->isWaitCalled); ASSERT_FALSE(conditionVarRem->isTrueCondition); @@ -50,7 +50,7 @@ TEST(BlockingDequeueUnitTest, offer_timedout_is_false_when_capacity_reach) { int timeout = 11; auto conditionVarAdd = new MockConditionVar(); auto conditionVarRem = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + PIBlockingQueue dequeue(capacity, conditionVarAdd, conditionVarRem); ASSERT_FALSE(dequeue.offer(11, timeout)); } @@ -59,7 +59,7 @@ TEST(BlockingDequeueUnitTest, offer_timedout_is_block_when_capacity_reach) { int timeout = 11; auto conditionVarAdd = new MockConditionVar(); auto conditionVarRem = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVarAdd, conditionVarRem); + PIBlockingQueue dequeue(capacity, conditionVarAdd, conditionVarRem); dequeue.offer(11, timeout); EXPECT_TRUE(conditionVarRem->isWaitForCalled); EXPECT_EQ(timeout, conditionVarRem->timeout); @@ -68,13 +68,13 @@ TEST(BlockingDequeueUnitTest, offer_timedout_is_block_when_capacity_reach) { TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) { size_t capacity = 1; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); ASSERT_TRUE(dequeue.offer(10)); } TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) { size_t capacity = 1; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); dequeue.offer(11); ASSERT_FALSE(dequeue.offer(10)); } @@ -83,7 +83,7 @@ TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) { TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) { size_t capacity = 1; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); // May cause segfault because take front of empty queue dequeue.take(); EXPECT_TRUE(conditionVar->isWaitCalled); @@ -93,7 +93,7 @@ TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) { TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) { size_t capacity = 1; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); dequeue.take(); @@ -104,7 +104,7 @@ TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) { TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) { size_t capacity = 1; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); ASSERT_EQ(dequeue.take(), 111); @@ -113,7 +113,7 @@ TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) { TEST(BlockingDequeueUnitTest, take_is_last) { size_t capacity = 10; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); EXPECT_TRUE(dequeue.offer(111)); EXPECT_TRUE(dequeue.offer(222)); ASSERT_EQ(dequeue.take(), 111); @@ -124,7 +124,7 @@ TEST(BlockingDequeueUnitTest, poll_is_not_block_when_empty) { size_t capacity = 1; bool isOk; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.poll(0, 111, &isOk); EXPECT_FALSE(conditionVar->isWaitForCalled); } @@ -133,7 +133,7 @@ TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) { size_t capacity = 1; bool isOk; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); ASSERT_EQ(dequeue.poll(0, 111, &isOk), 111); } @@ -141,7 +141,7 @@ TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) { size_t capacity = 1; bool isOk; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); ASSERT_EQ(dequeue.poll(0, -1, &isOk), 111); } @@ -150,7 +150,7 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_block_when_empty) { size_t capacity = 1; int timeout = 11; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.poll(timeout, 111); EXPECT_TRUE(conditionVar->isWaitForCalled); EXPECT_EQ(timeout, conditionVar->timeout); @@ -161,7 +161,7 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_default_value_when_empty) { size_t capacity = 1; int timeout = 11; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); ASSERT_EQ(dequeue.poll(timeout, 111), 111); } @@ -169,7 +169,7 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_not_block_when_not_empty) { size_t capacity = 1; int timeout = 11; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); dequeue.poll(timeout, -1); @@ -181,7 +181,7 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_offer_value_when_not_empty) { size_t capacity = 1; int timeout = 11; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); ASSERT_EQ(dequeue.poll(timeout, -1), 111); } @@ -189,7 +189,7 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_offer_value_when_not_empty) { TEST(BlockingDequeueUnitTest, poll_timeouted_is_last) { size_t capacity = 10; auto conditionVar = new MockConditionVar(); - PIBlockingDequeue dequeue(capacity, conditionVar); + PIBlockingQueue dequeue(capacity, conditionVar); dequeue.offer(111); dequeue.offer(222); ASSERT_EQ(dequeue.poll(10, -1), 111); @@ -198,13 +198,13 @@ TEST(BlockingDequeueUnitTest, poll_timeouted_is_last) { TEST(BlockingDequeueUnitTest, capacity_is_eq_constructor_capacity) { size_t capacity = 10; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); ASSERT_EQ(dequeue.capacity(), capacity); } TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) { size_t capacity = 2; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); ASSERT_EQ(dequeue.remainingCapacity(), capacity); dequeue.offer(111); ASSERT_EQ(dequeue.remainingCapacity(), capacity - 1); @@ -212,7 +212,7 @@ TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) { TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) { size_t capacity = 1; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); dequeue.offer(111); dequeue.offer(111); ASSERT_EQ(dequeue.remainingCapacity(), 0); @@ -220,7 +220,7 @@ TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) { TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) { size_t capacity = 1; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); ASSERT_EQ(dequeue.size(), 0); dequeue.offer(111); ASSERT_EQ(dequeue.size(), 1); @@ -228,7 +228,7 @@ TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) { TEST(BlockingDequeueUnitTest, size_is_eq_to_capacity_when_capacity_reach) { size_t capacity = 1; - PIBlockingDequeue dequeue(capacity); + PIBlockingQueue dequeue(capacity); dequeue.offer(111); dequeue.offer(111); ASSERT_EQ(dequeue.size(), capacity); @@ -238,7 +238,7 @@ TEST(BlockingDequeueUnitTest, drainTo_is_elements_moved) { size_t capacity = 10; PIDeque refDeque; for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); - PIBlockingDequeue blockingDequeue(refDeque); + PIBlockingQueue blockingDequeue(refDeque); PIDeque deque; blockingDequeue.drainTo(deque); ASSERT_EQ(blockingDequeue.size(), 0); @@ -249,7 +249,7 @@ TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_size_when_all_moved) { size_t capacity = 10; PIDeque refDeque; for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); - PIBlockingDequeue blockingDequeue(refDeque); + PIBlockingQueue blockingDequeue(refDeque); PIDeque deque; ASSERT_EQ(blockingDequeue.drainTo(deque), refDeque.size()); } @@ -258,7 +258,7 @@ TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) { size_t capacity = 10; PIDeque refDeque; for (size_t i = 0; i < capacity / 2; ++i) refDeque.push_back(i * 10); - PIBlockingDequeue blockingDequeue(refDeque); + PIBlockingQueue blockingDequeue(refDeque); PIDeque deque; ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1); }