//! \~\file piblockingqueue.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Blocking queue template
//! \~russian Шаблон блокирующей очереди
/*
PIP - Platform Independent Primitives
Blocking queue template
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
#ifndef PIBLOCKINGQUEUE_H
#define PIBLOCKINGQUEUE_H
#include "piconditionvar.h"
#include "piqueue.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-safe queue that supports blocking operations - waits for space when storing and waits for element when retrieving.
//! \~russian Потокобезопасная очередь с поддержкой блокирующих операций - ожидает место при добавлении и ожидает элемент при получении.
template
class PIBlockingQueue: private PIQueue {
public:
//! \~\brief
//! \~english Constructs queue with capacity \a capacity.
//! \~russian Создает очередь с емкостью \a capacity.
//! \~\details
//! \~english Passed condition variables become owned by the queue and are deleted with it.
//! \~russian Переданные переменные условия переходят во владение очереди и удаляются вместе с ней.
explicit inline PIBlockingQueue(size_t capacity = SIZE_MAX,
PIConditionVariable * cond_var_add = new PIConditionVariable(),
PIConditionVariable * cond_var_rem = new PIConditionVariable())
: cond_var_add(cond_var_add)
, cond_var_rem(cond_var_rem)
, max_size(capacity) {}
//! \~english Constructs queue from snapshot of \a other without synchronizing access to the source deque.
//! \~russian Создает очередь из снимка \a other без синхронизации доступа к исходной очереди.
explicit inline PIBlockingQueue(const PIDeque & other)
: cond_var_add(new PIConditionVariable())
, cond_var_rem(new PIConditionVariable()) {
mutex.lock();
max_size = SIZE_MAX;
PIDeque::append(other);
mutex.unlock();
}
//! \~english Constructs queue by copying another blocking queue while locking both queues.
//! \~russian Создает очередь копированием другой блокирующей очереди с блокировкой обеих очередей.
inline PIBlockingQueue(PIBlockingQueue & other): cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
other.mutex.lock();
mutex.lock();
max_size = other.max_size;
PIDeque::append(static_cast &>(other));
mutex.unlock();
other.mutex.unlock();
}
//! \~english Destroys queue and owned condition variables.
//! \~russian Уничтожает очередь и принадлежащие ей переменные условия.
~PIBlockingQueue() {
delete cond_var_add;
delete cond_var_rem;
}
//! \~english Appends \a v, waiting indefinitely until space becomes available when the queue is full.
//! \~russian Добавляет \a v в конец, ожидая без ограничения по времени появления свободного места при заполненной очереди.
PIBlockingQueue & put(const T & v) {
mutex.lock();
cond_var_rem->wait(mutex, [&]() { return PIDeque::size() < max_size; });
PIDeque::push_back(v);
mutex.unlock();
cond_var_add->notifyOne();
return *this;
}
//! \~english Alias for \a put().
//! \~russian Псевдоним для \a put().
PIBlockingQueue & enqueue(const T & v) { return put(v); }
//! \~\brief
//! \~english Tries to append \a v.
//! \~russian Пытается добавить \a v в конец очереди.
//! \~\details
//! \~english
//! With null \a timeout this method checks capacity once and returns immediately.
//! \~russian
//! При пустом \a timeout метод однократно проверяет емкость и возвращается сразу.
bool offer(const T & v, PISystemTime timeout = {}) {
bool isOk;
mutex.lock();
if (timeout.isNull())
isOk = PIDeque::size() < max_size;
else
isOk = cond_var_rem->waitFor(mutex, timeout, [&]() { return PIDeque::size() < max_size; });
if (isOk) PIDeque::push_back(v);
mutex.unlock();
if (isOk) cond_var_add->notifyOne();
return isOk;
}
//! \~english Removes and returns the front element, waiting indefinitely until one becomes available.
//! \~russian Удаляет и возвращает элемент из начала очереди, ожидая его появления без ограничения по времени.
T take() {
T t;
mutex.lock();
cond_var_add->wait(mutex, [&]() { return !PIDeque::isEmpty(); });
t = T(PIDeque::take_front());
mutex.unlock();
cond_var_rem->notifyOne();
return t;
}
//! \~english Alias for \a take().
//! \~russian Псевдоним для \a take().
T dequeue() { return take(); }
//! \~\brief
//! \~english Tries to remove and return the front element.
//! \~russian Пытается удалить и вернуть элемент из начала очереди.
//! \~\details
//! \~english
//! With null \a timeout this method checks once and returns \a defaultVal immediately if the queue is empty.
//! If \a isOk is not null, it is set to \c true on successful retrieval.
//! \~russian
//! При пустом \a timeout метод проверяет очередь один раз и сразу возвращает \a defaultVal, если очередь пуста.
//! \Если \a isOk не равен null, он получает значение \c true при успешном извлечении.
T poll(PISystemTime timeout = {}, const T & defaultVal = T(), bool * isOk = nullptr) {
T t = defaultVal;
bool isNotEmpty;
mutex.lock();
if (timeout.isNull())
isNotEmpty = !PIDeque::isEmpty();
else
isNotEmpty = cond_var_add->waitFor(mutex, timeout, [&]() { return !PIDeque::isEmpty(); });
if (isNotEmpty) t = PIDeque::take_front();
mutex.unlock();
if (isNotEmpty) cond_var_rem->notifyOne();
if (isOk) *isOk = isNotEmpty;
return t;
}
//! \~\brief
//! \~english Returns configured capacity limit.
//! \~russian Возвращает настроенный предел емкости.
//! \~\details
//! \~english
//! For the default unbounded queue this value is \c SIZE_MAX.
//! \~russian
//! Для очереди без ограничения по умолчанию это значение равно \c SIZE_MAX.
size_t capacity() {
size_t c;
mutex.lock();
c = max_size;
mutex.unlock();
return c;
}
//! \~english Returns how many more elements can be inserted without blocking at the moment of the call.
//! \~russian Возвращает, сколько элементов еще можно вставить без блокировки в момент вызова.
size_t remainingCapacity() {
mutex.lock();
size_t c = max_size - PIDeque::size();
mutex.unlock();
return c;
}
//! \~english Returns current number of queued elements.
//! \~russian Возвращает текущее количество элементов в очереди.
size_t size() {
mutex.lock();
size_t s = PIDeque::size();
mutex.unlock();
return s;
}
//! \~english Moves up to \a maxCount currently available elements into deque \a other without waiting.
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в деку \a other без ожидания.
size_t drainTo(PIDeque & other, size_t maxCount = SIZE_MAX) {
mutex.lock();
size_t count = ((maxCount > PIDeque::size()) ? PIDeque::size() : maxCount);
for (size_t i = 0; i < count; ++i)
other.push_back(PIDeque::take_front());
mutex.unlock();
return count;
}
//! \~\brief
//! \~english Moves up to \a maxCount currently available elements into blocking queue \a other without waiting.
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в блокирующую очередь \a other без ожидания.
//! \~\details
//! \~english
//! The actual count is also limited by the remaining capacity of \a other.
//! \~russian
//! Фактическое количество также ограничено оставшейся емкостью \a other.
size_t drainTo(PIBlockingQueue & other, size_t maxCount = SIZE_MAX) {
mutex.lock();
other.mutex.lock();
size_t count = maxCount > PIDeque::size() ? PIDeque::size() : maxCount;
size_t otherRemainingCapacity = other.max_size - static_cast>(other).size();
if (count > otherRemainingCapacity) count = otherRemainingCapacity;
for (size_t i = 0; i < count; ++i)
other.push_back(PIDeque::take_front());
other.mutex.unlock();
mutex.unlock();
return count;
}
private:
PIMutex mutex;
PIConditionVariable *cond_var_add, *cond_var_rem;
size_t max_size;
};
#endif // PIBLOCKINGQUEUE_H