From 46d93c6c9f3aa0242e16784fca0fcce61c822e5f Mon Sep 17 00:00:00 2001 From: Stepan Fomenko Date: Fri, 7 Aug 2020 19:12:09 +0300 Subject: [PATCH] Improved PIBlockingDeque behaviour and unit tests for put, offer, take methods - Methods put, offer, take begins working with move and copy semantics - Mocking queue condition variables with GMock in Unit tests - Rewrite part of unit tests --- lib/main/thread/piblockingdequeue.h | 20 +- tests/concurrent/BlockingDequeueUnitTest.cpp | 340 ++++++++++++++----- 2 files changed, 275 insertions(+), 85 deletions(-) diff --git a/lib/main/thread/piblockingdequeue.h b/lib/main/thread/piblockingdequeue.h index 4a82ae14..c904e9fb 100644 --- a/lib/main/thread/piblockingdequeue.h +++ b/lib/main/thread/piblockingdequeue.h @@ -70,10 +70,11 @@ public: * * @param v the element to add */ - void put(T && v) { + template + void put(Type && v) { mutex.lock(); cond_var_rem->wait(mutex, [&]() { return data_queue.size() < max_size; }); - data_queue.push_back(std::forward(v)); + data_queue.push_back(std::forward(v)); mutex.unlock(); cond_var_add->notifyOne(); } @@ -106,10 +107,11 @@ public: * @param timeoutMs how long to wait before giving up, in milliseconds * @return true if successful, or false if the specified waiting time elapses before space is available */ - bool offer(T && v, int timeoutMs) { + template + bool offer(Type && v, int timeoutMs) { mutex.lock(); bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return data_queue.size() < max_size; } ); - if (isOk) data_queue.push_back(std::forward(v)); + if (isOk) data_queue.push_back(std::forward(v)); mutex.unlock(); if (isOk) cond_var_add->notifyOne(); return isOk; @@ -140,7 +142,8 @@ public: * return value is retrieved value * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available */ - T poll(int timeoutMs, T && defaultVal = T(), bool * isOk = nullptr) { + template + T poll(int timeoutMs, Type && defaultVal = Type(), bool * isOk = nullptr) { mutex.lock(); bool isNotEmpty = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return data_queue.size() != 0; }); T t; @@ -148,7 +151,7 @@ public: t = std::move(data_queue.front()); data_queue.pop_front(); } else { - t = std::move(defaultVal); + t = std::forward(defaultVal); } mutex.unlock(); if (isNotEmpty) cond_var_rem->notifyOne(); @@ -165,7 +168,8 @@ public: * return value is retrieved value * @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available */ - T poll(T && defaultVal = T(), bool * isOk = nullptr) { + template + T poll(Type && defaultVal = Type(), bool * isOk = nullptr) { T t; mutex.lock(); bool isNotEmpty = data_queue.size() != 0; @@ -173,7 +177,7 @@ public: t = std::move(data_queue.front()); data_queue.pop_front(); } else { - t = std::move(defaultVal); + t = std::forward(defaultVal); } mutex.unlock(); if (isNotEmpty) cond_var_rem->notifyOne(); diff --git a/tests/concurrent/BlockingDequeueUnitTest.cpp b/tests/concurrent/BlockingDequeueUnitTest.cpp index ac8e1d2b..f5a51332 100644 --- a/tests/concurrent/BlockingDequeueUnitTest.cpp +++ b/tests/concurrent/BlockingDequeueUnitTest.cpp @@ -3,130 +3,315 @@ #include "testutil.h" #include "piblockingdequeue.h" +using ::testing::_; using ::testing::Return; using ::testing::Eq; +using ::testing::Ne; using ::testing::Matcher; +using ::testing::Expectation; +using ::testing::Sequence; +using ::testing::NiceMock; -class MockConditionVar: public PIConditionVariable { +class MockConditionVar { public: bool isWaitCalled = false; bool isWaitForCalled = false; bool isTrueCondition = false; int timeout = -1; - void wait(PIMutex& lk) override { - isWaitCalled = true; - } + MOCK_METHOD1(wait, void(PIMutex&)); + MOCK_METHOD2(wait, void(PIMutex&, const std::function&)); + MOCK_METHOD2(waitFor, bool(PIMutex&, int)); + MOCK_METHOD3(waitFor, bool(PIMutex&, int, const std::function&)); + MOCK_METHOD0(notifyOne, void()); +}; - void wait(PIMutex& lk, const std::function& condition) override { - isWaitCalled = true; - lk.lock(); - isTrueCondition = condition(); - lk.unlock(); - } +struct QueueElement { + bool is_empty; + int value; + int copy_count; - bool waitFor(PIMutex& lk, int timeoutMs) override { - isWaitForCalled = true; - timeout = timeoutMs; - return false; - } + QueueElement(): is_empty(true), value(0), copy_count(0) { } + explicit QueueElement(int value): is_empty(false), value(value), copy_count(0) { } - bool waitFor(PIMutex& lk, int timeoutMs, const std::function& condition) override { - isWaitForCalled = true; - lk.lock(); - isTrueCondition = condition(); - timeout = timeoutMs; - lk.unlock(); - return isTrueCondition; - } + QueueElement(const QueueElement& other) { + this->is_empty = other.is_empty; + this->value = other.value; + this->copy_count = 0; + const_cast(other.copy_count)++; + } + QueueElement(QueueElement&& other) noexcept : QueueElement() { + std::swap(is_empty, other.is_empty); + std::swap(value, other.value); + std::swap(copy_count, other.copy_count); + } + + bool operator==(const QueueElement &rhs) const { + return is_empty == rhs.is_empty && + value == rhs.value; + } + + bool operator!=(const QueueElement &rhs) const { + return !(rhs == *this); + } + + friend std::ostream& operator<<(std::ostream& os, const QueueElement& el) { + return os << "{ is_empty:" << el.is_empty << ", value:" << el.value << ", copy_count:" << el.copy_count << " }"; + } }; template -class MockDeque { +class MockDequeBase { public: - MOCK_METHOD1_T(push_back, void(T&&)); + MOCK_METHOD1_T(push_back_rval, void(T)); MOCK_METHOD1_T(push_back, void(const T&)); MOCK_METHOD0(size, size_t()); MOCK_METHOD0_T(front, T()); MOCK_METHOD0(pop_front, void()); + + void push_back(T&& t) { + push_back_rval(t); + } }; template -class PIBlockingDequeuePrepare: public PIBlockingDequeue { -public: - typedef PIBlockingDequeue SuperClass; +class MockDeque: public NiceMock> {}; - PIBlockingDequeuePrepare(size_t capacity = SIZE_MAX): SuperClass(capacity) { } +class PIBlockingDequeuePrepare: public PIBlockingDequeue> { +public: + typedef PIBlockingDequeue> SuperClass; + + explicit PIBlockingDequeuePrepare(size_t capacity = SIZE_MAX): SuperClass(capacity) { } template explicit PIBlockingDequeuePrepare(const Iterable& other): SuperClass(other) { } MockConditionVar* getCondVarAdd() { return this->cond_var_add; } MockConditionVar* getCondVarRem() { return this->cond_var_rem; } - MockDeque& getQueue() { return this->data_queue; } + MockDeque& getQueue() { return this->data_queue; } }; -TEST(BlockingDequeueUnitTest, put_is_block_when_capacity_reach) { - size_t capacity = 0; - PIBlockingDequeuePrepare dequeue(capacity); - dequeue.put(11); - ASSERT_TRUE(dequeue.getCondVarRem()->isWaitCalled); - ASSERT_FALSE(dequeue.getCondVarRem()->isTrueCondition); +class BlockingDequeueUnitTest: public ::testing::Test { +public: + int timeout = 100; + size_t capacity; + PIBlockingDequeuePrepare dequeue; + QueueElement element; + + BlockingDequeueUnitTest(): capacity(1), dequeue(capacity), element(11) {} + + void offer2_is_wait_predicate(bool isCapacityReach); + void put_is_wait_predicate(bool isCapacityReach); + void take_is_wait_predicate(bool isEmpty); +}; + +void BlockingDequeueUnitTest::put_is_wait_predicate(bool isCapacityReach) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarRem(), wait(_, _)) + .WillOnce([&](PIMutex& m, const std::function& predicate){ conditionVarPredicate = predicate; }); + dequeue.put(element); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isCapacityReach ? capacity : capacity - 1)); + ASSERT_EQ(conditionVarPredicate(), !isCapacityReach); } -TEST(BlockingDequeueUnitTest, offer2_timedout_is_false_when_capacity_reach) { - size_t capacity = 0; - int timeout = 11; - PIBlockingDequeuePrepare dequeue(capacity); - ASSERT_FALSE(dequeue.offer(11, timeout)); +TEST_F(BlockingDequeueUnitTest, put_is_wait_predicate_true) { + put_is_wait_predicate(false); } -TEST(BlockingDequeueUnitTest, offer2_timedout_is_block_when_capacity_reach) { - size_t capacity = 0; - int timeout = 11; - PIBlockingDequeuePrepare dequeue(capacity); - dequeue.offer(11, timeout); - EXPECT_TRUE(dequeue.getCondVarRem()->isWaitForCalled); - EXPECT_EQ(timeout, dequeue.getCondVarRem()->timeout); - ASSERT_FALSE(dequeue.getCondVarRem()->isTrueCondition); +TEST_F(BlockingDequeueUnitTest, put_is_wait_predicate_false_when_capacity_reach) { + put_is_wait_predicate(true); } -TEST(BlockingDequeueUnitTest, offer1_is_true) { - size_t capacity = 1; - PIBlockingDequeuePrepare dequeue(capacity); - EXPECT_CALL(dequeue.getQueue(), size()) - .WillOnce(Return(capacity - 1)); - ASSERT_TRUE(dequeue.offer(10)); +TEST_F(BlockingDequeueUnitTest, put_is_insert_by_copy) { + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + dequeue.put(element); } -TEST(BlockingDequeueUnitTest, offer1_is_pop) { - size_t capacity = 1; - int val = 10; - PIBlockingDequeuePrepare dequeue(capacity); - EXPECT_CALL(dequeue.getQueue(), size()) - .WillRepeatedly(Return(capacity - 1)); - EXPECT_CALL(dequeue.getQueue(), push_back(Matcher( Eq(val)) )).Times(1); - dequeue.offer(val); +TEST_F(BlockingDequeueUnitTest, put_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + dequeue.put(std::move(copyElement)); } -TEST(BlockingDequeueUnitTest, offer1_is_false_when_capacity_reach) { - size_t capacity = 1; - PIBlockingDequeuePrepare dequeue(capacity); - EXPECT_CALL(dequeue.getQueue(), size()) - .WillOnce(Return(capacity + 1)); - ASSERT_FALSE(dequeue.offer(10)); +TEST_F(BlockingDequeueUnitTest, put_is_notify_about_insert) { + EXPECT_CALL(*dequeue.getCondVarAdd(), notifyOne) + .WillOnce(Return()); + dequeue.put(element); } -TEST(BlockingDequeueUnitTest, offer1_is_not_pop_when_capacity_reach) { - size_t capacity = 1; - PIBlockingDequeuePrepare dequeue(capacity); - EXPECT_CALL(dequeue.getQueue(), size()) - .WillRepeatedly(Return(capacity + 1)); - EXPECT_CALL(dequeue.getQueue(), front()).Times(0); - EXPECT_CALL(dequeue.getQueue(), pop_front()).Times(0); - dequeue.offer(10); +TEST_F(BlockingDequeueUnitTest, offer1_is_insert_by_copy) { + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + dequeue.offer(element); } +TEST_F(BlockingDequeueUnitTest, offer1_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + dequeue.offer(std::move(copyElement)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_not_insert_when_capacity_reach) { + EXPECT_CALL(dequeue.getQueue(), push_back(_)) + .Times(0); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + dequeue.offer(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_true_when_insert) { + ON_CALL(dequeue.getQueue(), push_back(_)) + .WillByDefault(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + ASSERT_TRUE(dequeue.offer(element)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_false_when_capacity_reach) { + ON_CALL(dequeue.getQueue(), push_back(_)) + .WillByDefault(Return()); + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + ASSERT_FALSE(dequeue.offer(element)); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_notify_about_insert) { + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity - 1)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notifyOne) + .WillOnce(Return()); + dequeue.offer(element); +} + +TEST_F(BlockingDequeueUnitTest, offer1_is_not_notify_about_insert_when_capacity_reach) { + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(capacity)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notifyOne) + .Times(0); + dequeue.offer(element); +} + +void BlockingDequeueUnitTest::offer2_is_wait_predicate(bool isCapacityReach) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarRem(), waitFor(_, Eq(timeout), _)) + .WillOnce([&](PIMutex& m, int timeout, const std::function& predicate) { + conditionVarPredicate = predicate; + return isCapacityReach; + }); + dequeue.offer(element, timeout); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isCapacityReach ? capacity : capacity - 1)); + ASSERT_EQ(conditionVarPredicate(), !isCapacityReach); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_wait_predicate_true) { + offer2_is_wait_predicate(false); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_wait_predicate_false_when_capacity_reach) { + offer2_is_wait_predicate(true); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_insert_by_copy) { + EXPECT_CALL(*dequeue.getCondVarRem(), waitFor(_, Eq(timeout), _)) + .WillOnce(Return(true)); + EXPECT_CALL(dequeue.getQueue(), push_back( Eq(element) )) + .WillOnce(Return()); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_insert_by_move) { + QueueElement copyElement = element; + EXPECT_CALL(*dequeue.getCondVarRem(), waitFor(_, Eq(timeout), _)) + .WillOnce(Return(true)); + EXPECT_CALL(dequeue.getQueue(), push_back_rval( Eq(element) )) + .WillOnce(Return()); + dequeue.offer(std::move(copyElement), timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_not_insert_when_timeout) { + EXPECT_CALL(*dequeue.getCondVarRem(), waitFor(_, Eq(timeout), _)) + .WillOnce(Return(false)); + EXPECT_CALL(dequeue.getQueue(), push_back(_)) + .Times(0); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_true_when_insert) { + ON_CALL(*dequeue.getCondVarRem(), waitFor(_, _, _)) + .WillByDefault(Return(true)); + ASSERT_TRUE(dequeue.offer(element, timeout)); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_false_when_timeout) { + ON_CALL(*dequeue.getCondVarRem(), waitFor(_, _, _)) + .WillByDefault(Return(false)); + ASSERT_FALSE(dequeue.offer(element, timeout)); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_notify_about_insert) { + ON_CALL(*dequeue.getCondVarRem(), waitFor(_, _, _)) + .WillByDefault(Return(true)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notifyOne) + .WillOnce(Return()); + dequeue.offer(element, timeout); +} + +TEST_F(BlockingDequeueUnitTest, offer2_is_not_notify_about_insert_when_timeout) { + ON_CALL(*dequeue.getCondVarRem(), waitFor(_, _, _)) + .WillByDefault(Return(false)); + EXPECT_CALL(*dequeue.getCondVarAdd(), notifyOne) + .Times(0); + dequeue.offer(element, timeout); +} + +void BlockingDequeueUnitTest::take_is_wait_predicate(bool isEmpty) { + std::function conditionVarPredicate; + EXPECT_CALL(*dequeue.getCondVarAdd(), wait(_, _)) + .WillOnce([&](PIMutex& m, const std::function& predicate) { conditionVarPredicate = predicate; }); + dequeue.take(); + + ON_CALL(dequeue.getQueue(), size) + .WillByDefault(Return(isEmpty ? 0 : 1)); + ASSERT_EQ(conditionVarPredicate(), !isEmpty); +} + +TEST_F(BlockingDequeueUnitTest, take_is_wait_predicate_true) { + take_is_wait_predicate(false); +} + +TEST_F(BlockingDequeueUnitTest, take_is_wait_predicate_false_when_queue_empty) { + take_is_wait_predicate(true); +} + +TEST_F(BlockingDequeueUnitTest, take_is_get_and_remove) { + Expectation front = EXPECT_CALL(dequeue.getQueue(), front()) + .WillOnce(Return(element)); + EXPECT_CALL(dequeue.getQueue(), pop_front()) + .After(front) + .WillOnce(Return()); + + QueueElement takenElement = dequeue.take(); + ASSERT_EQ(element, takenElement); +} + +TEST_F(BlockingDequeueUnitTest, take_is_notify_about_remove) { + EXPECT_CALL(*dequeue.getCondVarRem(), notifyOne) + .WillOnce(Return()); + dequeue.take(); +} + +/* // TODO change take_is_block_when_empty to prevent segfault TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) { size_t capacity = 1; @@ -299,3 +484,4 @@ TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) { PIBlockingDequeuePrepare::QueueType deque; ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1); } +*/