Merge branch 'master' into pico_sdk

This commit is contained in:
2026-03-29 12:28:36 +03:00
359 changed files with 56129 additions and 6404 deletions

View File

@@ -1,12 +1,11 @@
/*! \file piblockingqueue.h
* \ingroup Thread
* \~\brief
* \~english Queue with blocking
* \~russian Блокирующая очередь
*/
//! \~\file piblockingqueue.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Blocking queue template
//! \~russian Шаблон блокирующей очереди
/*
PIP - Platform Independent Primitives
Blocking queue template
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
@@ -29,16 +28,19 @@
#include "piconditionvar.h"
#include "piqueue.h"
/**
* \brief A Queue that supports operations that wait for the queue to become non-empty when retrieving an element, and
* wait for space to become available in the queue when storing an element.
*/
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-safe queue that supports blocking operations - waits for space when storing and waits for element when retrieving.
//! \~russian Потокобезопасная очередь с поддержкой блокирующих операций - ожидает место при добавлении и ожидает элемент при получении.
template<typename T>
class PIBlockingQueue: private PIQueue<T> {
public:
/**
* \brief Constructor
*/
//! \~\brief
//! \~english Constructs queue with capacity \a capacity.
//! \~russian Создает очередь с емкостью \a capacity.
//! \~\details
//! \~english Passed condition variables become owned by the queue and are deleted with it.
//! \~russian Переданные переменные условия переходят во владение очереди и удаляются вместе с ней.
explicit inline PIBlockingQueue(size_t capacity = SIZE_MAX,
PIConditionVariable * cond_var_add = new PIConditionVariable(),
PIConditionVariable * cond_var_rem = new PIConditionVariable())
@@ -46,9 +48,8 @@ public:
, cond_var_rem(cond_var_rem)
, max_size(capacity) {}
/**
* \brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue.
*/
//! \~english Constructs queue from snapshot of \a other without synchronizing access to the source deque.
//! \~russian Создает очередь из снимка \a other без синхронизации доступа к исходной очереди.
explicit inline PIBlockingQueue(const PIDeque<T> & other)
: cond_var_add(new PIConditionVariable())
, cond_var_rem(new PIConditionVariable()) {
@@ -58,9 +59,8 @@ public:
mutex.unlock();
}
/**
* \brief Thread-safe copy constructor. Initialize queue with copy of other queue elements.
*/
//! \~english Constructs queue by copying another blocking queue while locking both queues.
//! \~russian Создает очередь копированием другой блокирующей очереди с блокировкой обеих очередей.
inline PIBlockingQueue(PIBlockingQueue<T> & other): cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
other.mutex.lock();
mutex.lock();
@@ -70,16 +70,16 @@ public:
other.mutex.unlock();
}
//! \~english Destroys queue and owned condition variables.
//! \~russian Уничтожает очередь и принадлежащие ей переменные условия.
~PIBlockingQueue() {
delete cond_var_add;
delete cond_var_rem;
}
/**
* \brief Inserts the specified element into this queue, waiting if necessary for space to become available.
*
* @param v the element to add
*/
//! \~english Appends \a v, waiting indefinitely until space becomes available when the queue is full.
//! \~russian Добавляет \a v в конец, ожидая без ограничения по времени появления свободного места при заполненной очереди.
PIBlockingQueue<T> & put(const T & v) {
mutex.lock();
cond_var_rem->wait(mutex, [&]() { return PIDeque<T>::size() < max_size; });
@@ -89,16 +89,18 @@ public:
return *this;
}
//! \~english Alias for \a put().
//! \~russian Псевдоним для \a put().
PIBlockingQueue<T> & enqueue(const T & v) { return put(v); }
/**
* \brief Inserts the specified element at the end of this queue if it is possible to do so immediately without
* exceeding the queue's capacity, returning true upon success and false if this queue is full.
*
* @param v the element to add
* @param timeout the timeout waiting for inserting if que is full, if timeout is null, then returns immediately
* @return true if the element was added to this queue, else false
*/
//! \~\brief
//! \~english Tries to append \a v.
//! \~russian Пытается добавить \a v в конец очереди.
//! \~\details
//! \~english
//! With null \a timeout this method checks capacity once and returns immediately.
//! \~russian
//! При пустом \a timeout метод однократно проверяет емкость и возвращается сразу.
bool offer(const T & v, PISystemTime timeout = {}) {
bool isOk;
mutex.lock();
@@ -112,11 +114,9 @@ public:
return isOk;
}
/**
* \brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
*
* @return the head of this queue
*/
//! \~english Removes and returns the front element, waiting indefinitely until one becomes available.
//! \~russian Удаляет и возвращает элемент из начала очереди, ожидая его появления без ограничения по времени.
T take() {
T t;
mutex.lock();
@@ -127,18 +127,21 @@ public:
return t;
}
//! \~english Alias for \a take().
//! \~russian Псевдоним для \a take().
T dequeue() { return take(); }
/**
* \brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an
* element to become available.
*
* @param timeout how long to wait before giving up
* @param defaultVal value, which returns if the specified waiting time elapses before an element is available
* @param isOk flag, which indicates result of method execution. It will be set to false if timeout, or true if
* return value is retrieved value
* @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available
*/
//! \~\brief
//! \~english Tries to remove and return the front element.
//! \~russian Пытается удалить и вернуть элемент из начала очереди.
//! \~\details
//! \~english
//! With null \a timeout this method checks once and returns \a defaultVal immediately if the queue is empty.
//! If \a isOk is not null, it is set to \c true on successful retrieval.
//! \~russian
//! При пустом \a timeout метод проверяет очередь один раз и сразу возвращает \a defaultVal, если очередь пуста.
//! \Если \a isOk не равен null, он получает значение \c true при успешном извлечении.
T poll(PISystemTime timeout = {}, const T & defaultVal = T(), bool * isOk = nullptr) {
T t = defaultVal;
bool isNotEmpty;
@@ -154,12 +157,15 @@ public:
return t;
}
/**
* \brief Returns the number of elements that this queue can ideally (in the absence of memory or resource
* constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue.
*
* @return the capacity
*/
//! \~\brief
//! \~english Returns configured capacity limit.
//! \~russian Возвращает настроенный предел емкости.
//! \~\details
//! \~english
//! For the default unbounded queue this value is \c SIZE_MAX.
//! \~russian
//! Для очереди без ограничения по умолчанию это значение равно \c SIZE_MAX.
size_t capacity() {
size_t c;
mutex.lock();
@@ -168,12 +174,8 @@ public:
return c;
}
/**
* \brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource
* constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue.
*
* @return the remaining capacity
*/
//! \~english Returns how many more elements can be inserted without blocking at the moment of the call.
//! \~russian Возвращает, сколько элементов еще можно вставить без блокировки в момент вызова.
size_t remainingCapacity() {
mutex.lock();
size_t c = max_size - PIDeque<T>::size();
@@ -181,9 +183,8 @@ public:
return c;
}
/**
* \brief Returns the number of elements in this collection.
*/
//! \~english Returns current number of queued elements.
//! \~russian Возвращает текущее количество элементов в очереди.
size_t size() {
mutex.lock();
size_t s = PIDeque<T>::size();
@@ -191,9 +192,9 @@ public:
return s;
}
/**
* \brief Removes all available elements from this queue and adds them to other given queue.
*/
//! \~english Moves up to \a maxCount currently available elements into deque \a other without waiting.
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в деку \a other без ожидания.
size_t drainTo(PIDeque<T> & other, size_t maxCount = SIZE_MAX) {
mutex.lock();
size_t count = ((maxCount > PIDeque<T>::size()) ? PIDeque<T>::size() : maxCount);
@@ -203,9 +204,14 @@ public:
return count;
}
/**
* \brief Removes all available elements from this queue and adds them to other given queue.
*/
//! \~\brief
//! \~english Moves up to \a maxCount currently available elements into blocking queue \a other without waiting.
//! \~russian Перемещает до \a maxCount доступных в данный момент элементов в блокирующую очередь \a other без ожидания.
//! \~\details
//! \~english
//! The actual count is also limited by the remaining capacity of \a other.
//! \~russian
//! Фактическое количество также ограничено оставшейся емкостью \a other.
size_t drainTo(PIBlockingQueue<T> & other, size_t maxCount = SIZE_MAX) {
mutex.lock();
other.mutex.lock();

View File

@@ -87,11 +87,9 @@ void PIConditionVariable::wait(PIMutex & lk) {
}
void PIConditionVariable::wait(PIMutex & lk, const std::function<bool()> & condition) {
bool isCondition;
void PIConditionVariable::wait(PIMutex & lk, std::function<bool()> condition) {
while (true) {
isCondition = condition();
if (isCondition) break;
if (condition()) break;
#if defined(WINDOWS)
SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE);
#elif defined(FREERTOS)
@@ -123,8 +121,7 @@ bool PIConditionVariable::waitFor(PIMutex & lk, PISystemTime timeout) {
}
bool PIConditionVariable::waitFor(PIMutex & lk, PISystemTime timeout, const std::function<bool()> & condition) {
bool isCondition;
bool PIConditionVariable::waitFor(PIMutex & lk, PISystemTime timeout, std::function<bool()> condition) {
#if defined(WINDOWS) || defined(FREERTOS)
PITimeMeasurer measurer;
#else
@@ -136,8 +133,7 @@ bool PIConditionVariable::waitFor(PIMutex & lk, PISystemTime timeout, const std:
xEventGroupClearBits(PRIVATE->nativeHandle, 1);
#endif
while (true) {
isCondition = condition();
if (isCondition) break;
if (condition()) break;
bool isTimeout;
#if defined(WINDOWS)
isTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle,

View File

@@ -1,26 +1,25 @@
/*! \file piconditionvar.h
* \ingroup Thread
* \~\brief
* \~english Conditional variable
* \~russian Conditional variable
*/
//! \~\file piconditionvar.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Condition variable for waiting and notification between threads
//! \~russian Переменная условия для ожидания и уведомления между потоками
/*
PIP - Platform Independent Primitives
PIP - Platform Independent Primitives
Condition variable for waiting and notification between threads
Stephan Fomenko
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PICONDITIONVAR_H
@@ -31,92 +30,146 @@
#ifndef PIP_NO_THREADS
/**
* \brief A condition variable is an object able to block the calling thread until notified to resume.
*
* It uses a PIMutex to lock the thread when one of its wait functions is called. The thread remains
* blocked until woken up by another thread that calls a notification function on the same PIConditionVariable object.
*/
//! \~\ingroup Thread
//! \~\brief
//! \~english Condition variable used together with external %PIMutex.
//! \~russian Переменная условия, используемая вместе с внешним %PIMutex.
class PIP_EXPORT PIConditionVariable {
public:
NO_COPY_CLASS(PIConditionVariable);
//! \~english Constructs condition variable.
//! \~russian Создает переменную условия.
explicit PIConditionVariable();
//! \~english Destroys condition variable.
//! \~russian Уничтожает переменную условия.
virtual ~PIConditionVariable();
/**
* \brief Unblocks one of the threads currently waiting for this condition. If no threads are waiting, the function
* does nothing. If more than one, it is unspecified which of the threads is selected.
*/
//! \~english Wakes one waiting thread, if any.
//! \~russian Пробуждает один ожидающий поток, если он есть.
void notifyOne();
/**
* \brief Unblocks all threads currently waiting for this condition. If no threads are waiting, the function does
* nothing.
*/
//! \~english Wakes all threads currently waiting on this variable.
//! \~russian Пробуждает все потоки, ожидающие на этой переменной.
void notifyAll();
/**
* \brief see wait(PIMutex &, const std::function<bool()>&)
*/
//! \~english Waits until notification, temporarily releasing \a lk while blocked.
//! \~russian Ожидает уведомления, временно освобождая \a lk на время блокировки.
virtual void wait(PIMutex & lk);
/**
* \brief Wait until notified
*
* The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
* until notified.
*
* At the moment of blocking the thread, the function automatically calls lk.unlock() (PIMutex::unlock()),
* allowing other locked threads to continue.
*
* Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIMutex::lock()),
* leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex
* locking may block again the thread before returning).
*
* Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
* member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
* being called. Therefore, users of this function shall ensure their condition for resumption is met.
*
* If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
* the thread when it becomes true (which is specially useful to check against spurious wake-up calls).
*
* @param lk lock object used by method wait for data protection
* @param condition A callable object or function that takes no arguments and returns a value that can be evaluated
* as a bool. This is called repeatedly until it evaluates to true.
*/
virtual void wait(PIMutex & lk, const std::function<bool()> & condition);
//! \~\brief
//! \~english Waits until \a condition becomes true, rechecking it after each wakeup while \a lk is locked.
//! \~russian Ожидает, пока \a condition не станет истинным, повторно проверяя его после каждого пробуждения при заблокированном \a lk.
//! \~\details
//! \~english
//! The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
//! until notified.
//!
//! At the moment of blocking the thread, the function automatically calls lk.unlock() (PIMutex::unlock()),
//! allowing other locked threads to continue.
//!
//! Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIMutex::lock()),
//! leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex
//! locking may block again the thread before returning).
//!
//! Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
//! member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
//! being called. Therefore, users of this function shall ensure their condition for resumption is met.
//!
//! If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
//! the thread when it becomes true (which is specially useful to check against spurious wake-up calls).
//!
//! \param lk lock object used by method wait for data protection
//! \param condition A callable object or function that takes no arguments and returns a value that can be evaluated
//! as a bool. This is called repeatedly until it evaluates to true.
//!
//! \~russian
//! Выполнение текущего потока (который должен быть заблокирован с помощью \a lk методом PIMutex::lock())
//! приостанавливается до получения уведомления.
//!
//! В момент блокировки потока функция автоматически вызывает lk.unlock() (PIMutex::unlock()),
//! позволяя другим заблокированным потокам продолжить выполнение.
//!
//! После получения уведомления (явного, от другого потока) функция разблокируется и вызывает
//! lk.lock() (PIMutex::lock()), возвращая lk в то же состояние, в котором он находился при вызове
//! функции. Затем функция завершается (обратите внимание, что эта последняя блокировка мьютекса
//! может снова заблокировать поток перед возвратом).
//!
//! Обычно функция пробуждается при вызове в другом потоке либо notifyOne(), либо notifyAll().
//! Однако некоторые реализации могут создавать ложные пробуждения без вызова любой из этих функций.
//! Поэтому пользователи этой функции должны убедиться, что условие для возобновления выполнено.
//!
//! Если указано condition, функция блокируется только если condition возвращает false,
//! а уведомления могут разблокировать поток только когда оно станет true (что особенно полезно
//! для проверки ложных пробуждений).
//!
//! \param lk объект блокировки, используемый методом wait для защиты данных
//! \param condition вызываемый объект или функция, не принимающая аргументов и возвращающая значение, которое может быть оценено как
//! bool. Вызывается повторно, пока не примет значение true
//!
virtual void wait(PIMutex & lk, std::function<bool ()> condition);
/**
* \brief see waitFor(PIMutex &, int, const std::function<bool()>&)
*/
//! \~english Waits for at most \a timeout and returns \c true if awakened before it expires.
//! \~russian Ожидает не дольше \a timeout и возвращает \c true, если пробуждение произошло до его истечения.
virtual bool waitFor(PIMutex & lk, PISystemTime timeout);
/**
* \brief Wait for timeout or until notified
*
* The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
* during timeout, or until notified (if the latter happens first).
*
* At the moment of blocking the thread, the function automatically calls lk.lock() (PIMutex::lock()), allowing
* other locked threads to continue.
*
* Once notified or once timeout has passed, the function unblocks and calls lk.unlock() (PIMutex::unlock()),
* leaving lk in the same state as when the function was called. Then the function returns (notice that this last
* mutex locking may block again the thread before returning).
*
* Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
* member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
* being called. Therefore, users of this function shall ensure their condition for resumption is met.
*
* If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
* the thread when it becomes true (which is especially useful to check against spurious wake-up calls).
*
* @param lk lock object used by method wait for data protection
* @param condition A callable object or function that takes no arguments and returns a value that can be evaluated
* as a bool. This is called repeatedly until it evaluates to true.
* @return false if timeout reached or true if wakeup condition is true
*/
virtual bool waitFor(PIMutex & lk, PISystemTime timeout, const std::function<bool()> & condition);
//! \brief
//! \~english Waits until \a condition becomes true or \a timeout expires.
//! \~russian Ожидает, пока \a condition не станет истинным или не истечет \a timeout.
//! \~\details
//! \~english
//! The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
//! during timeout, or until notified (if the latter happens first).
//!
//! At the moment of blocking the thread, the function automatically calls lk.lock() (PIMutex::lock()), allowing
//! other locked threads to continue.
//!
//! Once notified or once timeout has passed, the function unblocks and calls lk.unlock() (PIMutex::unlock()),
//! leaving lk in the same state as when the function was called. Then the function returns (notice that this last
//! mutex locking may block again the thread before returning).
//!
//! Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
//! member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
//! being called. Therefore, users of this function shall ensure their condition for resumption is met.
//!
//! If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
//! the thread when it becomes true (which is especially useful to check against spurious wake-up calls).
//!
//! \param lk lock object used by method wait for data protection
//! \param condition A callable object or function that takes no arguments and returns a value that can be evaluated
//! as a bool. This is called repeatedly until it evaluates to true.
//! \return false if timeout reached or true if wakeup condition is true
//!
//! \~russian
//! Выполнение текущего потока (который должен быть заблокирован с помощью lk методом PIMutex::lock())
//! приостанавливается на время timeout, или до получения уведомления (в зависимости от того,
//! что произойдет раньше).
//!
//! В момент блокировки потока функция автоматически вызывает lk.unlock() (PIMutex::unlock()),
//! позволяя другим заблокированным потокам продолжить выполнение.
//!
//! После получения уведомления или истечения таймаута функция разблокируется и вызывает
//! lk.lock() (PIMutex::lock()), возвращая lk в то же состояние, в котором он находился при вызове
//! функции. Затем функция завершается (обратите внимание, что эта последняя блокировка мьютекса
//! может снова заблокировать поток перед возвратом).
//!
//! Обычно функция пробуждается при вызове в другом потоке либо notifyOne(), либо notifyAll().
//! Однако некоторые реализации могут создавать ложные пробуждения без вызова любой из этих функций.
//! Поэтому пользователи этой функции должны убедиться, что условие для возобновления выполнено.
//!
//! Если указано condition, функция блокируется только если condition возвращает false,
//! а уведомления могут разблокировать поток только когда оно станет true (что особенно полезно
//! для проверки ложных пробуждений).
//!
//! \param lk объект блокировки, используемый методом wait для защиты данных
//! \param timeout время ожидания
//! \param condition вызываемый объект или функция, не принимающая аргументов и возвращающая значение, которое может быть оценено как
//! bool. Вызывается повторно, пока не примет значение true
//! \return false если достигнут таймаут, или true если условие пробуждения истинно
//!
virtual bool waitFor(PIMutex & lk, PISystemTime timeout, std::function<bool()> condition);
private:
PRIVATE_DECLARATION(PIP_EXPORT)

View File

@@ -1,12 +1,11 @@
/*! \file pigrabberbase.h
* \ingroup Thread
* \~\brief
* \~english Abstract class for create grabbers
* \~russian Базовый класс для создания грабберов
*/
//! \~\file pigrabberbase.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Grabber thread base class
//! \~russian Базовый класс потока-граббера
/*
PIP - Platform Independent Primitives
Abstract class for create grabbers
Grabber thread base class
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
@@ -30,19 +29,42 @@
#include "pithread.h"
#include "pitime.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Base polling grabber thread with a pending queue, recording virtual methods, and a last-item snapshot.
//! \~russian Базовый поток-граббер с очередью ожидающих элементов, виртуальными методами записи и снимком последнего элемента.
//! \~\details
//! \~english
//! Captured items are appended to the internal queue and also copied into \a last(). The snapshot is independent from the queue.
//! \~russian
//! Захваченные элементы добавляются во внутреннюю очередь и также копируются в \a last(). Этот снимок не зависит от очереди.
template<typename T = PIByteArray>
class PIGrabberBase: public PIThread {
PIOBJECT_SUBCLASS(PIGrabberBase, PIThread);
public:
//! \~english Constructs a closed non-recording grabber.
//! \~russian Создает закрытый граббер без записи.
PIGrabberBase() {
is_opened = false;
is_recording = false;
}
//! \~english Stops the grabber thread and releases recording/open state.
//! \~russian Останавливает поток-граббер и освобождает состояние записи и открытия.
virtual ~PIGrabberBase() { stopGrabber(false); }
//! \~english Returns whether the grabber is currently opened.
//! \~russian Возвращает, открыт ли сейчас граббер.
virtual bool isOpened() const { return is_opened; }
//! \~english Returns whether captured items are currently being recorded.
//! \~russian Возвращает, записываются ли сейчас захваченные элементы.
virtual bool isRecording() const { return is_recording; }
//! \~english Starts recording subsequent captured items if the grabber is opened and recording is not active yet.
//! \~russian Запускает запись последующих захваченных элементов, если граббер открыт и запись еще не активна.
virtual void startRecord(const PIString & filename) {
if (!isOpened()) return;
if (isRecording()) return;
@@ -51,6 +73,9 @@ public:
is_recording = true;
rec_mutex.unlock();
}
//! \~english Stops recording if it is active.
//! \~russian Останавливает запись, если она активна.
virtual void stopRecord() {
if (!isOpened()) return;
if (!isRecording()) return;
@@ -59,6 +84,11 @@ public:
stopRecordInternal();
rec_mutex.unlock();
}
//! \~english Returns a copy of the last successfully captured item. This snapshot is kept separately from the pending queue and is not
//! consumed by \a dequeue().
//! \~russian Возвращает копию последнего успешно захваченного элемента. Этот снимок хранится отдельно от очереди ожидания и не
//! извлекается через \a dequeue().
T last() const {
T ret;
last_mutex.lock();
@@ -66,6 +96,9 @@ public:
last_mutex.unlock();
return ret;
}
//! \~english Returns whether the pending queue is empty.
//! \~russian Возвращает, пуста ли очередь ожидающих элементов.
bool isEmpty() {
bool ret;
que_mutex.lock();
@@ -73,6 +106,9 @@ public:
que_mutex.unlock();
return ret;
}
//! \~english Returns the current number of pending captured items.
//! \~russian Возвращает текущее количество ожидающих захваченных элементов.
int queSize() {
int ret;
que_mutex.lock();
@@ -80,6 +116,10 @@ public:
que_mutex.unlock();
return ret;
}
//! \~english Dequeues and returns the oldest pending captured item. If the queue is empty, returns a default-constructed value.
//! \~russian Извлекает и возвращает самый старый ожидающий захваченный элемент. Если очередь пуста, возвращает значение, созданное
//! конструктором по умолчанию.
T dequeue() {
T ret;
// piCoutObj << "start";
@@ -92,6 +132,10 @@ public:
que_mutex.unlock();
return ret;
}
//! \~english Stops the grabber thread. With \a wait_forever equal to \c false the method waits briefly and may terminate the thread
//! forcibly.
//! \~russian Останавливает поток-граббер. При \a wait_forever равном \c false метод ждет недолго и может принудительно завершить поток.
void stopGrabber(bool wait_forever = true) {
if (isRunning()) {
stop();
@@ -104,12 +148,19 @@ public:
}
}
}
//! \~english Opens the grabber immediately. The thread loop also tries to open it automatically when running in a closed state.
//! \~russian Немедленно открывает граббер. Цикл потока также пытается открыть его автоматически, если поток работает в закрытом
//! состоянии.
bool open() {
bool ret = openInternal();
if (!is_opened && ret) opened();
is_opened = ret;
return ret;
}
//! \~english Closes the grabber and resets the last-item snapshot.
//! \~russian Закрывает граббер и сбрасывает снимок последнего элемента.
void close() {
bool em = is_opened;
closeInternal();
@@ -117,28 +168,78 @@ public:
if (em) closed();
is_opened = false;
}
//! \~english Returns diagnostics collected for captured and recorded items.
//! \~russian Возвращает диагностику для захваченных и записанных элементов.
const PIDiagnostics & diag() const { return diag_; }
//! \~english Clears only the pending queue. The value returned by \a last() is not changed.
//! \~russian Очищает только очередь ожидающих элементов. Значение, возвращаемое \a last(), не изменяется.
void clear() {
que_mutex.lock();
que.clear();
que_mutex.unlock();
}
//! \~english Clears the pending queue and closes the grabber.
//! \~russian Очищает очередь ожидания и закрывает граббер.
void restart() {
clear();
close();
}
//! \events
//! \{
//! \fn void dataReady()
//! \brief
//! \~english Raised after a new item has been captured, queued, and copied into \a last().
//! \~russian Вызывается после захвата нового элемента, его добавления в очередь и копирования в \a last().
EVENT(dataReady);
//! \fn void opened()
//! \brief
//! \~english Raised when \a open() switches the grabber into the opened state.
//! \~russian Вызывается, когда \a open() переводит граббер в открытое состояние.
EVENT(opened);
//! \fn void closed()
//! \brief
//! \~english Raised when \a close() closes a previously opened grabber.
//! \~russian Вызывается, когда \a close() закрывает ранее открытый граббер.
EVENT(closed);
//! \}
protected:
//! \~english Virtual method executed once when the thread starts, before polling begins.
//! \~russian Виртуальный метод, выполняемый один раз при старте потока до начала опроса.
virtual void init() {}
//! \~english Opens the underlying grabber resource.
//! \~russian Открывает базовый ресурс граббера.
virtual bool openInternal() = 0;
//! \~english Closes the underlying grabber resource.
//! \~russian Закрывает базовый ресурс граббера.
virtual void closeInternal() = 0;
//! \~english Polls the next item into \a val. Return \c 0 when a new item was captured, a positive value when no item is ready yet, and
//! a negative value on failure that should close the grabber.
//! \~russian Опрашивает следующий элемент в \a val. Возвращает \c 0, когда новый элемент захвачен, положительное значение, когда
//! элемент еще не готов, и отрицательное значение при ошибке, после которой граббер должен закрыться.
virtual int get(T & val) = 0;
//! \~english Records a captured item when recording mode is active.
//! \~russian Записывает захваченный элемент, когда активен режим записи.
virtual void record(const T & val) {}
//! \~english Virtual method called before the recording flag becomes active.
//! \~russian Виртуальный метод, вызываемый перед активацией флага записи.
virtual void startRecordInternal(const PIString & filename) {}
//! \~english Virtual method called after the recording flag is cleared.
//! \~russian Виртуальный метод, вызываемый после сброса флага записи.
virtual void stopRecordInternal() {}
bool is_opened, is_recording;

View File

@@ -28,8 +28,11 @@
#include "piinit.h"
#ifndef PIP_NO_THREADS
//! \~\ingroup Thread
//! \~\brief
//! \~english Mutex for mutual exclusion between threads.
//! \~russian Мьютекс для взаимоисключения между потоками.
class PIP_EXPORT PIMutex {
public:
NO_COPY_CLASS(PIMutex);
@@ -55,6 +58,8 @@ public:
//! \~russian Пробует заблокировать мьютекс
bool tryLock();
//! \~english Returns native mutex handle used by low-level synchronization code.
//! \~russian Возвращает нативный дескриптор мьютекса для низкоуровневой синхронизации.
void * handle();
private:
@@ -64,7 +69,10 @@ private:
PRIVATE_DECLARATION(PIP_EXPORT)
};
//! \~\ingroup Thread
//! \~\brief
//! \~english Scope guard that locks a %PIMutex in constructor and unlocks it in destructor.
//! \~russian Защитник области видимости, который блокирует %PIMutex в конструкторе и разблокирует в деструкторе.
class PIP_EXPORT PIMutexLocker {
public:
NO_COPY_CLASS(PIMutexLocker);

View File

@@ -1,9 +1,8 @@
/*! \file pipipelinethread.h
* \ingroup Thread
* \~\brief
* \~english Class for create multihread pipeline
* \~russian Класс для создания многопоточного конвейера
*/
//! \~\file pipipelinethread.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-based pipeline for multi-stage data processing
//! \~russian Потоковый конвейер для многоэтапной обработки данных
/*
PIP - Platform Independent Primitives
Class for create multihread pipeline
@@ -31,16 +30,31 @@
#include "pithread.h"
//! \~\ingroup Thread
//! \~english Thread-based pipeline template class for multi-stage data processing
//! \~russian Шаблонный класс потокового конвейера для многоэтапной обработки данных
//! \~\details
//! \~english
//! Pipeline thread template class for processing data through multiple stages in separate threads.
//! \~Each stage processes incoming data and passes it to the next stage via event notification.
//! \~russian
//! Шаблонный класс конвейерного потока для многоэтапной обработки данных в отдельных потоках.
//! Каждый этап обрабатывает входные данные и передает их следующему этапу через event-уведомления.
template<typename Tin, typename Tout>
class PIPipelineThread: public PIThread {
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread);
public:
//! \~english Constructs pipeline thread
//! \~russian Создает конвейерный поток
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = false;
}
//! \~english Stops the stage thread and may terminate it forcibly if it does not finish in time.
//! \~russian Останавливает поток стадии и может принудительно завершить его, если он не завершится вовремя.
~PIPipelineThread() {
stop();
cv.notifyAll();
@@ -49,11 +63,26 @@ public:
terminate();
}
}
//! \~english Connects to next pipeline stage via event notification
//! \~russian Подключает к следующему этапу конвейера через event-уведомления
template<typename T>
void connectTo(PIPipelineThread<Tout, T> * next) {
CONNECT3(void, Tout, bool, bool *, this, calculated, next, enqueue);
}
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload);
//! \~\handlers
//! \~\{
//! \~english Event handler for data enqueue
//! \~russian Обработчик события добавления данных в очередь
//! \~\details
//! \~english
//! For bounded queues, \a wait decides whether to block until space is available or to drop the item immediately.
//! \~If \a overload is not null, it is set to \c true when the item is rejected because the queue is full.
//! \~russian
//! Для ограниченных очередей \a wait определяет, ждать ли освобождения места или сразу отбросить элемент.
//! \~Если \a overload не равен null, он получает значение \c true, когда элемент отклонен из-за переполнения очереди.
EVENT_HANDLER3(void, enqueue, const Tin &, v, bool, wait, bool *, overload) {
mutex.lock();
// piCoutObj << "enque" << overload;
@@ -72,9 +101,40 @@ public:
}
mutex.unlock();
}
//! \~\}
//! \~\events
//! \~\{
//! \~\fn void calculated(const Tout & v, bool wait, bool * overload)
//! \~\brief
//! \~english Emitted when processing result is ready
//! \~russian Генерируется когда результат обработки готов
//! \~\details
//! \~english Raised after \a calc() succeeds.
//! \~russian Вызывается после успешного завершения \a calc().
//! \~\note
//! \~english The \a wait flag is propagated to a downstream stage and does not affect this stage's own queue.
//! \~russian Флаг \a wait передается следующей стадии и не влияет на собственную очередь этой стадии.
EVENT3(calculated, const Tout &, v, bool, wait, bool *, overload);
//! \}
//! \~english Enqueue data for processing
//! \~russian Добавляет данные в очередь на обработку
void enqueue(const Tin & v, bool wait = false) { enqueue(v, wait, nullptr); }
//! \~english Returns pointer to counter
//! \~russian Возвращает указатель на счетчик
const ullong * counterPtr() const { return &cnt; }
//! \~english Returns items processed counter
//! \~russian Возвращает количество обработанных элементов
ullong counter() const { return cnt; }
//! \~english Returns whether the input queue is empty.
//! \~russian Возвращает, пуста ли входная очередь.
bool isEmpty() {
bool ret;
mutex.lock();
@@ -82,6 +142,9 @@ public:
mutex.unlock();
return ret;
}
//! \~english Returns the current input queue size.
//! \~russian Возвращает текущий размер входной очереди.
int queSize() {
int ret;
mutex.lock();
@@ -89,6 +152,9 @@ public:
mutex.unlock();
return ret;
}
//! \~english Clear input queue
//! \~russian Очищает входную очередь
void clear() {
mutex.lock();
mutex_wait.lock();
@@ -97,6 +163,12 @@ public:
mutex_wait.unlock();
mutex.unlock();
}
//! \~english Stops calculation and waits for thread finish
//! \~russian Останавливает вычисления и ожидает завершения потока
//! \~\note
//! \~english If the stage does not stop within \a wait_delay, it may be terminated forcibly.
//! \~russian Если стадия не остановится за \a wait_delay, она может быть принудительно завершена.
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
@@ -108,6 +180,13 @@ public:
}
}
}
//! \~english Returns a copy of the last successfully calculated output.
//! \~russian Возвращает копию последнего успешно вычисленного выхода.
//! \~\note
//! \~english This snapshot is separate from the input queue.
//! \~russian Этот снимок хранится отдельно от входной очереди.
Tout getLast() {
Tout ret;
mutex_last.lock();
@@ -116,8 +195,19 @@ public:
return ret;
}
//! \~english Returns the configured input queue limit.
//! \~russian Возвращает настроенный предел входной очереди.
//! \~\note
//! \~english Value \c 0 means the queue is unbounded.
//! \~russian Значение \c 0 означает, что очередь не ограничена.
uint maxQueSize() { return max_size; }
//! \~english Sets the input queue limit.
//! \~russian Устанавливает предел входной очереди.
//! \~\note
//! \~english Value \c 0 removes the limit. If the queue already exceeds the new limit, it is resized immediately.
//! \~russian Значение \c 0 снимает ограничение. Если очередь уже превышает новый предел, она немедленно приводится к нему.
void setMaxQueSize(uint count) {
mutex.lock();
max_size = count;
@@ -125,16 +215,37 @@ public:
mutex.unlock();
}
//! \~english Returns if waiting for next pipeline stage
//! \~russian Возвращает ожидает ли следующий этап конвейера
bool isWaitNextPipe() { return wait_next_pipe; }
//! \~english Sets whether to wait for next pipeline stage
//! \~russian Устанавливает флаг ожидания следующего этапа конвейера
//! \~\details
//! \~english Sets whether the \a calculated() signal requests waiting in the downstream stage.
//! \~russian Устанавливает, должен ли сигнал \a calculated() запрашивать ожидание на следующей стадии.
//! \~\note
//! \~english This flag only affects downstream propagation and does not change how this stage accepts input.
//! \~russian Этот флаг влияет только на передачу вниз по конвейеру и не меняет правила приема входа этой стадией.
void setWaitNextPipe(bool wait) { wait_next_pipe = wait; }
protected:
//! \~english Calculates one output from queued input \a v.
//! \~russian Вычисляет один выход из очередного входного элемента \a v.
//! \~\note
//! \~english Leave \a ok equal to \c true to publish the result, or set it to \c false to drop this input without forwarding.
//! \~russian Оставьте \a ok равным \c true, чтобы опубликовать результат, или установите \a ok в \c false, чтобы отбросить этот вход
//! без пересылки дальше.
virtual Tout calc(Tin & v, bool & ok) = 0;
//! \~english Maximum queue size (0 means unlimited)
//! \~russian Максимальный размер очереди (0 означает без ограничений)
uint max_size;
private:
void begin() override { cnt = 0; }
void run() override {
mutex.lock();
while (in.isEmpty()) {
@@ -161,6 +272,7 @@ private:
}
// piCoutObj << "run ok";
}
PIMutex mutex, mutex_wait;
PIConditionVariable cv, cv_wait;
PIMutex mutex_last;

View File

@@ -1,26 +1,25 @@
/*! \file piprotectedvariable.h
* \ingroup Thread
* \~\brief
* \~english Thread-safe variable
* \~russian Потокобезопасная переменная
*/
//! \~\file piprotectedvariable.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-safe variable
//! \~russian Потокобезопасная переменная
/*
PIP - Platform Independent Primitives
Thread-safe variable
Ivan Pelipenko peri4ko@yandex.ru, Stephan Fomenko, Andrey Bychkov work.a.b@yandex.ru
PIP - Platform Independent Primitives
Thread-safe variable
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIPROTECTEDVARIABLE_H
@@ -28,55 +27,74 @@
#include "pimutex.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread-safe variable template class
//! \~russian Шаблонный класс потокобезопасной переменной
template<typename T>
class PIP_EXPORT PIProtectedVariable {
public:
//! \~english
//! \~russian
//! \~english Constructs %PIProtectedVariable and initialize variable by value `v`.
//! \~russian Создает %PIProtectedVariable и инициализирует переменную значением `v`.
PIProtectedVariable(T v = T()): var(std::move(v)) {}
//! \~\brief
//! \~english Pointer-like wrapper returned by \a getRef() while the protected value remains locked.
//! \~russian Указателеподобная обертка, возвращаемая \a getRef(), пока защищенное значение остается заблокированным.
class PIP_EXPORT Pointer {
friend class PIProtectedVariable<T>;
NO_COPY_CLASS(Pointer);
Pointer & operator=(Pointer && other) = delete;
public:
Pointer(const Pointer & v): pv(v.pv), counter(v.counter + 1) {}
//! \~english Move constructor - transfers ownership of the lock.
//! \~russian Конструктор перемещения - передает владение блокировкой.
Pointer(Pointer && other): pv(other.pv) { other.can_unlock = false; };
//! \~english Destroys wrapper and releases the mutex.
//! \~russian Уничтожает обертку и освобождает мьютекс.
~Pointer() {
if (counter == 0) pv.mutex.unlock();
if (can_unlock) {
pv.mutex.unlock();
}
}
//! \~english Returns pointer access to the protected value.
//! \~russian Возвращает указательный доступ к защищенному значению.
T * operator->() { return &pv.var; }
//! \~english Returns reference access to the protected value.
//! \~russian Возвращает ссылочный доступ к защищенному значению.
T & operator*() { return pv.var; }
private:
Pointer() = delete;
Pointer(PIProtectedVariable<T> & v): pv(v) {}
explicit Pointer() = delete;
explicit Pointer(PIProtectedVariable<T> & v): pv(v) { pv.mutex.lock(); }
PIProtectedVariable<T> & pv;
int counter = 0;
bool can_unlock = true;
};
//! \~english Sets value to \"v\"
//! \~russian Устанавливает значение как \"v\"
//! \~english Replaces the protected value with \a v.
//! \~russian Заменяет защищенное значение на \a v.
void set(T v) {
PIMutexLocker _ml(mutex);
var = std::move(v);
}
//! \~english Lock mutex and returns reference wrapper of value. Unlock on variable destructor.
//! \~russian Блокирует мьютекс и возвращает класс-обертку на значение. Разблокирует в деструкторе переменной.
Pointer getRef() {
mutex.lock();
return Pointer(*this);
}
//! \~english Locks the value and returns wrapper-based access to it.
//! \~russian Блокирует значение и возвращает обертку для доступа к нему.
Pointer getRef() { return Pointer(*this); }
//! \~english Returns copy of value
//! \~russian Возвращает копию значения
//! \~english Returns a copy of the protected value.
//! \~russian Возвращает копию защищенного значения.
T get() const {
PIMutexLocker _ml(mutex);
return var;
}
//! \~english Sets value to \"v\"
//! \~russian Устанавливает значение как \"v\"
//! \~english Replaces the protected value with \a v.
//! \~russian Заменяет защищенное значение на \a v.
PIProtectedVariable<T> & operator=(T v) {
set(std::move(v));
return *this;
@@ -85,8 +103,8 @@ public:
private:
mutable PIMutex mutex;
T var;
T var = {};
};
#endif
#endif // PIPROTECTEDVARIABLE_H

View File

@@ -1,13 +1,13 @@
/*! \file pireadwritelock.h
* \ingroup Read/Write lock
* \ingroup Thread
* \~\brief
* \~english Read/Write lock
* \~russian Блокировка чтения/записи
* \~english Read-write lock with multiple readers or one writer
* \~russian Блокировка чтения-записи с несколькими читателями или одним писателем
*/
/*
PIP - Platform Independent Primitives
PIReadWriteLock, PIReadLocker, PIWriteLocker
Ivan Pelipenko peri4ko@yandex.ru
PIReadWriteLock, PIReadLocker, PIWriteLocker
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
@@ -28,7 +28,10 @@
#include "piconditionvar.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Synchronization primitive that allows concurrent readers and exclusive writer access.
//! \~russian Примитив синхронизации, допускающий одновременных читателей и эксклюзивный доступ писателя.
class PIP_EXPORT PIReadWriteLock {
public:
NO_COPY_CLASS(PIReadWriteLock)
@@ -42,37 +45,37 @@ public:
~PIReadWriteLock();
//! \~english Lock for write. If already locked for write or read, than wait until all locks released.
//! \~russian Заблокировать на запись. Если уже заблокировано на запись или чтение, то ждёт освобождения блокировок.
//! \~english Acquires writer access, waiting until there are no active readers or writer.
//! \~russian Захватывает доступ на запись, ожидая отсутствия активных читателей и писателя.
void lockWrite();
//! \~english Try to lock for write. Returns if operation was successfull.
//! \~russian Пробует заблокировать на запись. Возвращает успех операции.
//! \~english Tries to acquire writer access without waiting.
//! \~russian Пытается захватить доступ на запись без ожидания.
bool tryLockWrite();
//! \~english Try to lock for write for \"timeout\". Returns if operation was successfull (timeout has not expired).
//! \~russian Пробует заблокировать на запись в течении \"timeout\". Возвращает успех операции (не истек ли тайм-аут).
//! \~english Tries to acquire writer access within \a timeout.
//! \~russian Пытается захватить доступ на запись в пределах \a timeout.
bool tryLockWrite(PISystemTime timeout);
//! \~english Release lock for write.
//! \~russian Освобождает блокировку на запись.
//! \~english Releases writer access and wakes waiting threads.
//! \~russian Освобождает доступ на запись и пробуждает ожидающие потоки.
void unlockWrite();
//! \~english Lock for read. If already locked for write, than wait until write lock released.
//! \~russian Заблокировать на чтение. Если уже заблокировано на запись, то ждёт освобождения записывающей блокировки.
//! \~english Acquires one reader slot, waiting while writer access is active.
//! \~russian Захватывает одно место читателя, ожидая пока активен доступ на запись.
void lockRead();
//! \~english Try to lock for read. Returns if operation was successfull.
//! \~russian Пробует заблокировать на чтение. Возвращает успех операции.
//! \~english Tries to acquire reader access without waiting.
//! \~russian Пытается захватить доступ на чтение без ожидания.
bool tryLockRead();
//! \~english Try to lock for read for \"timeout\". Returns if operation was successfull (timeout has not expired).
//! \~russian Пробует заблокировать на чтение в течении \"timeout\". Возвращает успех операции (не истек ли тайм-аут).
//! \~english Tries to acquire reader access within \a timeout.
//! \~russian Пытается захватить доступ на чтение в пределах \a timeout.
bool tryLockRead(PISystemTime timeout);
//! \~english Release lock for read.
//! \~russian Освобождает блокировку на чтение.
//! \~english Releases one reader slot and wakes waiting threads.
//! \~russian Освобождает одно место читателя и пробуждает ожидающие потоки.
void unlockRead();
private:
@@ -82,7 +85,10 @@ private:
PIConditionVariable var;
};
//! \~\ingroup Thread
//! \~\brief
//! \~english Scope guard that acquires reader access in constructor and releases it in destructor.
//! \~russian Защитник области видимости, который захватывает доступ на чтение в конструкторе и освобождает его в деструкторе.
class PIP_EXPORT PIReadLocker {
public:
NO_COPY_CLASS(PIReadLocker);
@@ -104,7 +110,10 @@ private:
bool cond = true;
};
//! \~\ingroup Thread
//! \~\brief
//! \~english Scope guard that acquires writer access in constructor and releases it in destructor.
//! \~russian Защитник области видимости, который захватывает доступ на запись в конструкторе и освобождает его в деструкторе.
class PIP_EXPORT PIWriteLocker {
public:
NO_COPY_CLASS(PIWriteLocker);
@@ -127,4 +136,4 @@ private:
};
#endif
#endif // PIREADWRITELOCK_H

View File

@@ -1,13 +1,13 @@
/*! \file pisemaphore.h
* \ingroup Semaphore
* \ingroup Thread
* \~\brief
* \~english Basic semaphore
* \~russian Простой семафор
* \~english Counting semaphore for shared resources
* \~russian Счетный семафор для общих ресурсов
*/
/*
PIP - Platform Independent Primitives
PISemaphore, PISemaphoreLocker
Ivan Pelipenko peri4ko@yandex.ru
PISemaphore, PISemaphoreLocker
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
@@ -28,7 +28,10 @@
#include "piconditionvar.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Counting semaphore that tracks available resource units.
//! \~russian Счетный семафор, отслеживающий количество доступных единиц ресурса.
class PIP_EXPORT PISemaphore {
public:
NO_COPY_CLASS(PISemaphore)
@@ -42,24 +45,24 @@ public:
~PISemaphore();
//! \~english Acquire \"cnt\" resources. If no available resources, than blocks until they freed.
//! \~russian Захватывает \"cnt\" ресурсов. Если свободных ресурсов недостаточно, то блокирует до их появления.
//! \~english Acquires \a cnt resource units, waiting until enough units become available.
//! \~russian Захватывает \a cnt единиц ресурса, ожидая появления достаточного количества.
void acquire(int cnt = 1);
//! \~english Try to acquire \"cnt\" resources. Returns if operation was successfull.
//! \~russian Пробует захватывает \"cnt\" ресурсов. Возвращает успех захвата.
//! \~english Tries to acquire \a cnt resource units without waiting.
//! \~russian Пытается захватить \a cnt единиц ресурса без ожидания.
bool tryAcquire(int cnt = 1);
//! \~english Try to acquire \"cnt\" resources for \"timeout\". Returns if operation was successfull (timeout has not expired).
//! \~russian Пробует захватывает \"cnt\" ресурсов в течении \"timeout\". Возвращает успех захвата (не истек ли тайм-аут).
//! \~english Tries to acquire \a cnt resource units within \a timeout.
//! \~russian Пытается захватить \a cnt единиц ресурса в пределах \a timeout.
bool tryAcquire(int cnt, PISystemTime timeout);
//! \~english Release \"cnt\" resources.
//! \~russian Освобождает \"cnt\" ресурсов.
//! \~english Releases \a cnt resource units and wakes waiting threads.
//! \~russian Освобождает \a cnt единиц ресурса и пробуждает ожидающие потоки.
void release(int cnt = 1);
//! \~english Returns available resources count.
//! \~russian Возвращает количество свободных ресурсов.
//! \~english Returns the current number of available resource units.
//! \~russian Возвращает текущее количество доступных единиц ресурса.
int available() const;
private:
@@ -68,7 +71,10 @@ private:
PIConditionVariable var;
};
//! \~\ingroup Thread
//! \~\brief
//! \~english Scope guard that acquires semaphore units in constructor and releases them in destructor.
//! \~russian Защитник области видимости, который захватывает единицы семафора в конструкторе и освобождает их в деструкторе.
class PIP_EXPORT PISemaphoreLocker {
public:
NO_COPY_CLASS(PISemaphoreLocker);
@@ -92,4 +98,4 @@ private:
};
#endif
#endif // PISEMAPHORE_H

View File

@@ -1,9 +1,8 @@
/*! \file pispinlock.h
* \ingroup Thread
* \~\brief
* \~english Fast and full-load lock
* \~russian Быстрая блокировка с полной нагрузкой
*/
//! \~\file pispinlock.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Spinlock with busy waiting
//! \~russian Спинлок с активным ожиданием
/*
PIP - Platform Independent Primitives
PISpinlock
@@ -31,6 +30,10 @@
#include <atomic>
//! \~\ingroup Thread
//! \~\brief
//! \~english Lock based on atomic spinning for very short critical sections.
//! \~russian Блокировка на основе атомарного вращения для очень коротких критических секций.
class PIP_EXPORT PISpinlock {
public:
NO_COPY_CLASS(PISpinlock);
@@ -44,27 +47,15 @@ public:
~PISpinlock() {}
//! \~english Lock spinlock
//! \~russian Блокирует спинлок
//! \~\details
//! \~english
//! If spinlock is unlocked it set to locked state and returns immediate.
//! If spinlock is already locked function blocks until spinlock will be unlocked
//! \~russian
//! Если спинлок свободен, то блокирует его и возвращает управление немедленно.
//! Если спинлок заблокирован, то ожидает разблокировки, затем блокирует и возвращает управление
//! \~english Acquires the spinlock, busy-waiting until it becomes free.
//! \~russian Захватывает спинлок, активно ожидая его освобождения.
void lock() {
while (flag.test_and_set(std::memory_order_acquire))
;
}
//! \~english Unlock spinlock
//! \~russian Разблокирует спинлок
//! \~\details
//! \~english
//! In any case this function returns immediate
//! \~russian
//! В любом случае возвращает управление немедленно
//! \~english Releases the spinlock.
//! \~russian Освобождает спинлок.
void unlock() { flag.clear(std::memory_order_release); }
private:
@@ -72,18 +63,22 @@ private:
};
//! \~\ingroup Thread
//! \~\brief
//! \~english Scope guard that locks a %PISpinlock in constructor and unlocks it in destructor.
//! \~russian Защитник области видимости, который блокирует %PISpinlock в конструкторе и разблокирует его в деструкторе.
class PIP_EXPORT PISpinlockLocker {
public:
NO_COPY_CLASS(PISpinlockLocker);
//! \~english Constructs and lock "s" if "condition" is \c true
//! \~russianСоздает и блокирует спинлок "m" если "condition" \c true
//! \~russian Создает и блокирует спинлок "s" если "condition" \c true
PISpinlockLocker(PISpinlock & s, bool condition = true): spinlock(s), cond(condition) {
if (cond) spinlock.lock();
}
//! \~english Unlock "s" if "condition" was \c true
//! \~russian Разблокирует спинлок "m" если "condition" был \c true
//! \~russian Разблокирует спинлок "s" если "condition" был \c true
~PISpinlockLocker() {
if (cond) spinlock.unlock();
}

View File

@@ -541,7 +541,7 @@ PRIVATE_DEFINITION_END(PIThread)
PIThread::PIThread(void * data, ThreadFunc func, bool startNow, PISystemTime loop_delay): PIObject() {
PIINTROSPECTION_THREAD_NEW(this);
data_ = data;
ret_func = func;
ret_func = std::move(func);
terminating = running_ = lockRun = false;
priority_ = piNormal;
delay_ = loop_delay;
@@ -610,13 +610,13 @@ bool PIThread::start(PISystemTime loop_delay) {
bool PIThread::start(ThreadFunc func) {
ret_func = func;
ret_func = std::move(func);
return start();
}
bool PIThread::start(ThreadFunc func, PISystemTime loop_delay) {
ret_func = func;
ret_func = std::move(func);
delay_ = loop_delay;
return start();
}
@@ -642,7 +642,7 @@ bool PIThread::startOnce() {
bool PIThread::startOnce(ThreadFunc func) {
ret_func = func;
ret_func = std::move(func);
return startOnce();
}
@@ -739,12 +739,12 @@ bool PIThread::_startThread(void * func) {
#ifdef FREERTOS
auto name_ba = createThreadName();
if (xTaskCreate((__THREAD_FUNC_RET__ (*)(void *))func,
(const char *)name_ba.data(), // A name just for humans
128, // This stack size can be checked & adjusted by reading the Stack Highwater
this,
priority_,
&PRIVATE->thread) == pdPASS) {
if (xTaskCreate((__THREAD_FUNC_RET__(*)(void *))func,
(const char *)name_ba.data(), // A name just for humans
128, // This stack size can be checked & adjusted by reading the Stack Highwater
this,
priority_,
&PRIVATE->thread) == pdPASS) {
tid_ = (llong)PRIVATE->thread;
return true;
}
@@ -753,7 +753,7 @@ bool PIThread::_startThread(void * func) {
if (PRIVATE->thread) CloseHandle(PRIVATE->thread);
# ifdef CC_GCC
PRIVATE->thread = (void *)_beginthreadex(0, 0, (__THREAD_FUNC_RET__ (*)(void *))func, this, CREATE_SUSPENDED, 0);
PRIVATE->thread = (void *)_beginthreadex(0, 0, (__THREAD_FUNC_RET__(*)(void *))func, this, CREATE_SUSPENDED, 0);
# else
PRIVATE->thread = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)func, this, CREATE_SUSPENDED, 0);
# endif
@@ -767,7 +767,7 @@ bool PIThread::_startThread(void * func) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&PRIVATE->thread, &attr, (__THREAD_FUNC_RET__ (*)(void *))func, this);
int ret = pthread_create(&PRIVATE->thread, &attr, (__THREAD_FUNC_RET__(*)(void *))func, this);
pthread_attr_destroy(&attr);
// PICout(PICoutManipulators::DefaultControls) << "pthread_create" << PRIVATE->thread;
// piCout << "started" << PRIVATE->thread;
@@ -885,8 +885,8 @@ void PIThread::_runThread() {
PIINTROSPECTION_THREAD_RUN(this);
// PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "...";
if (lockRun) thread_mutex.lock();
// PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "ok";
// PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "...";
// PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "ok";
// PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "...";
#ifdef PIP_INTROSPECTION
PITimeMeasurer _tm;
#endif
@@ -1043,7 +1043,7 @@ void PIThread::runOnce(PIObject * object, const char * handler, const PIString &
void PIThread::runOnce(std::function<void()> func, const PIString & name) {
PIThread * t = new PIThread();
t->setName(name);
t->setSlot(func);
t->setSlot(std::move(func));
#ifndef MICRO_PIP
__PIThreadCollection::instance()->startedAuto(t);
CONNECT0(void, t, stopped, __PIThreadCollection::instance(), stoppedAuto);

View File

@@ -1,9 +1,19 @@
/*! \file pithread.h
* \ingroup Thread
* \~\brief
* \~english Thread class
* \~russian Класс потока
*/
//! \~\file pithread.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Runtime thread object with optional loop execution
//! \~russian Объект потока выполнения с необязательным циклом
//!
//! \~\details
//! \~english
//! %PIThread runs \a begin(), \a run() and \a end() on a dedicated system
//! thread. In loop mode it also drains queued delivery addressed to the thread
//! object as performer through \a maybeCallQueuedEvents() before each iteration.
//! \~russian
//! %PIThread выполняет \a begin(), \a run() и \a end() в отдельном системном
//! потоке. В циклическом режиме он также обрабатывает отложенную доставку,
//! адресованную объекту потока как исполнителю, через
//! \a maybeCallQueuedEvents() перед каждой итерацией.
/*
PIP - Platform Independent Primitives
Thread
@@ -67,9 +77,26 @@ public:
static __PIThreadCollection_Initializer__ __PIThreadCollection_initializer__;
#endif // MICRO_PIP
//! \~english Callback executed by %PIThread with the current \a data() pointer.
//! \~russian Обратный вызов, который %PIThread выполняет с текущим указателем \a data().
typedef std::function<void(void *)> ThreadFunc;
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread object that executes work on a dedicated system thread.
//! \~russian Объект потока, выполняющий работу в отдельном системном потоке.
//! \~\details
//! \~english
//! The default loop calls \a begin(), then repeats queued-delivery draining for
//! this performer object plus \a run(), and finally calls \a end() when
//! stopping. Use \a startOnce() when only one pass is needed and no repeated
//! queued draining loop is required.
//! \~russian
//! Стандартный цикл вызывает \a begin(), затем повторяет обработку отложенной
//! доставки для этого объекта-исполнителя вместе с \a run(), а при остановке
//! вызывает \a end(). Используйте \a startOnce(), когда нужен только один
//! проход без повторяющегося цикла обработки очереди.
class PIP_EXPORT PIThread: public PIObject {
PIOBJECT_SUBCLASS(PIThread, PIObject);
#ifndef MICRO_PIP
@@ -79,22 +106,24 @@ class PIP_EXPORT PIThread: public PIObject {
public:
NO_COPY_CLASS(PIThread);
//! \~english Contructs thread with custom data "data", external function "func" and main loop delay "loop_delay"
//! \~russian Создает поток с данными "data", функцией "func" и задержкой цикла "loop_delay"
//! \~english Constructs a thread with user data, callback and optional immediate start.
//! \~russian Создает поток с пользовательскими данными, обратным вызовом и необязательным немедленным запуском.
PIThread(void * data, ThreadFunc func, bool startNow = false, PISystemTime loop_delay = {});
//! \~english Contructs thread with external function "func" and main loop delay "loop_delay"
//! \~russian Создает поток с функцией "func" и задержкой цикла "loop_delay"
//! \~english Constructs a thread with a callback without custom data.
//! \~russian Создает поток с обратным вызовом без пользовательских данных.
PIThread(std::function<void()> func, bool startNow = false, PISystemTime loop_delay = {});
//! \~english Contructs thread with main loop delay "loop_delay"
//! \~russian Создает поток с задержкой цикла "loop_delay"
//! \~english Constructs a subclass-oriented thread with an optional loop delay.
//! \~russian Создает поток для наследования с необязательной задержкой цикла.
PIThread(bool startNow = false, PISystemTime loop_delay = {});
//! \~english Destroys the thread object. If it is still running, destruction forces termination.
//! \~russian Уничтожает объект потока. Если поток еще работает, при уничтожении выполняется принудительное завершение.
virtual ~PIThread();
//! \~english Priority of thread
//! \~russian Приоритет потока
//! \~english Thread priority hint.
//! \~russian Подсказка приоритета потока.
enum Priority {
piLowerst /** \~english Lowest \~russian Низший */,
piLow /** \~english Low \~russian Низкий */,
@@ -106,128 +135,128 @@ public:
};
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Starts the thread with the stored callback and loop delay.
//! \~russian Запускает поток с сохраненными обратным вызовом и задержкой цикла.
bool start();
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Stores a new loop delay and starts the thread.
//! \~russian Сохраняет новую задержку цикла и запускает поток.
bool start(PISystemTime loop_delay);
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Stores a callback and starts the thread.
//! \~russian Сохраняет обратный вызов и запускает поток.
bool start(ThreadFunc func);
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Stores a callback and loop delay, then starts the thread.
//! \~russian Сохраняет обратный вызов и задержку цикла, затем запускает поток.
bool start(ThreadFunc func, PISystemTime loop_delay);
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Stores a lambda callback and starts the thread.
//! \~russian Сохраняет лямбда-обратный вызов и запускает поток.
bool start(std::function<void()> func);
//! \~english Start thread
//! \~russian Запускает поток
//! \~english Stores a lambda callback and loop delay, then starts the thread.
//! \~russian Сохраняет лямбда-обратный вызов и задержку цикла, затем запускает поток.
bool start(std::function<void()> func, PISystemTime loop_delay);
//! \~english Start thread without internal loop
//! \~russian Запускает поток без внутреннего цикла
//! \~english Starts a one-shot thread without the repeating loop.
//! \~russian Запускает одноразовый поток без повторяющегося цикла.
bool startOnce();
//! \~english Start thread without internal loop
//! \~russian Запускает поток без внутреннего цикла
//! \~english Stores a callback and starts one-shot execution.
//! \~russian Сохраняет обратный вызов и запускает одноразовое выполнение.
bool startOnce(ThreadFunc func);
//! \~english Start thread without internal loop
//! \~russian Запускает поток без внутреннего цикла
//! \~english Stores a lambda callback and starts one-shot execution.
//! \~russian Сохраняет лямбда-обратный вызов и запускает одноразовое выполнение.
bool startOnce(std::function<void()> func);
EVENT_HANDLER0(void, stop);
EVENT_HANDLER0(void, terminate);
//! \~english Deprecated overload of \a stopAndWait() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a stopAndWait(), принимающая миллисекунды.
bool stopAndWait(int timeout_ms) DEPRECATEDM("use stopAndWait(PISystemTime)") {
return stopAndWait(PISystemTime::fromMilliseconds(timeout_ms));
}
//! \~english Stop thread and wait for finish. Returns \b false if timeout expired.
//! \~russian Останавливает поток и ожидает завершения. Возвращает \b false если таймаут истек.
//! \~english Requests stop and waits for thread completion. Returns \b false if the timeout expires.
//! \~russian Запрашивает остановку и ожидает завершения потока. Возвращает \b false, если таймаут истек.
bool stopAndWait(PISystemTime timeout = {});
//! \~english Set common data passed to external function
//! \~russian Устанавливает данные, передаваемые в функцию потока
//! \~english Sets the data pointer passed to \a ThreadFunc callbacks.
//! \~russian Устанавливает указатель данных, передаваемый в обратные вызовы \a ThreadFunc.
void setData(void * d) { data_ = d; }
//! \~english Set external function that will be executed after every \a run()
//! \~russian Устанавливает функцию потока, вызываемую после каждого \a run()
//! \~english Sets the callback executed after each \a run() pass.
//! \~russian Устанавливает обратный вызов, выполняемый после каждого прохода \a run().
void setSlot(ThreadFunc func) { ret_func = func; }
//! \~english Set external function that will be executed after every \a run()
//! \~russian Устанавливает функцию потока, вызываемую после каждого \a run()
//! \~english Sets a lambda callback executed after each \a run() pass.
//! \~russian Устанавливает лямбда-обратный вызов, выполняемый после каждого прохода \a run().
void setSlot(std::function<void()> func) {
ret_func = [func](void *) { func(); };
}
//! \~english Set thread priority
//! \~russian Устанавливает приоритет потока
//! \~english Sets the priority hint. If the thread is already running, applies it immediately.
//! \~russian Устанавливает подсказку приоритета. Если поток уже работает, применяет ее немедленно.
void setPriority(PIThread::Priority prior);
//! \~english Returns common data passed to external function
//! \~russian Возвращает данные, передаваемые в функцию потока
//! \~english Returns the data pointer passed to \a ThreadFunc callbacks.
//! \~russian Возвращает указатель данных, передаваемый в обратные вызовы \a ThreadFunc.
void * data() const { return data_; }
//! \~english Return thread priority
//! \~russian Возвращает приоритет потока
//! \~english Returns the configured priority hint.
//! \~russian Возвращает настроенную подсказку приоритета.
PIThread::Priority priority() const { return priority_; }
//! \~english Return if thread is running
//! \~russian Возвращает исполняется ли поток
//! \~english Returns whether the thread is currently running.
//! \~russian Возвращает, выполняется ли поток в данный момент.
bool isRunning() const { return running_; }
//! \~english Return if thread is stopping
//! \~russian Возвращает останавливается ли поток
//! \~english Returns whether stop has been requested and the thread is still finishing.
//! \~russian Возвращает, был ли запрошен останов и поток еще завершает работу.
bool isStopping() const { return running_ && terminating; }
//! \~english Wait for thread start
//! \~russian Ожидает старта потока
//! \~english Waits until the thread starts. Returns \b false if the timeout expires first.
//! \~russian Ожидает запуска потока. Возвращает \b false, если таймаут истек раньше.
bool waitForStart(PISystemTime timeout = {});
//! \~english Deprecated overload of \a waitForStart() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a waitForStart(), принимающая миллисекунды.
bool waitForStart(int timeout_msecs) DEPRECATEDM("use waitForStart(PISystemTime)") {
return waitForStart(PISystemTime::fromMilliseconds(timeout_msecs));
}
//! \~english Wait for thread finish. Returns \b false if timeout expired.
//! \~russian Ожидает завершения потока. Возвращает \b false если таймаут истек.
//! \~english Waits for thread completion. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения потока. Возвращает \b false, если таймаут истек раньше.
bool waitForFinish(PISystemTime timeout = {});
//! \~english Deprecated overload of \a waitForFinish() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a waitForFinish(), принимающая миллисекунды.
bool waitForFinish(int timeout_msecs) DEPRECATEDM("use waitForFinish(PISystemTime)") {
return waitForFinish(PISystemTime::fromMilliseconds(timeout_msecs));
}
//! \~english Set necessity of lock every \a run() with internal mutex
//! \~russian Устанавливает необходимость блокировки внутреннего мьютекса каждый \a run()
//! \~english Enables locking of the internal mutex around \a begin(), \a run(), callbacks and \a end().
//! \~russian Включает блокировку внутреннего мьютекса вокруг \a begin(), \a run(), обратных вызовов и \a end().
void needLockRun(bool need) { lockRun = need; }
EVENT_HANDLER0(void, lock) const { thread_mutex.lock(); }
EVENT_HANDLER0(void, unlock) const { thread_mutex.unlock(); }
//! \~english Returns internal mutex
//! \~russian Возвращает внутренний мьютекс
//! \~english Returns the internal mutex used by \a lock(), \a unlock() and \a needLockRun().
//! \~russian Возвращает внутренний мьютекс, используемый \a lock(), \a unlock() и \a needLockRun().
PIMutex & mutex() const { return thread_mutex; }
//! \~english Returns thread ID
//! \~russian Возвращает ID потока
//! \~english Returns the system thread identifier, or -1 when the thread is not running.
//! \~russian Возвращает системный идентификатор потока, либо -1 когда поток не запущен.
llong tid() const { return tid_; }
void __thread_func__();
void __thread_func_once__();
EVENT(started);
EVENT(stopped);
//! \~english Call event handler "handler" of object "object" in separate thread
//! \~russian Вызывает обработчик "handler" объекта "object" в отдельном потоке
//! \~english Creates a temporary thread and invokes handler \a handler of object \a object on it.
//! \~russian Создает временный поток и вызывает в нем обработчик \a handler объекта \a object.
static void runOnce(PIObject * object, const char * handler, const PIString & name = PIString());
//! \~english Call [lambda expression](https://en.cppreference.com/w/cpp/language/lambda) "func" in separate thread
//! \~russian Вызывает [лямбда-выражение](https://ru.cppreference.com/w/cpp/language/lambda) "func" в отдельном потоке
//! \~english Creates a temporary thread and runs [lambda expression](https://en.cppreference.com/w/cpp/language/lambda) \a func on it.
//! \~russian Создает временный поток и выполняет в нем [лямбда-выражение](https://ru.cppreference.com/w/cpp/language/lambda) \a func.
static void runOnce(std::function<void()> func, const PIString & name = PIString());
//! \handlers
@@ -235,23 +264,27 @@ public:
//! \fn void stop()
//! \brief
//! \~english Stop thread
//! \~russian Останавливает поток
//! \~english Requests graceful thread shutdown.
//! \~russian Запрашивает корректное завершение потока.
EVENT_HANDLER0(void, stop);
//! \fn void terminate()
//! \brief
//! \~english Strongly stop thread
//! \~russian Жёстко прерывает поток
//! \~english Forces thread termination. Use only as a last resort.
//! \~russian Принудительно прерывает поток. Используйте только как крайнюю меру.
EVENT_HANDLER0(void, terminate);
//! \fn void lock()
//! \brief
//! \~english Lock internal mutex
//! \~russian Блокирует внутренний мьютекс
//! \~english Locks the internal mutex.
//! \~russian Блокирует внутренний мьютекс.
EVENT_HANDLER0(void, lock) const { thread_mutex.lock(); }
//! \fn void unlock()
//! \brief
//! \~english Unlock internal mutex
//! \~russian Разблокирует внутренний мьютекс
//! \~english Unlocks the internal mutex.
//! \~russian Разблокирует внутренний мьютекс.
EVENT_HANDLER0(void, unlock) const { thread_mutex.unlock(); }
//! \}
//! \events
@@ -259,29 +292,31 @@ public:
//! \fn void started()
//! \brief
//! \~english Raise on thread start
//! \~russian Вызывается при старте потока
//! \~english Raised after the thread has started.
//! \~russian Вызывается после запуска потока.
EVENT(started);
//! \fn void stopped()
//! \brief
//! \~english Raise on thread stop
//! \~russian Вызывается при завершении потока
//! \~english Raised when thread shutdown begins.
//! \~russian Вызывается при начале завершения потока.
EVENT(stopped);
//! \}
protected:
static int priority2System(PIThread::Priority p);
//! \~english Function executed once at the start of thread
//! \~russian Метод выполняется один раз при старте потока
//! \~english Virtual method executed once after the system thread starts and before \a started().
//! \~russian Виртуальный метод, выполняемый один раз после запуска системного потока и до \a started().
virtual void begin() { ; }
//! \~english Function executed at every "loop_delay" msecs until thread was stopped
//! \~russian Метод выполняется каждые "loop_delay" миллисекунд
//! \~english Virtual method executed on each loop iteration until stop is requested.
//! \~russian Виртуальный метод, выполняемый на каждой итерации цикла, пока не запрошен останов.
virtual void run() { ; }
//! \~english Function executed once at the end of thread
//! \~russian Метод выполняется один раз при остановке потока
//! \~english Virtual method executed once during thread shutdown after \a stopped().
//! \~russian Виртуальный метод, выполняемый один раз при завершении потока после \a stopped().
virtual void end() { ; }
std::atomic_bool terminating, running_, lockRun;

View File

@@ -1,3 +1,13 @@
/*! \file pithreadmodule.h
* \ingroup Thread
* \~\brief
* \~english Umbrella header for the thread module
* \~russian Зонтичный заголовок модуля потоков
*
* \~\details
* \~english Includes the main public synchronization, worker-thread and timer APIs.
* \~russian Подключает основные публичные API синхронизации, рабочих потоков и таймеров.
*/
/*
PIP - Platform Independent Primitives
Module includes
@@ -34,10 +44,10 @@
//! \~russian \par Общее
//!
//! \~english
//! These files provides thread, timer, blocking and several complex multithreading techniques
//! This module provides thread, timer and synchronization primitives for runtime work.
//!
//! \~russian
//! Эти файлы обеспечивают потоки, таймера, блокировки и несколько сложных многопоточных техник
//! Этот модуль предоставляет потоки, таймеры и примитивы синхронизации для задач времени выполнения.
//!
//! \~\authors
//! \~english
@@ -61,8 +71,8 @@
#include "pispinlock.h"
#include "pithread.h"
#include "pithreadnotifier.h"
#include "pithreadpoolexecutor.h"
#include "pithreadpoolloop.h"
#include "pithreadpoolworker.h"
#include "pitimer.h"
#endif // PITHREADMODULE_H

View File

@@ -1,13 +1,12 @@
/*! \file pithreadnotifier.h
* \ingroup Thread
* \~\brief
* \~english Class for simple notify and wait in different threads
* \~russian Класс для простого уведомления и ожидания в различных потоках
*/
//! \~\file pithreadnotifier.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Counting notification helper for coordination between threads
//! \~russian Счетный помощник уведомления для координации между потоками
/*
PIP - Platform Independent Primitives
Class for simply notify and wait in different threads
Andrey Bychkov work.a.b@yandex.ru
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
@@ -29,6 +28,19 @@
#include "piconditionvar.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Thread notifier class for synchronization between threads.
//! \~russian Класс уведомления потоков для синхронизации между потоками.
//! \~\details
//! \~english
//! Each \a notify() stores one pending wake-up. A later \a wait() or
//! \a waitFor() consumes one stored notification; with multiple waiters the
//! resume order is unspecified.
//! \~russian
//! Каждый вызов \a notify() сохраняет одно ожидающее пробуждение. Последующий
//! \a wait() или \a waitFor() потребляет одно сохраненное уведомление; при
//! нескольких ожидающих потоках порядок возобновления не определен.
class PIP_EXPORT PIThreadNotifier {
public:
PIThreadNotifier();

View File

@@ -1,83 +0,0 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pithreadpoolexecutor.h"
#include "piliterals_time.h"
/*! \class PIThreadPoolExecutor
* \brief Thread pools address two different problems: they usually provide improved performance when executing large
* numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
* managing the resources, including threads, consumed when executing a collection of tasks.
*/
PIThreadPoolExecutor::PIThreadPoolExecutor(int corePoolSize): isShutdown_(false) {
for (int i = 0; i < corePoolSize; ++i) {
PIThread * thread = new PIThread([&, i]() {
auto runnable = taskQueue.poll(100_ms, std::function<void()>());
if (runnable) {
runnable();
}
if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
});
threadPool.push_back(thread);
thread->start();
}
}
bool PIThreadPoolExecutor::awaitTermination(PISystemTime timeout) {
PITimeMeasurer measurer;
for (size_t i = 0; i < threadPool.size(); ++i) {
auto dif = timeout - measurer.elapsed();
if (dif.isNegative()) return false;
if (!threadPool[i]->waitForFinish(dif)) return false;
}
return true;
}
void PIThreadPoolExecutor::shutdownNow() {
isShutdown_ = true;
for (size_t i = 0; i < threadPool.size(); ++i)
threadPool[i]->stop();
}
PIThreadPoolExecutor::~PIThreadPoolExecutor() {
shutdownNow();
while (threadPool.size() > 0)
delete threadPool.take_back();
}
void PIThreadPoolExecutor::execute(const std::function<void()> & runnable) {
if (!isShutdown_) taskQueue.offer(runnable);
}
bool PIThreadPoolExecutor::isShutdown() const {
return isShutdown_;
}
void PIThreadPoolExecutor::shutdown() {
isShutdown_ = true;
}

View File

@@ -1,59 +1,23 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREADPOOLEXECUTOR_H
#define PITHREADPOOLEXECUTOR_H
#include "piblockingqueue.h"
#include "pithread.h"
#include "pithreadpoolworker.h"
#include <atomic>
namespace {
DEPRECATEDM("Use pithreadpoolworker.h") const bool deprecated_header_is_used = true;
}
class PIP_EXPORT PIThreadPoolExecutor {
class DEPRECATEDM("Use PIThreadPoolWorker") PIThreadPoolExecutor: public PIThreadPoolWorker {
public:
explicit PIThreadPoolExecutor(int corePoolSize);
PIThreadPoolExecutor(int threads_count): PIThreadPoolWorker(threads_count) { start(); }
~PIThreadPoolExecutor() { stopAndWait(); }
virtual ~PIThreadPoolExecutor();
//! \brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task
//! cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been
//! reached.
//!
//! \param runnable not empty function for thread pool execution
void execute(const std::function<void()> & runnable);
void shutdownNow();
//! \brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be
//! accepted. Invocation has no additional effect if already shut down. This method does not wait for previously
//! submitted tasks to complete execution. Use awaitTermination to do that.
void shutdown();
bool isShutdown() const;
bool awaitTermination(PISystemTime timeout);
private:
std::atomic_bool isShutdown_;
PIBlockingQueue<std::function<void()>> taskQueue;
PIVector<PIThread *> threadPool;
void execute(std::function<void()> runnable) DEPRECATEDM("Use enqueueTask()") { enqueueTask(runnable); }
void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); }
void shutdown() DEPRECATEDM("Use stop()") { stop(); }
bool isShutdown() const DEPRECATEDM("Use !isRunning()") { return !isRunning(); }
bool awaitTermination(PISystemTime timeout) DEPRECATEDM("Use waitForFinish()") { return waitForFinish(timeout); }
};
#endif // PITHREADPOOLEXECUTOR_H
#endif

View File

@@ -136,7 +136,7 @@ PIThreadPoolLoop::~PIThreadPoolLoop() {
void PIThreadPoolLoop::setFunction(std::function<void(int)> f) {
func = f;
func = std::move(f);
}
@@ -163,6 +163,6 @@ void PIThreadPoolLoop::exec(int index_start, int index_count) {
void PIThreadPoolLoop::exec(int index_start, int index_count, std::function<void(int)> f) {
setFunction(f);
setFunction(std::move(f));
exec(index_start, index_count);
}

View File

@@ -1,9 +1,8 @@
/*! \file pithreadpoolloop.h
* \ingroup Thread
* \~\brief
* \~english Thread pool loop
* \~russian Пул потоков
*/
//! \~\file pithreadpoolloop.h
//! \~\ingroup Thread
//! \~\brief
//! \~english Parallel loop helper
//! \~russian Вспомогательный класс для параллельного цикла
/*
PIP - Platform Independent Primitives
Thread pool loop
@@ -31,54 +30,44 @@
class PIThread;
//! \~\ingroup Thread
//! \~\brief
//! \~english Helper that runs one integer range across a fixed set of worker threads.
//! \~russian Вспомогательный класс, который выполняет один целочисленный диапазон на фиксированном наборе рабочих потоков.
class PIP_EXPORT PIThreadPoolLoop {
public:
//! \~english
//! Contructs thread pool with threads count "thread_cnt".
//! If "thread_cnt" = -1 then system processors count used
//! \~russian
//! Создает пул из "thread_cnt" потоков. Если "thread_cnt" = -1
//! то используется количество процессоров системы
//! \~english Constructs parallel loop runner with \a thread_cnt worker threads. If \a thread_cnt is less than or equal to zero, the
//! processor count is used.
//! \~russian Создает исполнитель параллельного цикла с \a thread_cnt рабочими потоками. Если \a thread_cnt меньше либо равен нулю,
//! используется количество процессоров.
PIThreadPoolLoop(int thread_cnt = -1);
//! \~english Stops worker threads and destroys the loop runner.
//! \~russian Останавливает рабочие потоки и уничтожает исполнитель цикла.
virtual ~PIThreadPoolLoop();
//! \~english Set threads function to [lambda expression](https://en.cppreference.com/w/cpp/language/lambda) "f" with format [ ](int){
//! ... }
//! \~russian Устанавливает функцию потоков на [лямбда-выражение](https://ru.cppreference.com/w/cpp/language/lambda) "f" в формате [
//! ](int){ ... }
//! \~english Sets the iteration body called once for each index of a started range.
//! \~russian Устанавливает тело итерации, которое вызывается один раз для каждого индекса запущенного диапазона.
void setFunction(std::function<void(int)> f);
//! \~english Wait for all threads stop
//! \~russian Ожидает завершения всех потоков
//! \~english Waits for the current in-flight batch started by \a start().
//! \~russian Ожидает завершения текущего запущенного пакета, начатого через \a start().
void wait();
//! \~english
//! Start functions execution with integer argument range
//! from "index_start" to "index_start + index_count - 1"
//! \~russian
//! Начинает исполнение потоков с аргументами по диапазону
//! от "index_start" до "index_start + index_count - 1"
//! \~english Starts asynchronous execution for indices in range [\a index_start, \a index_start + \a index_count).
//! \~russian Запускает асинхронное выполнение для индексов из диапазона [\a index_start, \a index_start + \a index_count).
void start(int index_start, int index_count);
//! \~english
//! Start functions execution with integer argument range
//! from "index_start" to "index_start + index_count - 1"
//! and wait for finish
//! \~russian
//! Начинает исполнение потоков с аргументами по диапазону
//! от "index_start" до "index_start + index_count - 1"
//! и ожидает завершения
//! \~english Runs the configured iteration body for indices in range [\a index_start, \a index_start + \a index_count) and waits for
//! completion.
//! \~russian Выполняет настроенное тело итерации для индексов из диапазона [\a index_start, \a index_start + \a index_count) и ожидает
//! завершения.
void exec(int index_start, int index_count);
//! \~english
//! Start functions "f" execution with integer argument range
//! from "index_start" to "index_start + index_count - 1"
//! and wait for finish
//! \~russian
//! Начинает исполнение потоками функции "f" с аргументами по диапазону
//! от "index_start" до "index_start + index_count - 1"
//! и ожидает завершения
//! \~english Sets iteration body to \a f, runs it for indices in range [\a index_start, \a index_start + \a index_count), and waits for
//! completion.
//! \~russian Устанавливает тело итерации \a f, выполняет его для индексов из диапазона [\a index_start, \a index_start + \a
//! index_count) и ожидает завершения.
void exec(int index_start, int index_count, std::function<void(int)> f);
private:

View File

@@ -0,0 +1,238 @@
/*
PIP - Platform Independent Primitives
Ivan Pelipenko, Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pithreadpoolworker.h"
#include "pisysteminfo.h"
//! \addtogroup Thread
//! \{
//! \class PIThreadPoolWorker pithreadpoolworker.h
//! \~\details
//! \~english
//! \PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It
//! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods
//! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each
//! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations.
//! \~russian
//! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он
//! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс
//! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула
//! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент
//! для многопоточного асинхронного выполнения операций.
//!
//! \}
PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) {
if (threads_count < 0) threads_count = PISystemInfo::instance()->processorsCount;
assertm(threads_count > 0, "Invalid threads count!");
for (int i = 0; i < threads_count; ++i) {
Worker * w = new Worker();
w->thread.setSlot([this, w]() { threadFunc(w); });
workers << w;
}
}
PIThreadPoolWorker::~PIThreadPoolWorker() {
piDeleteAllAndClear(workers);
}
void PIThreadPoolWorker::start() {
for (auto w: workers) {
w->thread.start();
w->notifier.notify();
}
}
void PIThreadPoolWorker::stop() {
for (auto w: workers) {
w->thread.stop();
w->notifier.notify();
}
}
bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) {
stop();
return waitForFinish(timeout);
}
bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) {
PITimeMeasurer tm;
for (auto w: workers) {
if (timeout.isNull())
w->thread.waitForStart();
else {
auto remains = timeout - tm.elapsed();
if (remains.isNegative()) return false;
if (!w->thread.waitForStart(remains)) return false;
}
}
return true;
}
bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) {
PITimeMeasurer tm;
for (auto w: workers) {
if (timeout.isNull())
w->thread.waitForFinish();
else {
auto remains = timeout - tm.elapsed();
if (remains.isNegative()) return false;
if (!w->thread.waitForFinish(remains)) return false;
}
}
return true;
}
bool PIThreadPoolWorker::isRunning() const {
return workers.every([](Worker * w) { return w->thread.isRunning(); });
}
bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) {
if (!isRunning()) return tasks_queue.getRef()->isEmpty();
auto checkWorking = [this] {
if (tasks_queue.getRef()->isNotEmpty()) return true;
for (auto * w: workers)
if (w->in_work > 0) return true;
return false;
};
if (timeout.isNull()) {
for (;;) {
if (!checkWorking()) break;
piMinSleep();
}
return true;
}
PITimeMeasurer tm;
while (tm.elapsed() < timeout) {
if (!checkWorking()) return true;
piMinSleep();
}
return tm.elapsed() < timeout;
}
bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) {
if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; });
auto checkWorking = [this, id] {
if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true;
for (auto * w: workers)
if (w->in_work == id) return true;
return false;
};
if (timeout.isNull()) {
for (;;) {
if (!checkWorking()) break;
piMinSleep();
}
return true;
}
PITimeMeasurer tm;
while (tm.elapsed() < timeout) {
if (!checkWorking()) return true;
piMinSleep();
}
return tm.elapsed() < timeout;
}
void PIThreadPoolWorker::exec() {
start();
waitForStart();
waitForTasks();
stopAndWait();
}
int64_t PIThreadPoolWorker::enqueueTask(std::function<void(int64_t)> func, PIObject * context) {
if (context) {
if (!contexts[context]) {
contexts << context;
CONNECTL(context, deleted, ([this, context](PIObject *) {
contexts.remove(context);
auto qref = tasks_queue.getRef();
// auto prev_size = qref->size();
// piCout << "deleted" << (void *)context << qref->map<void *>([](const Task & t) { return t.context; });
qref->removeWhere([context](const Task & t) { return t.context == context; });
// piCout << prev_size << qref->size() << qref->map<void *>([](const Task & t) { return t.context; });
}));
}
}
int64_t id = ++next_task_id;
Task task;
task.id = id;
task.func = std::move(func);
task.context = context;
tasks_queue.getRef()->enqueue(std::move(task));
for (auto * w: workers)
w->notifier.notify();
return id;
}
bool PIThreadPoolWorker::removeTask(int64_t id) {
auto qref = tasks_queue.getRef();
auto prev_size = qref->size();
qref->removeWhere([id](const Task & t) { return t.id == id; });
return prev_size != qref->size();
}
void PIThreadPoolWorker::clearTasks() {
tasks_queue.getRef()->clear();
}
PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const {
if (id <= 0) return TaskStatus::Unknown;
auto qref = tasks_queue.getRef();
for (auto w: workers)
if (w->in_work == id) return TaskStatus::InProgress;
for (const auto & t: *qref)
if (t.id == id) return TaskStatus::Enqueued;
return id <= next_task_id ? TaskStatus::DoneOrCancelled : TaskStatus::Unknown;
}
void PIThreadPoolWorker::threadFunc(Worker * w) {
w->notifier.wait();
if (w->thread.isStopping()) return;
Task task;
{
auto ref = tasks_queue.getRef();
if (ref->isEmpty()) return;
task = ref->dequeue();
}
if (!task.isValid()) return;
w->in_work = task.id;
taskStarted(task.id);
task.func(task.id);
w->in_work = -1;
taskFinished(task.id);
w->notifier.notify();
}

View File

@@ -0,0 +1,177 @@
//! \~\file pithreadpoolworker.h
//! \~\ingroup Thread
//! \brief
//! \~english Thread pool worker
//! \~russian Исполнитель пула потоков
/*
PIP - Platform Independent Primitives
Ivan Pelipenko, Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREADPOOLWORKER_H
#define PITHREADPOOLWORKER_H
#include "piprotectedvariable.h"
#include "pithread.h"
//! \~\ingroup Thread
//! \~\brief
//! \~english Fixed-size pool of worker threads for generic-purpose tasks.
//! \~russian Фиксированный пул рабочих потоков для задач общего назначения.
class PIP_EXPORT PIThreadPoolWorker: public PIObject {
PIOBJECT(PIThreadPoolWorker)
public:
//! \~english Constructs executor with \a threads_count worker threads. If \a threads_count < 0 processor threads count used.
//! \~russian Создает исполнитель с \a threads_count рабочими потоками. Если \a threads_count < 0 используется количество потоков
//! процессора.
explicit PIThreadPoolWorker(int threads_count = -1);
//! \~english Destroy worker threads. Call \a stopAndWait() before.
//! \~russian Уничтожает рабочие потоки. Вызывайте перед этим \a stopAndWait().
virtual ~PIThreadPoolWorker();
//! \~english Task status.
//! \~russian Статус задачи.
enum class TaskStatus {
Unknown /** \~english ID <= 0 or not queued yet \~russian ID <= 0 или не поставлена в очередь */,
Enqueued /** \~english Wait for execution \~russian Ожидает выполнения */,
InProgress /** \~english In execution now \~russian В процессе выполнения */,
DoneOrCancelled /** \~english Done or cancelled \~russian Выполнена или отменена */
};
//! \~english Starts the threads.
//! \~russian Запускает потоки.
void start();
//! \~english Requests graceful threads shutdown.
//! \~russian Запрашивает корректное завершение потоков.
void stop();
//! \~english Requests stop and waits for threads completion. Returns \b false if the timeout expires.
//! \~russian Запрашивает остановку и ожидает завершения потоков. Возвращает \b false, если таймаут истек.
bool stopAndWait(PISystemTime timeout = {});
//! \~english Waits until the threads starts. Returns \b false if the timeout expires first.
//! \~russian Ожидает запуска потоков. Возвращает \b false, если таймаут истек раньше.
bool waitForStart(PISystemTime timeout = {});
//! \~english Waits for threads completion. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения потоков. Возвращает \b false, если таймаут истек раньше.
bool waitForFinish(PISystemTime timeout = {});
//! \~english Returns whether the threads are currently running.
//! \~russian Возвращает, выполняются ли потоки в данный момент.
bool isRunning() const;
//! \~english Waits for all tasks completion. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения всех задач. Возвращает \b false, если таймаут истек раньше.
bool waitForTasks(PISystemTime timeout = {});
//! \~english Waits for task with id \a id completion. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения задачи с id \a id. Возвращает \b false, если таймаут истек раньше.
bool waitForTask(int64_t id, PISystemTime timeout = {});
//! \~english Starts threads, wait for all tasks complete and threads stop.
//! \~russian Запускает потоки, ожидает завершения всех задач и остановки потоков.
void exec();
//! \~english Queue functor to execution. Pass task ID in functor. Returns task ID.
//! \~russian Запланировать функтор на выполнение. В функтор передастся ID задачи. Возвращает ID задачи.
int64_t enqueueTask(std::function<void(int64_t)> func, PIObject * context = nullptr);
//! \~english Queue functor to execution. Returns task ID.
//! \~russian Запланировать функтор на выполнение. Возвращает ID задачи.
int64_t enqueueTask(std::function<void()> func, PIObject * context = nullptr) {
return enqueueTask([func](int64_t) { func(); }, context);
}
//! \~english Queue class member method to execution. Pass task ID in method. Returns task ID.
//! \~russian Запланировать член-метод класса на выполнение. В метод передастся ID задачи. Возвращает ID задачи.
template<typename O>
int64_t enqueueTask(O * obj, void (O::*member_func)(int64_t)) {
return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); },
PIObject::isPIObject(obj) ? dynamic_cast<PIObject *>(obj) : nullptr);
}
//! \~english Queue class member method to execution. Returns task ID.
//! \~russian Запланировать член-метод класса на выполнение. Возвращает ID задачи.
template<typename O>
int64_t enqueueTask(O * obj, void (O::*member_func)()) {
return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); },
PIObject::isPIObject(obj) ? dynamic_cast<PIObject *>(obj) : nullptr);
}
//! \~english Remove task with id \a id from queue. Returns if task delete.
//! \~russian Удаляет задачу с id \a id из очереди. Возвращиает была ли задача удалена.
bool removeTask(int64_t id);
//! \~english Remove all queued tasks.
//! \~russian Удаляет все задачи из очереди.
void clearTasks();
//! \~english Returns task status with id \a id.
//! \~russian Возвращиает статус задачи с id \a id.
TaskStatus taskStatus(int64_t id) const;
//! \events
//! \{
//! \fn void taskStarted(int64_t id)
//! \brief
//! \~english Raised on start execution task with id \a id.
//! \~russian Вызывается при старте выполнения задачи с id \a id.
EVENT1(taskStarted, int64_t, id);
//! \fn void taskFinished(int64_t id)
//! \brief
//! \~english Raised on finish execution task with id \a id.
//! \~russian Вызывается при завершении выполнения задачи с id \a id.
EVENT1(taskFinished, int64_t, id);
//! \}
private:
struct Worker {
PIThread thread;
PIThreadNotifier notifier;
std::atomic_int64_t in_work = {-1};
};
struct Task {
bool isValid() const { return func != nullptr; }
PIObject * context = nullptr;
std::function<void(int64_t)> func = nullptr;
int64_t id = -1;
};
void threadFunc(Worker * w);
mutable PIVector<Worker *> workers;
mutable PIProtectedVariable<PIQueue<Task>> tasks_queue;
PISet<PIObject *> contexts;
std::atomic_int64_t next_task_id = {0};
};
#endif // PITHREADPOOLWORKER_H

View File

@@ -123,13 +123,13 @@ PITimer::PITimer(): PIObject() {
PITimer::PITimer(std::function<void(int)> func) {
initFirst();
ret_func = func;
ret_func_delim = std::move(func);
}
PITimer::PITimer(std::function<void()> func) {
initFirst();
ret_func = [func](int) { func(); };
ret_func = std::move(func);
}
@@ -225,7 +225,8 @@ void PITimer::adjustTimes() {
void PITimer::execTick() {
if (!isRunning()) return;
if (lockRun) lock();
if (ret_func) ret_func(1);
if (ret_func) ret_func();
if (ret_func_delim) ret_func_delim(1);
tick(1);
tickEvent(1);
if (callEvents) maybeCallQueuedEvents();
@@ -234,8 +235,8 @@ void PITimer::execTick() {
i.tick = 0;
if (i.func)
i.func(i.delim);
else if (ret_func)
ret_func(i.delim);
else if (ret_func_delim)
ret_func_delim(i.delim);
tick(i.delim);
tickEvent(i.delim);
}
@@ -263,7 +264,7 @@ bool PITimer::start(PISystemTime interval) {
bool PITimer::start(PISystemTime interval, std::function<void()> func) {
if (isRunning()) stopAndWait();
setInterval(interval);
setSlot(func);
setSlot(std::move(func));
return start();
}
@@ -274,8 +275,20 @@ void PITimer::stopAndWait(PISystemTime timeout) {
}
void PITimer::setSlot(std::function<void()> func) {
ret_func_delim = nullptr;
ret_func = std::move(func);
}
void PITimer::setSlot(std::function<void(int)> func) {
ret_func = nullptr;
ret_func_delim = std::move(func);
}
void PITimer::addDelimiter(int delim, std::function<void(int)> func) {
delims << Delimiter(func, delim);
delims << Delimiter(std::move(func), delim);
}

View File

@@ -1,8 +1,19 @@
/*! \file pitimer.h
* \ingroup Thread
* \~\brief
* \~english Timer
* \~russian Таймер
* \~english Timer object backed by an internal thread
* \~russian Объект таймера, работающий на внутреннем потоке
*
* \~\details
* \~english
* %PITimer uses an internal %PIThread to generate best-effort periodic ticks.
* When queued delivery is enabled, the timer also drains events addressed to it
* as performer through \a maybeCallQueuedEvents() on the main tick.
* \~russian
* %PITimer использует внутренний %PIThread для генерации периодических тиков
* с практической, но не строго гарантированной точностью. Когда включена
* отложенная доставка, таймер также обрабатывает события, адресованные ему как
* исполнителю, через \a maybeCallQueuedEvents() на основном тике.
*/
/*
PIP - Platform Independent Primitives
@@ -34,133 +45,157 @@
class PIThread;
#ifndef PIP_NO_THREADS
//! \~\ingroup Thread
//! \~\brief
//! \~english Periodic timer that emits ticks from an internal worker thread.
//! \~russian Периодический таймер, который выдает тики из внутреннего рабочего потока.
//! \~\details
//! \~english
//! The main tick uses delimiter value 1. Additional frequency delimiters can
//! request extra callbacks every \a n ticks. This class does not promise exact
//! wake-up timing or scheduler fairness.
//! \~russian
//! Основной тик использует значение делителя 1. Дополнительные делители частоты
//! позволяют запрашивать дополнительные вызовы каждые \a n тиков. Этот класс не
//! обещает точное время пробуждения или справедливость планировщика.
class PIP_EXPORT PITimer: public PIObject {
PIOBJECT_SUBCLASS(PITimer, PIObject);
public:
NO_COPY_CLASS(PITimer);
//! \~english Constructs timer
//! \~russian Создает таймер
//! \~english Constructs a timer without a tick callback.
//! \~russian Создает таймер без обратного вызова тика.
explicit PITimer();
//! \~english Constructs timer with method void(int)
//! \~russian Создает таймер с функцией void(int)
//! \~english Constructs a timer with a callback that receives the delimiter value.
//! \~russian Создает таймер с обратным вызовом, принимающим значение делителя.
explicit PITimer(std::function<void(int)> func);
//! \~english Constructs timer with method void()
//! \~russian Создает таймер с функцией void()
//! \~english Constructs a timer with a callback that ignores the delimiter value.
//! \~russian Создает таймер с обратным вызовом, игнорирующим значение делителя.
explicit PITimer(std::function<void()> func);
//! \~english Destroys the timer and stops its internal worker thread.
//! \~russian Уничтожает таймер и останавливает его внутренний рабочий поток.
virtual ~PITimer();
//! \~english Returns timer loop delay
//! \~russian Возвращает задержку цикла таймера
//! \~english Returns the configured base interval between main ticks.
//! \~russian Возвращает настроенный базовый интервал между основными тиками.
PISystemTime interval() const;
//! \~english Set timer loop delay
//! \~russian Установить интервал таймера
//! \~english Sets the base interval. If the timer is running, restarts it with the new value.
//! \~russian Устанавливает базовый интервал. Если таймер запущен, перезапускает его с новым значением.
void setInterval(PISystemTime interval);
//! \~english Returns if timer is started
//! \~russian Возвращает работает ли таймер
//! \~english Returns whether the timer thread is currently running.
//! \~russian Возвращает, работает ли сейчас поток таймера.
bool isRunning() const;
//! \~english Return if timer is stopping
//! \~russian Возвращает останавливается ли таймер
//! \~english Returns whether stop has been requested and shutdown is still in progress.
//! \~russian Возвращает, был ли запрошен останов и идет ли еще завершение.
bool isStopping() const;
//! \~english Wait for timer stop
//! \~russian Ожидает остановки таймера
//! \~english Waits until the timer finishes. Returns \b false if the timeout expires first.
//! \~russian Ожидает завершения таймера. Возвращает \b false, если таймаут истек раньше.
bool waitForFinish(PISystemTime timeout = {});
//! \~english Start timer with "interval" loop delay
//! \~russian Запустить таймер с интервалом "interval"
//! \~english Sets the base interval and starts the timer.
//! \~russian Устанавливает базовый интервал и запускает таймер.
bool start(PISystemTime interval);
//! \~english Start timer with "interval" loop delay and tick function "func"
//! \~russian Запустить таймер с интервалом "interval" и вызываевымым методом "func"
//! \~english Sets the base interval and tick callback, then starts the timer.
//! \~russian Устанавливает базовый интервал и обратный вызов тика, затем запускает таймер.
bool start(PISystemTime interval, std::function<void()> func);
//! \~english Deprecated overload of \a start() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a start(), принимающая миллисекунды.
bool start(double interval_ms) DEPRECATEDM("use start(PISystemTime)") { return start(PISystemTime::fromMilliseconds(interval_ms)); }
EVENT_HANDLER0(bool, start);
EVENT_HANDLER0(bool, restart);
EVENT_HANDLER0(void, stop);
//! \~english Stop timer and wait for finish.
//! \~russian Останавливает таймер и ожидает завершения.
//! \~english Deprecated overload of \a stopAndWait() that accepts milliseconds.
//! \~russian Устаревшая перегрузка \a stopAndWait(), принимающая миллисекунды.
void stopAndWait(int timeout_ms) { stopAndWait(PISystemTime::fromMilliseconds(timeout_ms)); }
//! \~english Stop timer and wait for finish.
//! \~russian Останавливает таймер и ожидает завершения.
//! \~english Requests stop and waits for the worker thread to finish.
//! \~russian Запрашивает остановку и ожидает завершения рабочего потока.
void stopAndWait(PISystemTime timeout = {});
//! \~english Set timer tick function
//! \~russian Установить вызываемый метод
void setSlot(std::function<void()> func) {
ret_func = [func](int) { func(); };
}
//! \~english Sets a tick callback that ignores the delimiter value.
//! \~russian Устанавливает обратный вызов тика, игнорирующий значение делителя.
void setSlot(std::function<void()> func);
//! \~english Set timer tick function
//! \~russian Установить вызываемый метод
void setSlot(std::function<void(int)> func) { ret_func = func; }
//! \~english Sets a tick callback that receives the current delimiter value.
//! \~russian Устанавливает обратный вызов тика, принимающий текущее значение делителя.
void setSlot(std::function<void(int)> func);
//! \~english Enables locking of the internal mutex around tick processing.
//! \~russian Включает блокировку внутреннего мьютекса вокруг обработки тиков.
void needLockRun(bool need) { lockRun = need; }
EVENT_HANDLER0(void, lock) { mutex_.lock(); }
EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
//! \~english Returns if timer should exec \a maybeCallQueuedEvents() at every tick. By default \b true
//! \~russian Возвращает должен ли таймер вызывать \a maybeCallQueuedEvents() каждый тик. По умолчанию \b true
//! \~english Returns whether the timer drains queued delivery for itself as performer on each main tick. By default \b true.
//! \~russian Возвращает, должен ли таймер обрабатывать отложенную доставку для себя как исполнителя на каждом основном тике. По
//! умолчанию \b true.
bool isCallQueuedEvents() const { return callEvents; }
//! \~english Set timer exec \a maybeCallQueuedEvents() at every tick
//! \~russian Установает должен ли таймер вызывать \a maybeCallQueuedEvents() каждый тик
//! \~english Enables or disables queued-delivery draining through \a maybeCallQueuedEvents() on each main tick.
//! \~russian Включает или отключает обработку отложенной доставки через \a maybeCallQueuedEvents() на каждом основном тике.
void setCallQueuedEvents(bool yes) { callEvents = yes; }
//! \~english Add frequency delimiter "delim" with optional delimiter slot "slot"
//! \~russian Добавляет делитель частоты "delim" с необязательным методом "slot"
//! \~english Adds a frequency delimiter that invokes \a func every \a delim main ticks.
//! \~russian Добавляет делитель частоты, который вызывает \a func каждые \a delim основных тиков.
void addDelimiter(int delim, std::function<void(int)> func = nullptr);
//! \~english Add frequency delimiter "delim" with optional delimiter slot "slot"
//! \~russian Добавляет делитель частоты "delim" с необязательным методом "slot"
//! \~english Adds a delimiter with a callback that ignores the delimiter value.
//! \~russian Добавляет делитель с обратным вызовом, игнорирующим значение делителя.
void addDelimiter(int delim, std::function<void()> func);
//! \~english Add frequency delimiter "delim" with optional delimiter slot "slot"
//! \~russian Добавляет делитель частоты "delim" с необязательным методом "slot"
//! \~english Adds a delimiter with a pointer-based callback signature.
//! \~russian Добавляет делитель с сигнатурой обратного вызова, принимающей указатель.
void addDelimiter(int delim, std::function<void(void *)> slot);
//! \~english Remove all frequency delimiters "delim"
//! \~russian Удаляет все делители частоты "delim"
//! \~english Removes all delimiters with value \a delim.
//! \~russian Удаляет все делители со значением \a delim.
void removeDelimiter(int delim);
EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
EVENT1(tickEvent, int, delimiter);
//! \handlers
//! \{
//! \fn bool start()
//! \brief
//! \~english Start timer with \a interval() loop delay
//! \~russian Запустить таймер с интервалом \a interval()
//! \~english Starts the timer with the current \a interval().
//! \~russian Запускает таймер с текущим значением \a interval().
EVENT_HANDLER0(bool, start);
//! \fn bool restart()
//! \brief
//! \~english Stop and start timer with \a interval() loop delay
//! \~russian Остановить и запустить таймер с интервалом \a interval()
//! \~english Stops the timer, then starts it again with the current \a interval().
//! \~russian Останавливает таймер, затем снова запускает его с текущим значением \a interval().
EVENT_HANDLER0(bool, restart);
//! \fn bool stop()
//! \brief
//! \~english Stop timer (don`t wait for finish)
//! \~russian Остановить таймер (не дожидается остановки)
//! \~english Requests stop and wakes the timer thread, but does not wait for completion.
//! \~russian Запрашивает остановку и пробуждает поток таймера, но не ожидает завершения.
EVENT_HANDLER0(void, stop);
//! \fn void clearDelimiters()
//! \brief
//! \~english Remove all frequency delimiters
//! \~russian Удаляет все делители частоты
EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
//! \fn void lock()
//! \brief
//! \~english Locks the internal timer mutex.
//! \~russian Блокирует внутренний мьютекс таймера.
EVENT_HANDLER0(void, lock) { mutex_.lock(); }
//! \fn void unlock()
//! \brief
//! \~english Unlocks the internal timer mutex.
//! \~russian Разблокирует внутренний мьютекс таймера.
EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
//! \}
//! \events
@@ -168,21 +203,21 @@ public:
//! \fn void tickEvent(int delimiter)
//! \brief
//! \~english Raise on timer tick
//! \~russian Вызывается каждый тик таймера
//! \~english Raised on each timer tick and on configured delimiter ticks.
//! \~russian Вызывается на каждом тике таймера и на настроенных тиках делителей.
//! \~\details
//! \~english
//! "delimiter" is frequency delimiter, 1 for main loop.
//! "delimiter" is the frequency delimiter, 1 for the main loop.
//! \~russian
//! "delimiter" - делитель частоты, 1 для основного цикла
//! "delimiter" - делитель частоты, 1 для основного цикла.
EVENT1(tickEvent, int, delimiter);
//! \}
protected:
struct PIP_EXPORT Delimiter {
Delimiter(std::function<void(int)> func_ = nullptr, int delim_ = 1) {
func = func_;
func = std::move(func_);
delim = delim_;
}
std::function<void(int)> func;
@@ -197,7 +232,8 @@ protected:
void adjustTimes();
void execTick();
//! Timer execution function, similar to "slot" or event \a timeout(). By default does nothing
//! \~english Virtual tick method. The main loop passes delimiter 1, additional delimiters pass their own value.
//! \~russian Виртуальный метод тика. Основной цикл передает делитель 1, дополнительные делители передают свое значение.
virtual void tick(int delimiter) {}
PIThread * thread = nullptr;
@@ -205,7 +241,8 @@ protected:
PIMutex mutex_;
PISystemTime m_interval, m_interval_x5;
PISystemTime m_time_next;
std::function<void(int)> ret_func = nullptr;
std::function<void()> ret_func = nullptr;
std::function<void(int)> ret_func_delim = nullptr;
PIVector<Delimiter> delims;
PIConditionVariable event;
};