236 lines
9.7 KiB
C++
236 lines
9.7 KiB
C++
//! \~\file piblockingqueue.h
|
|
//! \~\ingroup Thread
|
|
//! \~\brief
|
|
//! \~english Blocking queue template
|
|
//! \~russian Шаблон блокирующей очереди
|
|
/*
|
|
PIP - Platform Independent Primitives
|
|
Blocking queue template
|
|
Stephan Fomenko
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Lesser General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#ifndef PIBLOCKINGQUEUE_H
|
|
#define PIBLOCKINGQUEUE_H
|
|
|
|
#include "piconditionvar.h"
|
|
#include "piqueue.h"
|
|
|
|
//! \~\ingroup Thread
|
|
//! \~\brief
|
|
//! \~english Thread-safe queue that supports blocking operations - waits for space when storing and waits for element when retrieving.
|
|
//! \~russian Потокобезопасная очередь с поддержкой блокирующих операций - ожидает место при добавлении и ожидает элемент при получении.
|
|
template<typename T>
|
|
class PIBlockingQueue: private PIQueue<T> {
|
|
public:
|
|
//! \~\brief
|
|
//! \~english Constructs queue with capacity \a capacity.
|
|
//! \~russian Создает очередь с емкостью \a capacity.
|
|
//! \~\details
|
|
//! \~english Passed condition variables become owned by the queue and are deleted with it.
|
|
//! \~russian Переданные переменные условия переходят во владение очереди и удаляются вместе с ней.
|
|
explicit inline PIBlockingQueue(size_t capacity = SIZE_MAX,
|
|
PIConditionVariable * cond_var_add = new PIConditionVariable(),
|
|
PIConditionVariable * cond_var_rem = new PIConditionVariable())
|
|
: cond_var_add(cond_var_add)
|
|
, cond_var_rem(cond_var_rem)
|
|
, max_size(capacity) {}
|
|
|
|
//! \~english Constructs queue from snapshot of \a other without synchronizing access to the source deque.
|
|
//! \~russian Создает очередь из снимка \a other без синхронизации доступа к исходной очереди.
|
|
explicit inline PIBlockingQueue(const PIDeque<T> & other)
|
|
: cond_var_add(new PIConditionVariable())
|
|
, cond_var_rem(new PIConditionVariable()) {
|
|
mutex.lock();
|
|
max_size = SIZE_MAX;
|
|
PIDeque<T>::append(other);
|
|
mutex.unlock();
|
|
}
|
|
|
|
//! \~english Constructs queue by copying another blocking queue while locking both queues.
|
|
//! \~russian Создает очередь копированием другой блокирующей очереди с блокировкой обеих очередей.
|
|
inline PIBlockingQueue(PIBlockingQueue<T> & other): cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
|
|
other.mutex.lock();
|
|
mutex.lock();
|
|
max_size = other.max_size;
|
|
PIDeque<T>::append(static_cast<PIDeque<T> &>(other));
|
|
mutex.unlock();
|
|
other.mutex.unlock();
|
|
}
|
|
|
|
//! \~english Destroys queue and owned condition variables.
|
|
//! \~russian Уничтожает очередь и принадлежащие ей переменные условия.
|
|
~PIBlockingQueue() {
|
|
delete cond_var_add;
|
|
delete cond_var_rem;
|
|
}
|
|
|
|
|
|
//! \~english Appends \a v, waiting indefinitely until space becomes available when the queue is full.
|
|
//! \~russian Добавляет \a v в конец, ожидая без ограничения по времени появления свободного места при заполненной очереди.
|
|
PIBlockingQueue<T> & put(const T & v) {
|
|
mutex.lock();
|
|
cond_var_rem->wait(mutex, [&]() { return PIDeque<T>::size() < max_size; });
|
|
PIDeque<T>::push_back(v);
|
|
mutex.unlock();
|
|
cond_var_add->notifyOne();
|
|
return *this;
|
|
}
|
|
|
|
//! \~english Alias for \a put().
|
|
//! \~russian Псевдоним для \a put().
|
|
PIBlockingQueue<T> & enqueue(const T & v) { return put(v); }
|
|
|
|
//! \~\brief
|
|
//! \~english Tries to append \a v.
|
|
//! \~russian Пытается добавить \a v в конец очереди.
|
|
//! \~\details
|
|
//! \~english
|
|
//! With null \a timeout this method checks capacity once and returns immediately.
|
|
//! \~russian
|
|
//! При пустом \a timeout метод однократно проверяет емкость и возвращается сразу.
|
|
bool offer(const T & v, PISystemTime timeout = {}) {
|
|
bool isOk;
|
|
mutex.lock();
|
|
if (timeout.isNull())
|
|
isOk = PIDeque<T>::size() < max_size;
|
|
else
|
|
isOk = cond_var_rem->waitFor(mutex, timeout, [&]() { return PIDeque<T>::size() < max_size; });
|
|
if (isOk) PIDeque<T>::push_back(v);
|
|
mutex.unlock();
|
|
if (isOk) cond_var_add->notifyOne();
|
|
return isOk;
|
|
}
|
|
|
|
|
|
//! \~english Removes and returns the front element, waiting indefinitely until one becomes available.
|
|
//! \~russian Удаляет и возвращает элемент из начала очереди, ожидая его появления без ограничения по времени.
|
|
T take() {
|
|
T t;
|
|
mutex.lock();
|
|
cond_var_add->wait(mutex, [&]() { return !PIDeque<T>::isEmpty(); });
|
|
t = T(PIDeque<T>::take_front());
|
|
mutex.unlock();
|
|
cond_var_rem->notifyOne();
|
|
return t;
|
|
}
|
|
|
|
//! \~english Alias for \a take().
|
|
//! \~russian Псевдоним для \a take().
|
|
T dequeue() { return take(); }
|
|
|
|
|
|
//! \~\brief
|
|
//! \~english Tries to remove and return the front element.
|
|
//! \~russian Пытается удалить и вернуть элемент из начала очереди.
|
|
//! \~\details
|
|
//! \~english
|
|
//! With null \a timeout this method checks once and returns \a defaultVal immediately if the queue is empty.
|
|
//! If \a isOk is not null, it is set to \c true on successful retrieval.
|
|
//! \~russian
|
|
//! При пустом \a timeout метод проверяет очередь один раз и сразу возвращает \a defaultVal, если очередь пуста.
|
|
//! \Если \a isOk не равен null, он получает значение \c true при успешном извлечении.
|
|
T poll(PISystemTime timeout = {}, const T & defaultVal = T(), bool * isOk = nullptr) {
|
|
T t = defaultVal;
|
|
bool isNotEmpty;
|
|
mutex.lock();
|
|
if (timeout.isNull())
|
|
isNotEmpty = !PIDeque<T>::isEmpty();
|
|
else
|
|
isNotEmpty = cond_var_add->waitFor(mutex, timeout, [&]() { return !PIDeque<T>::isEmpty(); });
|
|
if (isNotEmpty) t = PIDeque<T>::take_front();
|
|
mutex.unlock();
|
|
if (isNotEmpty) cond_var_rem->notifyOne();
|
|
if (isOk) *isOk = isNotEmpty;
|
|
return t;
|
|
}
|
|
|
|
|
|
//! \~\brief
|
|
//! \~english Returns configured capacity limit.
|
|
//! \~russian Возвращает настроенный предел емкости.
|
|
//! \~\details
|
|
//! \~english
|
|
//! For the default unbounded queue this value is \c SIZE_MAX.
|
|
//! \~russian
|
|
//! Для очереди без ограничения по умолчанию это значение равно \c SIZE_MAX.
|
|
size_t capacity() {
|
|
size_t c;
|
|
mutex.lock();
|
|
c = max_size;
|
|
mutex.unlock();
|
|
return c;
|
|
}
|
|
|
|
//! \~english Returns how many more elements can be inserted without blocking at the moment of the call.
|
|
//! \~russian Возвращает, сколько элементов еще можно вставить без блокировки в момент вызова.
|
|
size_t remainingCapacity() {
|
|
mutex.lock();
|
|
size_t c = max_size - PIDeque<T>::size();
|
|
mutex.unlock();
|
|
return c;
|
|
}
|
|
|
|
//! \~english Returns current number of queued elements.
|
|
//! \~russian Возвращает текущее количество элементов в очереди.
|
|
size_t size() {
|
|
mutex.lock();
|
|
size_t s = PIDeque<T>::size();
|
|
mutex.unlock();
|
|
return s;
|
|
}
|
|
|
|
|
|
//! \~english Moves up to \a maxCount currently available elements into deque \a other without waiting.
|
|
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в деку \a other без ожидания.
|
|
size_t drainTo(PIDeque<T> & other, size_t maxCount = SIZE_MAX) {
|
|
mutex.lock();
|
|
size_t count = ((maxCount > PIDeque<T>::size()) ? PIDeque<T>::size() : maxCount);
|
|
for (size_t i = 0; i < count; ++i)
|
|
other.push_back(PIDeque<T>::take_front());
|
|
mutex.unlock();
|
|
return count;
|
|
}
|
|
|
|
//! \~\brief
|
|
//! \~english Moves up to \a maxCount currently available elements into blocking queue \a other without waiting.
|
|
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в блокирующую очередь \a other без ожидания.
|
|
//! \~\details
|
|
//! \~english
|
|
//! The actual count is also limited by the remaining capacity of \a other.
|
|
//! \~russian
|
|
//! Фактическое количество также ограничено оставшейся емкостью \a other.
|
|
size_t drainTo(PIBlockingQueue<T> & other, size_t maxCount = SIZE_MAX) {
|
|
mutex.lock();
|
|
other.mutex.lock();
|
|
size_t count = maxCount > PIDeque<T>::size() ? PIDeque<T>::size() : maxCount;
|
|
size_t otherRemainingCapacity = other.max_size - static_cast<PIDeque<T>>(other).size();
|
|
if (count > otherRemainingCapacity) count = otherRemainingCapacity;
|
|
for (size_t i = 0; i < count; ++i)
|
|
other.push_back(PIDeque<T>::take_front());
|
|
other.mutex.unlock();
|
|
mutex.unlock();
|
|
return count;
|
|
}
|
|
|
|
private:
|
|
PIMutex mutex;
|
|
PIConditionVariable *cond_var_add, *cond_var_rem;
|
|
size_t max_size;
|
|
};
|
|
|
|
|
|
#endif // PIBLOCKINGQUEUE_H
|