BlockingDequeue put and offer logic && documentation for concurrent

git-svn-id: svn://db.shs.com.ru/pip@869 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
5 changed files with 143 additions and 3 deletions

View File

@@ -15,18 +15,34 @@
template <typename T>
class PIBlockingDequeue: private PIDeque<T> {
public:
/**
* @brief Constructor
*/
explicit inline PIBlockingDequeue(size_t capacity = SIZE_MAX,
PIConditionVariable* cond_var_add = new PIConditionVariable(),
PIConditionVariable* cond_var_rem = new PIConditionVariable())
: cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { }
/**
* @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue.
*/
explicit inline PIBlockingDequeue(const PIDeque<T>& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
mutex.lock();
max_size = SIZE_MAX;
PIDeque<T>::append(other);
mutex.unlock();
}
/**
* @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements.
*/
inline PIBlockingDequeue(PIBlockingDequeue<T> & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
other.mutex.lock();
mutex.lock();
max_size = other.max_size;
PIDeque<T>::append(static_cast<PIDeque<T>&>(other));
mutex.unlock();
other.mutex.unlock();
}
virtual ~PIBlockingDequeue() {
@@ -37,8 +53,6 @@ public:
/**
* @brief Inserts the specified element into this queue, waiting if necessary for space to become available.
*
* @todo write tests
*
* @param v the element to add
*/
virtual void put(const T & v) {
@@ -68,6 +82,23 @@ public:
return true;
}
/**
* @brief Inserts the specified element into this queue, waiting up to the specified wait time if necessary for
* space to become available.
*
* @param v the element to add
* @param timeoutMs how long to wait before giving up, in milliseconds
* @return true if successful, or false if the specified waiting time elapses before space is available
*/
virtual bool offer(const T & v, int timeoutMs) {
mutex.lock();
bool isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque<T>::size() < max_size; } );
if (isOk) PIDeque<T>::push_back(v);
mutex.unlock();
if (isOk) cond_var_add->notifyOne();
return isOk;
}
/**
* @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
*
@@ -101,6 +132,12 @@ public:
return t;
}
/**
* @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource
* constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue.
*
* @return the capacity
*/
virtual size_t capacity() {
size_t c;
mutex.lock();
@@ -122,6 +159,9 @@ public:
return c;
}
/**
* @brief Returns the number of elements in this collection.
*/
virtual size_t size() {
mutex.lock();
size_t s = PIDeque<T>::size();