tree changes

This commit is contained in:
2020-08-19 00:47:05 +03:00
parent c582d8ff46
commit ccd6a9888f
240 changed files with 30 additions and 12 deletions

View File

@@ -0,0 +1,218 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIBLOCKINGQUEUE_H
#define PIBLOCKINGQUEUE_H
#include "pideque.h"
#include "piconditionvar.h"
/**
* @brief A Queue that supports operations that wait for the queue to become non-empty when retrieving an element, and
* wait for space to become available in the queue when storing an element.
*/
template <typename T>
class PIBlockingQueue: private PIQueue<T> {
public:
/**
* @brief Constructor
*/
explicit inline PIBlockingQueue(size_t capacity = SIZE_MAX,
PIConditionVariable* cond_var_add = new PIConditionVariable(),
PIConditionVariable* cond_var_rem = new PIConditionVariable())
: cond_var_add(cond_var_add), cond_var_rem(cond_var_rem), max_size(capacity) { }
/**
* @brief Copy constructor. Initialize queue with copy of other queue elements. Not thread-safe for other queue.
*/
explicit inline PIBlockingQueue(const PIDeque<T>& other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
mutex.lock();
max_size = SIZE_MAX;
PIDeque<T>::append(other);
mutex.unlock();
}
/**
* @brief Thread-safe copy constructor. Initialize queue with copy of other queue elements.
*/
inline PIBlockingQueue(PIBlockingQueue<T> & other) : cond_var_add(new PIConditionVariable()), cond_var_rem(new PIConditionVariable()) {
other.mutex.lock();
mutex.lock();
max_size = other.max_size;
PIDeque<T>::append(static_cast<PIDeque<T>&>(other));
mutex.unlock();
other.mutex.unlock();
}
~PIBlockingQueue() {
delete cond_var_add;
delete cond_var_rem;
}
/**
* @brief Inserts the specified element into this queue, waiting if necessary for space to become available.
*
* @param v the element to add
*/
PIBlockingQueue<T> & put(const T & v) {
mutex.lock();
cond_var_rem->wait(mutex, [&]() { return PIDeque<T>::size() < max_size; });
PIDeque<T>::push_back(v);
mutex.unlock();
cond_var_add->notifyOne();
return *this;
}
PIBlockingQueue<T> & enqueue(const T & v) {return put(v);}
/**
* @brief Inserts the specified element at the end of this queue if it is possible to do so immediately without
* exceeding the queue's capacity, returning true upon success and false if this queue is full.
*
* @param v the element to add
* @return true if the element was added to this queue, else false
*/
bool offer(const T & v, int timeoutMs = 0) {
bool isOk;
mutex.lock();
if (timeoutMs == 0)
isOk = PIDeque<T>::size() < max_size;
else
isOk = cond_var_rem->waitFor(mutex, timeoutMs, [&]() { return PIDeque<T>::size() < max_size; } );
if (isOk) PIDeque<T>::push_back(v);
mutex.unlock();
if (isOk) cond_var_add->notifyOne();
return isOk;
}
/**
* @brief Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
*
* @return the head of this queue
*/
T take() {
T t;
mutex.lock();
cond_var_add->wait(mutex, [&]() { return !PIDeque<T>::isEmpty(); });
t = T(PIDeque<T>::take_front());
mutex.unlock();
cond_var_rem->notifyOne();
return t;
}
T dequeue() {return take();}
/**
* @brief Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an
* element to become available.
*
* @param timeoutMs how long to wait before giving up, in milliseconds
* @param defaultVal value, which returns if the specified waiting time elapses before an element is available
* @param isOk flag, which indicates result of method execution. It will be set to false if timeout, or true if
* return value is retrieved value
* @return the head of this queue, or defaultVal if the specified waiting time elapses before an element is available
*/
T poll(int timeoutMs = 0, const T & defaultVal = T(), bool * isOk = nullptr) {
T t = defaultVal;
bool isNotEmpty;
mutex.lock();
if (timeoutMs == 0)
isNotEmpty = !PIDeque<T>::isEmpty();
else
isNotEmpty = cond_var_add->waitFor(mutex, timeoutMs, [&]() { return !PIDeque<T>::isEmpty(); });
if (isNotEmpty) t = PIDeque<T>::take_front();
mutex.unlock();
if (isNotEmpty) cond_var_rem->notifyOne();
if (isOk) *isOk = isNotEmpty;
return t;
}
/**
* @brief Returns the number of elements that this queue can ideally (in the absence of memory or resource
* constraints) contains. This is always equal to the initial capacity of this queue less the current size of this queue.
*
* @return the capacity
*/
size_t capacity() {
size_t c;
mutex.lock();
c = max_size;
mutex.unlock();
return c;
}
/**
* @brief Returns the number of additional elements that this queue can ideally (in the absence of memory or resource
* constraints) accept. This is always equal to the initial capacity of this queue less the current size of this queue.
*
* @return the remaining capacity
*/
size_t remainingCapacity() {
mutex.lock();
size_t c = max_size - PIDeque<T>::size();
mutex.unlock();
return c;
}
/**
* @brief Returns the number of elements in this collection.
*/
size_t size() {
mutex.lock();
size_t s = PIDeque<T>::size();
mutex.unlock();
return s;
}
/**
* @brief Removes all available elements from this queue and adds them to other given queue.
*/
size_t drainTo(PIDeque<T>& other, size_t maxCount = SIZE_MAX) {
mutex.lock();
size_t count = ((maxCount > PIDeque<T>::size()) ? PIDeque<T>::size() : maxCount);
for (size_t i = 0; i < count; ++i) other.push_back(PIDeque<T>::take_front());
mutex.unlock();
return count;
}
/**
* @brief Removes all available elements from this queue and adds them to other given queue.
*/
size_t drainTo(PIBlockingQueue<T>& other, size_t maxCount = SIZE_MAX) {
mutex.lock();
other.mutex.lock();
size_t count = maxCount > PIDeque<T>::size() ? PIDeque<T>::size() : maxCount;
size_t otherRemainingCapacity = other.max_size - static_cast<PIDeque<T> >(other).size();
if (count > otherRemainingCapacity) count = otherRemainingCapacity;
for (size_t i = 0; i < count; ++i) other.push_back(PIDeque<T>::take_front());
other.mutex.unlock();
mutex.unlock();
return count;
}
private:
PIMutex mutex;
PIConditionVariable * cond_var_add, * cond_var_rem;
size_t max_size;
};
#endif // PIBLOCKINGQUEUE_H

View File

@@ -0,0 +1,141 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "piconditionvar.h"
#include "pithread.h"
#include "pitime.h"
#ifdef WINDOWS
# undef _WIN32_WINNT
# define _WIN32_WINNT 0x0600
# include <synchapi.h>
# include <windef.h>
# include <winbase.h>
#endif
PRIVATE_DEFINITION_START(PIConditionVariable)
#ifdef WINDOWS
CONDITION_VARIABLE nativeHandle;
#else
pthread_cond_t nativeHandle;
PIMutex * currentLock;
#endif
bool isDestroying;
PRIVATE_DEFINITION_END(PIConditionVariable)
PIConditionVariable::PIConditionVariable() {
#ifdef WINDOWS
InitializeConditionVariable(&PRIVATE->nativeHandle);
#else
PRIVATE->isDestroying = false;
PRIVATE->currentLock = nullptr;
memset(&(PRIVATE->nativeHandle), 0, sizeof(PRIVATE->nativeHandle));
pthread_cond_init(&PRIVATE->nativeHandle, NULL);
#endif
}
PIConditionVariable::~PIConditionVariable() {
#ifdef WINDOWS
#else
pthread_cond_destroy(&PRIVATE->nativeHandle);
#endif
}
void PIConditionVariable::wait(PIMutex& lk) {
#ifdef WINDOWS
SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE);
#else
pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle());
#endif
}
void PIConditionVariable::wait(PIMutex& lk, const std::function<bool()>& condition) {
bool isCondition;
while (true) {
isCondition = condition();
if (isCondition) break;
#ifdef WINDOWS
SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), INFINITE);
#else
pthread_cond_wait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle());
#endif
if (PRIVATE->isDestroying) return;
}
}
bool PIConditionVariable::waitFor(PIMutex &lk, int timeoutMs) {
bool isNotTimeout;
#ifdef WINDOWS
isNotTimeout = SleepConditionVariableCS(&PRIVATE->nativeHandle, (PCRITICAL_SECTION)lk.handle(), timeoutMs) != 0;
#else
timespec abstime = {.tv_sec = timeoutMs / 1000, .tv_nsec = timeoutMs * 1000 * 1000};
isNotTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0;
#endif
if (PRIVATE->isDestroying) return false;
return isNotTimeout;
}
bool PIConditionVariable::waitFor(PIMutex& lk, int timeoutMs, const std::function<bool()> &condition) {
bool isCondition;
PITimeMeasurer measurer;
while (true) {
isCondition = condition();
if (isCondition) break;
#ifdef WINDOWS
WINBOOL isTimeout = SleepConditionVariableCS(
&PRIVATE->nativeHandle,
(PCRITICAL_SECTION)lk.handle(),
timeoutMs - (int)measurer.elapsed_m());
if (isTimeout == 0) return false;
#else
int timeoutCurr = timeoutMs - (int)measurer.elapsed_m();
timespec abstime = {.tv_sec = timeoutCurr / 1000, .tv_nsec = timeoutCurr * 1000 * 1000};
bool isTimeout = pthread_cond_timedwait(&PRIVATE->nativeHandle, (pthread_mutex_t*)lk.handle(), &abstime) == 0;
if (isTimeout) return false;
#endif
if (PRIVATE->isDestroying) return false;
}
return true;
}
void PIConditionVariable::notifyOne() {
#ifdef WINDOWS
WakeConditionVariable(&PRIVATE->nativeHandle);
#else
pthread_cond_signal(&PRIVATE->nativeHandle);
#endif
}
void PIConditionVariable::notifyAll() {
#ifdef WINDOWS
WakeAllConditionVariable(&PRIVATE->nativeHandle);
#else
pthread_cond_broadcast(&PRIVATE->nativeHandle);
#endif
}

View File

@@ -0,0 +1,118 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PICONDITIONVAR_H
#define PICONDITIONVAR_H
#include "pithread.h"
/**
* @brief A condition variable is an object able to block the calling thread until notified to resume.
*
* It uses a PIMutex to lock the thread when one of its wait functions is called. The thread remains
* blocked until woken up by another thread that calls a notification function on the same PIConditionVariable object.
*/
class PIP_EXPORT PIConditionVariable {
public:
NO_COPY_CLASS(PIConditionVariable)
explicit PIConditionVariable();
virtual ~PIConditionVariable();
/**
* @brief Unblocks one of the threads currently waiting for this condition. If no threads are waiting, the function
* does nothing. If more than one, it is unspecified which of the threads is selected.
*/
void notifyOne();
/**
* @brief Unblocks all threads currently waiting for this condition. If no threads are waiting, the function does
* nothing.
*/
void notifyAll();
/**
* @brief see wait(PIMutex &, const std::function<bool()>&)
*/
virtual void wait(PIMutex & lk);
/**
* @brief Wait until notified
*
* The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
* until notified.
*
* At the moment of blocking the thread, the function automatically calls lk.unlock() (PIMutex::unlock()),
* allowing other locked threads to continue.
*
* Once notified (explicitly, by some other thread), the function unblocks and calls lk.lock() (PIMutex::lock()),
* leaving lk in the same state as when the function was called. Then the function returns (notice that this last mutex
* locking may block again the thread before returning).
*
* Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
* member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
* being called. Therefore, users of this function shall ensure their condition for resumption is met.
*
* If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
* the thread when it becomes true (which is specially useful to check against spurious wake-up calls).
*
* @param lk lock object used by method wait for data protection
* @param condition A callable object or function that takes no arguments and returns a value that can be evaluated
* as a bool. This is called repeatedly until it evaluates to true.
*/
virtual void wait(PIMutex& lk, const std::function<bool()>& condition);
/**
* @brief see waitFor(PIMutex &, int, const std::function<bool()>&)
*/
virtual bool waitFor(PIMutex & lk, int timeoutMs);
/**
* @brief Wait for timeout or until notified
*
* The execution of the current thread (which shall have locked with lk method PIMutex::lock()) is blocked
* during timeoutMs, or until notified (if the latter happens first).
*
* At the moment of blocking the thread, the function automatically calls lk.lock() (PIMutex::lock()), allowing
* other locked threads to continue.
*
* Once notified or once timeoutMs has passed, the function unblocks and calls lk.unlock() (PIMutex::unlock()),
* leaving lk in the same state as when the function was called. Then the function returns (notice that this last
* mutex locking may block again the thread before returning).
*
* Generally, the function is notified to wake up by a call in another thread either to member notifyOne() or to
* member notifyAll(). But certain implementations may produce spurious wake-up calls without any of these functions
* being called. Therefore, users of this function shall ensure their condition for resumption is met.
*
* If condition is specified, the function only blocks if condition returns false, and notifications can only unblock
* the thread when it becomes true (which is especially useful to check against spurious wake-up calls).
*
* @param lk lock object used by method wait for data protection
* @param condition A callable object or function that takes no arguments and returns a value that can be evaluated
* as a bool. This is called repeatedly until it evaluates to true.
* @return false if timeout reached or true if wakeup condition is true
*/
virtual bool waitFor(PIMutex& lk, int timeoutMs, const std::function<bool()>& condition);
private:
PRIVATE_DECLARATION(PIP_EXPORT)
};
#endif // PICONDITIONVAR_H

View File

@@ -0,0 +1,196 @@
/*! \file pigrabberbase.h
* \brief Abstract class for create grabbers
*/
/*
PIP - Platform Independent Primitives
Abstract class for create grabbers
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIGRABBERBASE_H
#define PIGRABBERBASE_H
#include "pithread.h"
#include "pidiagnostics.h"
template<typename T = PIByteArray>
class PIGrabberBase: public PIThread
{
PIOBJECT_SUBCLASS(PIGrabberBase, PIThread)
public:
PIGrabberBase() {
is_opened = false;
is_recording = false;
}
virtual ~PIGrabberBase() {
stopGrabber(false);
}
virtual bool isOpened() const {return is_opened;}
virtual bool isRecording() const {return is_recording;}
virtual void startRecord(const PIString & filename) {
if (!isOpened()) return;
if (isRecording()) return;
rec_mutex.lock();
startRecordInternal(filename);
is_recording = true;
rec_mutex.unlock();
}
virtual void stopRecord() {
if (!isOpened()) return;
if (!isRecording()) return;
rec_mutex.lock();
is_recording = false;
stopRecordInternal();
rec_mutex.unlock();
}
T last() const {
T ret;
last_mutex.lock();
ret = last_;
last_mutex.unlock();
return ret;
}
bool isEmpty() {
bool ret;
que_mutex.lock();
ret = que.isEmpty();
que_mutex.unlock();
return ret;
}
int queSize() {
int ret;
que_mutex.lock();
ret = que.size();
que_mutex.unlock();
return ret;
}
T dequeue() {
T ret;
// piCoutObj << "start";
que_mutex.lock();
if (!que.isEmpty()) {
// piCoutObj << "dequeue";
ret = que.dequeue();
}
// piCoutObj << "end";
que_mutex.unlock();
return ret;
}
void stopGrabber(bool wait_forever = true) {
if (isRunning()) {
stop();
if (wait_forever) waitForFinish();
else {
if (!waitForFinish(100)) terminate();
stopRecord();
close();
}
}
}
bool open() {
bool ret = openInternal();
if (!is_opened && ret)
opened();
is_opened = ret;
return ret;
}
void close() {
bool em = is_opened;
closeInternal();
last_ = T();
if (em)
closed();
is_opened = false;
}
const PIDiagnostics & diag() const {return diag_;}
void clear() {
que_mutex.lock();
que.clear();
que_mutex.unlock();
}
void restart() {
clear();
close();
}
EVENT(dataReady)
EVENT(opened)
EVENT(closed)
protected:
virtual void init() {}
virtual bool openInternal() = 0;
virtual void closeInternal() = 0;
virtual int get(T & val) = 0;
virtual void record(const T & val) {}
virtual void startRecordInternal(const PIString & filename) {}
virtual void stopRecordInternal() {}
bool is_opened, is_recording;
mutable PIMutex rec_mutex;
private:
void begin() {
init();
}
void run() {
if (!isOpened()) {
open();
diag_.reset();
if (!is_opened)
piMSleep(200);
}
if (isOpened()) {
T c;
int ret = get(c);
if (ret < 0) {
close();
return;
}
if (ret > 0) {
piMSleep(PIP_MIN_MSLEEP);
return;
}
diag_.received(1);
que_mutex.lock();
que.enqueue(c);
que_mutex.unlock();
if (isRecording()) {
rec_mutex.lock();
record(c);
rec_mutex.unlock();
diag_.sended(1);
}
last_mutex.lock();
last_ = c;
last_mutex.unlock();
dataReady();
}
}
void end() {
stopRecord();
close();
}
T last_;
PIQueue<T> que;
PIDiagnostics diag_;
mutable PIMutex que_mutex, last_mutex;
};
#endif // PIGRABBERBASE_H

View File

@@ -0,0 +1,122 @@
/*
PIP - Platform Independent Primitives
Mutex
Ivan Pelipenko peri4ko@yandex.ru, Stephan Fomenko, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** \class PIMutex
* \brief Mutex
* \details
* \section PIMutex_sec0 Synopsis
* %PIMutex provides synchronization blocks between several threads.
* Using mutex guarantees execution of some code only one of threads.
* Mutex contains logic state and functions to change it: \a lock(),
* \a unlock() and \a tryLock().
*
* \section PIMutex_sec1 Usage
* Block of code that should to be executed only one thread simultaniously
* should to be started with \a lock() and ended with \a unlock().
* \snippet pimutex.cpp main
* "mutex" in this example is one for all threads.
*
* */
#include "pimutex.h"
#include "piincludes_p.h"
#ifdef WINDOWS
# include <synchapi.h>
#else
# include <pthread.h>
#endif
PRIVATE_DEFINITION_START(PIMutex)
#ifdef WINDOWS
CRITICAL_SECTION
#else
pthread_mutex_t
#endif
mutex;
PRIVATE_DEFINITION_END(PIMutex)
PIMutex::PIMutex() {
init();
}
PIMutex::~PIMutex() {
destroy();
}
void PIMutex::lock() {
#ifdef WINDOWS
EnterCriticalSection(&(PRIVATE->mutex));
#else
pthread_mutex_lock(&(PRIVATE->mutex));
#endif
}
void PIMutex::unlock() {
#ifdef WINDOWS
LeaveCriticalSection(&(PRIVATE->mutex));
#else
pthread_mutex_unlock(&(PRIVATE->mutex));
#endif
}
bool PIMutex::tryLock() {
bool ret =
#ifdef WINDOWS
(TryEnterCriticalSection(&(PRIVATE->mutex)) != 0);
#else
(pthread_mutex_trylock(&(PRIVATE->mutex)) == 0);
#endif
return ret;
}
void * PIMutex::handle() {
return (void*)&(PRIVATE->mutex);
}
void PIMutex::init() {
#ifdef WINDOWS
InitializeCriticalSection(&(PRIVATE->mutex));
#else
pthread_mutexattr_t attr;
memset(&attr, 0, sizeof(attr));
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
memset(&(PRIVATE->mutex), 0, sizeof(PRIVATE->mutex));
pthread_mutex_init(&(PRIVATE->mutex), &attr);
pthread_mutexattr_destroy(&attr);
#endif
}
void PIMutex::destroy() {
#ifdef WINDOWS
DeleteCriticalSection(&(PRIVATE->mutex));
#else
pthread_mutex_destroy(&(PRIVATE->mutex));
#endif
}

View File

@@ -0,0 +1,85 @@
/*! \file pimutex.h
* \brief PIMutexLocker
*/
/*
PIP - Platform Independent Primitives
PIMutexLocker
Ivan Pelipenko peri4ko@yandex.ru, Stephan Fomenko, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIMUTEX_H
#define PIMUTEX_H
#include "piinit.h"
#include <mutex>
class PIP_EXPORT PIMutex
{
public:
NO_COPY_CLASS(PIMutex)
//! Constructs unlocked mutex
explicit PIMutex();
//! Destroy mutex
~PIMutex();
//! \brief Lock mutex
//! \details If mutex is unlocked it set to locked state and returns immediate.
//! If mutex is already locked function blocks until mutex will be unlocked
void lock();
//! \brief Unlock mutex
//! \details In any case this function returns immediate
void unlock() ;
//! \brief Try to lock mutex
//! \details If mutex is unlocked it set to locked state and returns "true" immediate.
//! If mutex is already locked function returns immediate an returns "false"
bool tryLock();
void * handle();
private:
void init();
void destroy();
PRIVATE_DECLARATION(PIP_EXPORT)
};
//! \brief PIMutexLocker
//! \details Same as std::lock_guard<std::mutex>.
//! When a PIMutexLocker object is created, it attempts to lock the mutex it is given, if "condition" true.
//! When control leaves the scope in which the PIMutexLocker object was created,
//! the PIMutexLocker is destructed and the mutex is released, if "condition" true.
//! If "condition" false this class do nothing.
//! The PIMutexLocker class is non-copyable.
class PIP_EXPORT PIMutexLocker
{
public:
NO_COPY_CLASS(PIMutexLocker)
PIMutexLocker(PIMutex & m, bool condition = true): mutex(m), cond(condition) {if (cond) mutex.lock();}
~PIMutexLocker() {if (cond) mutex.unlock();}
private:
PIMutex & mutex;
bool cond;
};
#endif // PIMUTEX_H

View File

@@ -0,0 +1,168 @@
/*! \file pipipelinethread.h
* \brief Class for create multihread pipeline
*/
/*
PIP - Platform Independent Primitives
Class for create multihread pipeline
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIPIPELINETHREAD_H
#define PIPIPELINETHREAD_H
#include "pithread.h"
#include "piqueue.h"
template <typename Tin, typename Tout>
class PIPipelineThread : public PIThread
{
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread)
public:
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = false;
next_overload = false;
}
~PIPipelineThread() {
stop();
if (!waitForFinish(1000)) {
piCoutObj << "terminating self thread";
terminate();
}
}
template <typename T>
void connectTo(PIPipelineThread<Tout, T> * next) {
CONNECT2(void, Tout, bool *, this, calculated, next, enqueue)
}
EVENT2(calculated, const Tout &, v, bool *, overload)
void enqueue(const Tin &v) {enqueue(v, 0);}
EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) {
mutex.lock();
//piCoutObj << "enque" << overload;
if (max_size == 0 || in.size() < max_size) {
in.enqueue(v);
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
const ullong * counterPtr() const {return &cnt;}
ullong counter() const {return cnt;}
bool isEmpty() {
bool ret;
mutex.lock();
ret = in.isEmpty();
mutex.unlock();
return ret;
}
int queSize() {
int ret;
mutex.lock();
ret = in.size();
mutex.unlock();
return ret;
}
void clear() {
mutex.lock();
in.clear();
next_overload = false;
mutex.unlock();
}
void stopCalc(int wait_delay = 100) {
if (isRunning()) {
stop();
if (!waitForFinish(wait_delay)) {
mutex_l.unlock();
mutex.unlock();
terminate();
}
}
}
Tout getLast() {
Tout ret;
mutex_l.lock();
ret = last;
mutex_l.unlock();
return ret;
}
uint maxQueSize() {
uint ret;
mutex.lock();
ret = max_size;
mutex.unlock();
return ret;
}
void setMaxQueSize(uint count) {
mutex.lock();
max_size = count;
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock();
}
bool isWaitNextPipe() {return wait_next_pipe;}
void setWaitNextPipe(bool wait) {wait_next_pipe = wait;}
protected:
virtual Tout calc(Tin &v, bool &ok) = 0;
uint max_size;
private:
void begin() {cnt = 0;}
void run() {
//piCoutObj << "run ...";
mutex.lock();
if (in.isEmpty()) {
mutex.unlock();
piMSleep(10);
//piCoutObj << "run in empty";
return;
}
if (next_overload && wait_next_pipe) {
mutex.unlock();
//piCoutObj << "wait" << &next_overload;
calculated(last, &next_overload);
piMSleep(10);
} else {
Tin t = in.dequeue();
mutex.unlock();
bool ok = true;
Tout r = calc(t, ok);
if (ok) {
mutex_l.lock();
last = r;
mutex_l.unlock();
cnt++;
//piCoutObj << "calc ok" << &next_overload;
calculated(r, &next_overload);
}
}
//piCoutObj << "run ok";
}
PIMutex mutex;
PIMutex mutex_l;
bool wait_next_pipe;
bool next_overload;
ullong cnt;
PIQueue<Tin> in;
Tout last;
};
#endif // PIPIPELINETHREAD_H

View File

@@ -0,0 +1,575 @@
/*
PIP - Platform Independent Primitives
Thread
Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "piincludes_p.h"
#include "pithread.h"
#include "pisystemtests.h"
#include "piintrospection_threads.h"
#include <signal.h>
#ifdef WINDOWS
# define __THREAD_FUNC_RET__ uint __stdcall
#else
# define __THREAD_FUNC_RET__ void*
#endif
#if defined(LINUX)
# include <sys/syscall.h>
# define gettid() syscall(SYS_gettid)
#endif
#if defined(MAC_OS) || defined(BLACKBERRY) || defined(FREERTOS)
# include <pthread.h>
#endif
__THREAD_FUNC_RET__ thread_function(void * t) {((PIThread*)t)->__thread_func__(); return 0;}
__THREAD_FUNC_RET__ thread_function_once(void * t) {((PIThread*)t)->__thread_func_once__(); return 0;}
#define REGISTER_THREAD(t) __PIThreadCollection::instance()->registerThread(t)
#define UNREGISTER_THREAD(t) __PIThreadCollection::instance()->unregisterThread(t)
/*! \class PIThread
* \brief Thread class
* \details This class allow you exec your code in separate thread.
*
* \section PIThread_sec0 Synopsis
* Multithreading allow you to write program which will be executed
* in several threads simultaneously. This trend allow you to use all
* cores of modern processors, but there are many dangers.
*
* This class provide virtual functions \a begin(), \a run() and \a end(),
* which describes start, execution and finish work of some process.
* These functions executes in \b separate thread. When you execute
* \a start(), %PIThread create separate system thread and sequentially
* executes function \a begin(), \a run() and \a end(). You can
* reimplement each function and write your own code to execute.
* Scheme of functions executing:
\code{.cpp}
begin();
event started();
while (isRunning()) {
run();
ThreadFunc();
msleep(timer_delay);
}
event stopped();
end();
\endcode
* Unlike from directly using "pthread" or some similar you doesn`t need
* to write your own main thread cycle and sleep at every cycle end.
* %PIThread make it for you, and your job is to set sleep value from
* contructor or when starting thread, and reimplement \a begin(), \a run()
* and \a end() functions.
*
* \section PIThread_sec1 Using without subclassing
* You can use %PIThread without subclassing by using "ThreadFunc" pointer
* that can be set from constructor or by overloaded function \a start(ThreadFunc func, int timer_delay).
* If "func" if not null this function will be executed as \a run(). ThreadFunc is any static
* function with format void func(void * data). "Data" is custom data set from constructor or
* with function \a setData(). \n Also you can connect to event \a started(), but
* in this case you should to white your thread main cycle, because this event raised only one time.
*
* \section PIThread_sec2 Locking
* %PIThread has inrternal mutex that can be locked and unlocked every \a run() if you set this flag
* with function \a needLockRun(bool). Also you can access to this mutex by functions \a lock(), \a unlock()
* and \a mutex(). Using this functions together with needLockRun(true) can guarantee one-thread access to
* some data.
*
*/
__PIThreadCollection *__PIThreadCollection::instance() {
return __PIThreadCollection_Initializer__::__instance__;
}
void __PIThreadCollection::registerThread(PIThread * t) {
lock();
if (!threads_.contains(t))
threads_ << t;
unlock();
}
void __PIThreadCollection::unregisterThread(PIThread * t) {
lock();
threads_.removeAll(t);
unlock();
}
PIVector<PIThread * > __PIThreadCollection::threads() const {
return threads_;
}
void __PIThreadCollection::startedAuto(PIThread * t) {
PIMutexLocker _ml(auto_mutex);
auto_threads_ << t;
}
void __PIThreadCollection::stoppedAuto() {
PIThread * t = emitter()->cast<PIThread>();
if (!t) return;
auto_mutex.lock();
auto_threads_.removeAll(t);
auto_mutex.unlock();
delete t;
}
int __PIThreadCollection_Initializer__::count_(0);
__PIThreadCollection * __PIThreadCollection_Initializer__::__instance__(0);
__PIThreadCollection_Initializer__::__PIThreadCollection_Initializer__() {
count_++;
//PICout(PICoutManipulators::DefaultControls) << "try create Core" << count_;
if (count_ > 1) return;
//PICout(PICoutManipulators::DefaultControls) << "create Core";
__instance__ = new __PIThreadCollection();
}
__PIThreadCollection_Initializer__::~__PIThreadCollection_Initializer__() {
count_--;
//PICout(PICoutManipulators::DefaultControls) << "try delete Core" << count_;
if (count_ > 0) return;
//PICout(PICoutManipulators::DefaultControls) << "delete Core";
if (__instance__ != 0) {
delete __instance__;
__instance__ = 0;
}
}
PRIVATE_DEFINITION_START(PIThread)
#ifndef WINDOWS
pthread_t thread;
sched_param sparam;
#else
void * thread;
#endif
PRIVATE_DEFINITION_END(PIThread)
PIThread::PIThread(void * data, ThreadFunc func, bool startNow, int timer_delay): PIObject() {
PIINTROSPECTION_THREAD_NEW(this);
tid_ = -1;
PRIVATE->thread = 0;
data_ = data;
ret_func = func;
terminating = running_ = lockRun = false;
priority_ = piNormal;
delay_ = timer_delay;
if (startNow) start(timer_delay);
}
PIThread::PIThread(std::function<void ()> func, bool startNow, int timer_delay) {
PIINTROSPECTION_THREAD_NEW(this);
tid_ = -1;
PRIVATE->thread = 0;
data_ = 0;
ret_func = [func](void*){func();};
terminating = running_ = lockRun = false;
priority_ = piNormal;
delay_ = timer_delay;
if (startNow) start(timer_delay);
}
PIThread::PIThread(bool startNow, int timer_delay): PIObject() {
PIINTROSPECTION_THREAD_NEW(this);
tid_ = -1;
PRIVATE->thread = 0;
ret_func = 0;
terminating = running_ = lockRun = false;
priority_ = piNormal;
delay_ = timer_delay;
if (startNow) start(timer_delay);
}
PIThread::~PIThread() {
PIINTROSPECTION_THREAD_DELETE(this);
if (!running_ || PRIVATE->thread == 0) return;
#ifdef FREERTOS
//void * ret(0);
//PICout(PICoutManipulators::DefaultControls) << "~PIThread" << PRIVATE->thread;
//PICout(PICoutManipulators::DefaultControls) << pthread_join(PRIVATE->thread, 0);
PICout(PICoutManipulators::DefaultControls) << "FreeRTOS can't terminate pthreads! waiting for stop";
stop(true);
//PICout(PICoutManipulators::DefaultControls) << "stopped!";
#else
#ifndef WINDOWS
# ifdef ANDROID
pthread_kill(PRIVATE->thread, SIGTERM);
# else
pthread_cancel(PRIVATE->thread);
# endif
# else
TerminateThread(PRIVATE->thread, 0);
CloseHandle(PRIVATE->thread);
# endif
#endif
terminating = running_ = false;
}
void PIThread::stop(bool wait) {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop ..." << running_ << wait;
terminating = true;
if (wait) waitForFinish();
}
bool PIThread::start(int timer_delay) {
if (running_) return false;
delay_ = timer_delay;
return _startThread((void*)thread_function);
}
bool PIThread::startOnce() {
if (running_) return false;
return _startThread((void*)thread_function_once);
}
void PIThread::terminate() {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "terminate ..." << running_;
#ifdef FREERTOS
PICout(PICoutManipulators::DefaultControls) << "FreeRTOS can't terminate pthreads! waiting for stop";
stop(true);
//PICout(PICoutManipulators::DefaultControls) << "stopped!";
#else
if (PRIVATE->thread == 0) return;
UNREGISTER_THREAD(this);
terminating = running_ = false;
tid_ = -1;
//PICout(PICoutManipulators::DefaultControls) << "terminate" << PRIVATE->thread;
#ifndef WINDOWS
# ifdef ANDROID
pthread_kill(PRIVATE->thread, SIGTERM);
# else
//pthread_kill(PRIVATE->thread, SIGKILL);
//void * ret(0);
pthread_cancel(PRIVATE->thread);
//pthread_join(PRIVATE->thread, &ret);
# endif
#else
TerminateThread(PRIVATE->thread, 0);
CloseHandle(PRIVATE->thread);
#endif
PRIVATE->thread = 0;
end();
#endif
PIINTROSPECTION_THREAD_STOP(this);
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "terminate ok" << running_;
}
int PIThread::priority2System(PIThread::Priority p) {
switch (p) {
# ifdef QNX
case piLowerst: return 8;
case piLow: return 9;
case piNormal: return 10;
case piHigh: return 11;
case piHighest: return 12;
# else
# ifdef WINDOWS
case piLowerst: return -2;
case piLow: return -1;
case piNormal: return 0;
case piHigh: return 1;
case piHighest: return 2;
# else
case piLowerst: return 2;
case piLow: return 1;
case piNormal: return 0;
case piHigh: return -1;
case piHighest: return -2;
# endif
# endif
default: return 0;
}
return 0;
}
bool PIThread::_startThread(void * func) {
terminating = false;
running_ = true;
#ifndef WINDOWS
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&PRIVATE->thread, &attr, (void*(*)(void*))func, this);
//PICout(PICoutManipulators::DefaultControls) << "pthread_create" << PRIVATE->thread;
pthread_attr_destroy(&attr);
if (ret == 0) {
# ifdef MAC_OS
pthread_setname_np(((PIString&)name().elided(15, 0.4f).resize(15, '\0')).dataAscii());
pthread_threadid_np(PRIVATE->thread, (__uint64_t*)&tid_);
# else
# ifdef FREERTOS
tid_ = PRIVATE->thread;
# else
pthread_setname_np(PRIVATE->thread, ((PIString&)name().elided(15, 0.4f).resize(15, '\0')).dataAscii());
# endif
# endif
#else
if (PRIVATE->thread) CloseHandle(PRIVATE->thread);
# ifdef CC_GCC
PRIVATE->thread = (void *)_beginthreadex(0, 0, (unsigned(__stdcall*)(void*))func, this, 0, 0);
# else
PRIVATE->thread = CreateThread(0, 0, (LPTHREAD_START_ROUTINE)func, this, 0, 0);
# endif
if (PRIVATE->thread != 0) {
#endif
setPriority(priority_);
return true;
} else {
running_ = false;
PRIVATE->thread = 0;
piCoutObj << "Error: Can`t start new thread:" << errorString();
}
running_ = false;
return false;
}
void PIThread::setPriority(PIThread::Priority prior) {
#ifndef FREERTOS // FreeRTOS can't change priority runtime
priority_ = prior;
# ifndef WINDOWS
if (!running_ || (PRIVATE->thread == 0)) return;
//PICout(PICoutManipulators::DefaultControls) << "setPriority" << PRIVATE->thread;
policy_ = 0;
memset(&(PRIVATE->sparam), 0, sizeof(PRIVATE->sparam));
pthread_getschedparam(PRIVATE->thread, &policy_, &(PRIVATE->sparam));
PRIVATE->sparam.
# ifndef LINUX
sched_priority
# else
__sched_priority
# endif
= priority2System(priority_);
pthread_setschedparam(PRIVATE->thread, policy_, &(PRIVATE->sparam));
# else
if (!running_ || (PRIVATE->thread == 0)) return;
SetThreadPriority(PRIVATE->thread, priority2System(priority_));
# endif
#endif //FREERTOS
}
#ifdef WINDOWS
bool isExists(HANDLE hThread) {
//errorClear();
//piCout << "isExists" << hThread;
DWORD dw = 0;
GetExitCodeThread(hThread, &dw);
//piCout << ret << dw << errorString();
if (dw == STILL_ACTIVE) return true;
//piCout << errorString();
return false;
}
#endif
bool PIThread::waitForFinish(int timeout_msecs) {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "PIThread::waitForFinish" << running_ << terminating << timeout_msecs;
if (!running_) return true;
if (timeout_msecs < 0) {
while (running_) {
msleep(PIP_MIN_MSLEEP);
#ifdef WINDOWS
if (!isExists(PRIVATE->thread)) {
unlock();
return true;
}
#endif
}
return true;
}
tmf_.reset();
while (running_ && tmf_.elapsed_m() < timeout_msecs) {
msleep(PIP_MIN_MSLEEP);
#ifdef WINDOWS
if (!isExists(PRIVATE->thread)) {
unlock();
return true;
}
#endif
}
return tmf_.elapsed_m() < timeout_msecs;
}
bool PIThread::waitForStart(int timeout_msecs) {
if (running_) return true;
if (timeout_msecs < 0) {
while (!running_)
msleep(PIP_MIN_MSLEEP);
return true;
}
tms_.reset();
while (!running_ && tms_.elapsed_m() < timeout_msecs)
msleep(PIP_MIN_MSLEEP);
return tms_.elapsed_m() < timeout_msecs;
}
void PIThread::_beginThread() {
#ifndef WINDOWS
# if !defined(ANDROID) && !defined(FREERTOS)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0);
# endif
#endif
#ifdef WINDOWS
tid_ = GetCurrentThreadId();
#endif
#ifdef LINUX
tid_ = gettid();
#endif
PIINTROSPECTION_THREAD_START(this);
REGISTER_THREAD(this);
running_ = true;
if (lockRun) mutex_.lock();
begin();
if (lockRun) mutex_.unlock();
started();
}
void PIThread::_runThread() {
PIINTROSPECTION_THREAD_RUN(this);
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "...";
if (lockRun) mutex_.lock();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "lock" << "ok";
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "...";
#ifdef PIP_INTROSPECTION
PITimeMeasurer _tm;
#endif
run();
#ifdef PIP_INTROSPECTION
PIINTROSPECTION_THREAD_RUN_DONE(this, ullong(_tm.elapsed_u()));
#endif
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "run" << "ok";
//printf("thread %p tick\n", this);
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "ret_func" << "...";
if (ret_func != 0) ret_func(data_);
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "ret_func" << "ok";
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "...";
if (lockRun) mutex_.unlock();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "unlock" << "ok";
}
void PIThread::_endThread() {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "...";
stopped();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok";
if (lockRun) mutex_.lock();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "end" << "...";
end();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "stop" << "ok";
if (lockRun) mutex_.unlock();
terminating = running_ = false;
tid_ = -1;
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "exit";
//cout << "thread " << t << " exiting ... " << endl;
//PICout(PICoutManipulators::DefaultControls) << "pthread_exit" << (__privateinitializer__.p)->thread;
UNREGISTER_THREAD(this);
PIINTROSPECTION_THREAD_STOP(this);
#ifndef WINDOWS
pthread_detach(PRIVATE->thread);
PRIVATE->thread = 0;
#endif
#ifndef WINDOWS
pthread_exit(0);
#else
# ifdef CC_GCC
_endthreadex(0);
# else
ExitThread(0);
# endif
#endif
}
void PIThread::__thread_func__() {
_beginThread();
while (!terminating) {
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "queued" << "...";
maybeCallQueuedEvents();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "queued" << "ok";
_runThread();
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "wait" << "...";
PIINTROSPECTION_THREAD_WAIT(this);
if (delay_ > 0) {
tmr_.reset();
double sl(0.);
while (1) {
sl = piMind(delay_ - tmr_.elapsed_m(), PIP_MIN_MSLEEP);
if (terminating) break;
if (sl <= 0.) break;
piMSleep(sl);
}
}
//PICout(PICoutManipulators::DefaultControls) << "thread" << this << "wait" << "ok";
}
_endThread();
}
void PIThread::__thread_func_once__() {
_beginThread();
_runThread();
_endThread();
}
void PIThread::runOnce(PIObject * object, const char * handler, const PIString & name) {
PIThread * t = new PIThread();
t->setName(name);
if (!PIObject::piConnectU(t, PIStringAscii("started"), object, object, PIStringAscii(handler), "PIThread::runOnce")) {
delete t;
return;
}
__PIThreadCollection::instance()->startedAuto(t);
CONNECTU(t, stopped, __PIThreadCollection::instance(), stoppedAuto);
t->startOnce();
}
void PIThread::runOnce(std::function<void ()> func, const PIString & name) {
PIThread * t = new PIThread();
t->setName(name);
t->setSlot(func);
__PIThreadCollection::instance()->startedAuto(t);
CONNECTU(t, stopped, __PIThreadCollection::instance(), stoppedAuto);
t->startOnce();
}

257
libs/main/thread/pithread.h Normal file
View File

@@ -0,0 +1,257 @@
/*! \file pithread.h
* \brief Thread
*
* This file declare thread class and some wait functions
*/
/*
PIP - Platform Independent Primitives
Thread
Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREAD_H
#define PITHREAD_H
#include "piinit.h"
#include "pimutex.h"
#include "piobject.h"
class PIThread;
class PIIntrospectionThreads;
class PIP_EXPORT __PIThreadCollection: public PIObject {
PIOBJECT(__PIThreadCollection)
public:
static __PIThreadCollection * instance();
void registerThread(PIThread * t);
void unregisterThread(PIThread * t);
PIVector<PIThread * > threads() const;
void lock() {mutex.lock();}
void unlock() {mutex.unlock();}
void startedAuto(PIThread * t);
EVENT_HANDLER(void, stoppedAuto);
private:
PIVector<PIThread * > threads_, auto_threads_;
mutable PIMutex mutex, auto_mutex;
};
class PIP_EXPORT __PIThreadCollection_Initializer__ {
public:
__PIThreadCollection_Initializer__();
~__PIThreadCollection_Initializer__();
static int count_;
static __PIThreadCollection * __instance__;
};
static __PIThreadCollection_Initializer__ __PIThreadCollection_initializer__;
typedef std::function<void(void *)> ThreadFunc;
class PIP_EXPORT PIThread: public PIObject
{
PIOBJECT_SUBCLASS(PIThread, PIObject)
friend class PIIntrospectionThreads;
public:
NO_COPY_CLASS(PIThread)
//! Contructs thread with custom data "data", external function "func" and main loop delay "loop_delay".
PIThread(void * data, ThreadFunc func, bool startNow = false, int loop_delay = -1);
//! Contructs thread with external function "func" and main loop delay "loop_delay".
PIThread(std::function<void()> func, bool startNow = false, int loop_delay = -1);
//! Contructs thread with main loop delay "loop_delay".
PIThread(bool startNow = false, int loop_delay = -1);
virtual ~PIThread();
//! Priority of thread
enum Priority {
piLowerst /** Lowest */,
piLow /** Low */,
piNormal /** Normal, this is default priority of threads and timers */,
piHigh /** High */,
piHighest /** Highest */
};
EVENT_HANDLER0(bool, start) {return start(-1);}
EVENT_HANDLER1(bool, start, int, timer_delay);
bool start(ThreadFunc func) {return start(func, -1);}
bool start(ThreadFunc func, int timer_delay) {ret_func = func; return start(timer_delay);}
bool start(std::function<void()> func) {return start(func, -1);}
bool start(std::function<void()> func, int timer_delay) {ret_func = [func](void*){func();}; return start(timer_delay);}
EVENT_HANDLER0(bool, startOnce);
EVENT_HANDLER1(bool, startOnce, ThreadFunc, func) {ret_func = func; return startOnce();}
EVENT_HANDLER0(void, stop) {stop(false);}
EVENT_HANDLER1(void, stop, bool, wait);
EVENT_HANDLER0(void, terminate);
//! \brief Set common data passed to external function
void setData(void * d) {data_ = d;}
//! \brief Set external function that will be executed after every \a run()
void setSlot(ThreadFunc func) {ret_func = func;}
//! \brief Set external function that will be executed after every \a run()
void setSlot(std::function<void()> func) {ret_func = [func](void*){func();};}
//! \brief Set priority of thread
void setPriority(PIThread::Priority prior);
//! \brief Returns common data passed to external function
void * data() const {return data_;}
//! \brief Return priority of thread
PIThread::Priority priority() const {return priority_;}
//! \brief Return \c true if thread is running
bool isRunning() const {return running_;}
bool isStopping() const {return running_ && terminating;}
EVENT_HANDLER0(bool, waitForStart) {return waitForStart(-1);}
EVENT_HANDLER1(bool, waitForStart, int, timeout_msecs);
EVENT_HANDLER0(bool, waitForFinish) {return waitForFinish(-1);}
EVENT_HANDLER1(bool, waitForFinish, int, timeout_msecs);
//! \brief Set necessity of lock every \a run with internal mutex
void needLockRun(bool need) {lockRun = need;}
EVENT_HANDLER0(void, lock) {mutex_.lock();}
EVENT_HANDLER0(void, unlock) {mutex_.unlock();}
//! \brief Returns internal mutex
PIMutex & mutex() {return mutex_;}
//! \brief Returns thread ID
llong tid() const {return tid_;}
void __thread_func__();
void __thread_func_once__();
EVENT(started)
EVENT(stopped)
//! \brief Start event handler with name \"handler\" of object \"object\"
//! in separate thread with name \"name\"
//! and automatically delete it on function finish
static void runOnce(PIObject * object, const char * handler, const PIString & name = PIString());
//! \brief Start function \"func\" in separate thread with name \"name\"
//! and automatically delete it on function finish
static void runOnce(std::function<void()> func, const PIString & name = PIString());
//! \handlers
//! \{
/** \fn bool start(int timer_delay = -1)
* \brief Start thread
* \details Start execution of \a run() in internal loop with
* "timer_delay" delay in milliseconds. If "timer_delay" <= 0
* there is no delay in loop. Thread also exec external function
* set by \a setSlot() if it`s not null
*
* \return \c false if thread already started or can`t start thread */
/** \fn bool startOnce()
* \brief Start thread without internal loop
* \details Start execution of \a run() once. Thread also exec
* external function set by \a setSlot() if it`s not null
*
* \return \c false if thread already started or can`t start thread */
/** \fn bool startOnce(ThreadFunc func)
* \brief Start thread without internal loop
* \details Overloaded function. Set external function "func" before start
*
* \return \c false if thread already started or can`t start thread */
/** \fn void stop(bool wait = false)
* \brief Stop thread
* \details Stop execution of thread and wait for it finish
* if "wait" is \c true. This function can block for infinite
* time if "wait" is \c true and any of thread function is
* busy forever */
/** \fn void terminate()
* \brief Strongly stop thread
* \details Stop execution of thread immediately */
/** \fn bool waitForStart(int timeout_msecs = -1)
* \brief Wait for thread start
* \details This function block until thread start for "timeout_msecs"
* or forever if "timeout_msecs" < 0
*
* \return \c false if timeout is exceeded */
/** \fn bool waitForFinish(int timeout_msecs = -1)
* \brief Wait for thread finish
* \details This function block until thread finish for "timeout_msecs"
* or forever if "timeout_msecs" < 0
*
* \return \c false if timeout is exceeded */
//! \fn void lock()
//! \brief Lock internal mutex
//! \fn void unlock()
//! \brief Unlock internal mutex
//! \}
//! \events
//! \{
//! \fn void started()
//! \brief Raise on thread start
//! \fn void stopped()
//! \brief Raise on thread stop
//! \}
protected:
static int priority2System(PIThread::Priority p);
//! Function executed once at the start of thread.
virtual void begin() {;}
//! Function executed at every "timer_delay" msecs until thread was stopped.
virtual void run() {;}
//! Function executed once at the end of thread.
virtual void end() {;}
std::atomic_bool terminating, running_, lockRun;
int delay_, policy_;
llong tid_;
void * data_;
mutable PIMutex mutex_;
PITimeMeasurer tmf_, tms_, tmr_;
PIThread::Priority priority_;
ThreadFunc ret_func;
PRIVATE_DECLARATION(PIP_EXPORT)
private:
bool _startThread(void * func);
void _beginThread();
void _runThread();
void _endThread();
};
#endif // PITHREAD_H

View File

@@ -0,0 +1,31 @@
/*
PIP - Platform Independent Primitives
Module includes
Ivan Pelipenko peri4ko@yandex.ru, Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREADMODULE_H
#define PITHREADMODULE_H
#include "pimutex.h"
#include "pithread.h"
#include "pitimer.h"
#include "pipipelinethread.h"
#include "pigrabberbase.h"
#include "pithreadpoolexecutor.h"
#include "piconditionvar.h"
#endif // PITHREADMODULE_H

View File

@@ -0,0 +1,79 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pithreadpoolexecutor.h"
/*! \class PIThreadPoolExecutor
* @brief Thread pools address two different problems: they usually provide improved performance when executing large
* numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
* managing the resources, including threads, consumed when executing a collection of tasks.
*/
PIThreadPoolExecutor::PIThreadPoolExecutor(int corePoolSize) : isShutdown_(false) {
for (int i = 0; i < corePoolSize; ++i) {
PIThread * thread = new PIThread([&, i](){
auto runnable = taskQueue.poll(100, std::function<void()>());
if (runnable) {
runnable();
}
if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
});
threadPool.push_back(thread);
thread->start();
}
}
bool PIThreadPoolExecutor::awaitTermination(int timeoutMs) {
PITimeMeasurer measurer;
for (size_t i = 0; i < threadPool.size(); ++i) {
int dif = timeoutMs - (int)measurer.elapsed_m();
if (dif < 0) return false;
if (!threadPool[i]->waitForFinish(dif)) return false;
}
return true;
}
void PIThreadPoolExecutor::shutdownNow() {
isShutdown_ = true;
for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
}
PIThreadPoolExecutor::~PIThreadPoolExecutor() {
shutdownNow();
while (threadPool.size() > 0) delete threadPool.take_back();
}
void PIThreadPoolExecutor::execute(const std::function<void()> & runnable) {
if (!isShutdown_) taskQueue.offer(runnable);
}
bool PIThreadPoolExecutor::isShutdown() const {
return isShutdown_;
}
void PIThreadPoolExecutor::shutdown() {
isShutdown_ = true;
}

View File

@@ -0,0 +1,63 @@
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITHREADPOOLEXECUTOR_H
#define PITHREADPOOLEXECUTOR_H
#include "piblockingqueue.h"
#include <atomic>
class PIP_EXPORT PIThreadPoolExecutor {
public:
explicit PIThreadPoolExecutor(int corePoolSize);
virtual ~PIThreadPoolExecutor();
/**
* @brief Executes the given task sometime in the future. The task execute in an existing pooled thread. If the task
* cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been
* reached.
*
* @param runnable not empty function for thread pool execution
*/
void execute(const std::function<void()> & runnable);
void shutdownNow();
/**
* @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no additional effect if already shut down. This method does not wait for previously
* submitted tasks to complete execution. Use awaitTermination to do that.
*/
void shutdown();
bool isShutdown() const;
bool awaitTermination(int timeoutMs);
private:
std::atomic_bool isShutdown_;
PIBlockingQueue<std::function<void()> > taskQueue;
PIVector<PIThread*> threadPool;
bool queue_own;
};
#endif // PITHREADPOOLEXECUTOR_H

View File

@@ -0,0 +1,658 @@
/*
PIP - Platform Independent Primitives
Timer
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pitimer.h"
#include "piincludes_p.h"
#ifdef PIP_TIMER_RT
# include <csignal>
#endif
/*! \class PITimer
* \brief Timer
*
* \section PITimer_sec0 Synopsis
* This class implements timer function. PIP timers supports 3 way to tick notify,
* frequency delimiters and time measurements.
* \section PITimer_sec1 Notify variants
* Notify variants:
* * "slot" - static function with format void func(void * data, int delimiter);
* * event - \a tickEvent();
* * virtual function - \a tick().
*
* All these variants are equivalent, use most applicable.
* \section PITimer_sec2 Frequency delimiters
* Frequency delimiter is an integer number and "slot" function. If "slot" function is null
* timer main "slot" will be used. Each delimiter numbers tick timer will be execute
* delimiters or timer main "slot" function with \b delimiter value = delimiter number.
* Example: \snippet pitimer.cpp delimiter
* \section PITimer_sec3 Time measurements
* PITimer can be used as time measurer. Function \a reset() set time mark to current
* system time, then functions double elapsed_*() returns time elapsed from this mark.
* These functions can returns nano-, micro-, milli- and seconds with suffixes "n", "u", "m"
* and "s"
* Example: \snippet pitimer.cpp elapsed
*/
_PITimerBase::_PITimerBase() {
interval_ = 1000;
deferred_delay = 0.;
running_ = deferred_ = deferred_mode = false;
tfunc = 0;
parent = 0;
}
void _PITimerBase::setInterval(double i) {
interval_ = i;
if (isRunning()) {
//piCout << "change interval runtime";
stop(true);
start();
}
}
bool _PITimerBase::start(double interval_ms) {
if (isRunning()) stop(true);
deferred_ = false;
setInterval(interval_ms);
//piCout << "_PITimerBase::startTimer"<<interval_ms<<"...";
running_ = startTimer(interval_ms);
return running_;
}
void _PITimerBase::startDeferred(double interval_ms, PIDateTime start_datetime) {
if (isRunning()) stop(true);
deferred_ = true;
deferred_mode = true;
deferred_datetime = start_datetime;
setInterval(interval_ms);
running_ = startTimer(interval_ms);
}
void _PITimerBase::startDeferred(double interval_ms, double delay_ms) {
if (isRunning()) stop(true);
deferred_ = true;
deferred_mode = false;
deferred_delay = delay_ms;
setInterval(interval_ms);
running_ = startTimer(interval_ms);
}
bool _PITimerBase::stop(bool wait) {
//piCout << GetCurrentThreadId() << "_PITimerBase::stop" << wait << isRunning();
if (!isRunning()) return true;
//piCout << "_PITimerBase::stopTimer ...";
running_ = !stopTimer(wait);
return !running_;
}
class _PITimerImp_Thread: public _PITimerBase {
public:
_PITimerImp_Thread();
virtual ~_PITimerImp_Thread();
protected:
void prepareStart(double interval_ms);
bool threadFunc(); // returns true if repeat is needed
int wait_dt, wait_dd, wait_tick;
private:
virtual bool startTimer(double interval_ms);
virtual bool stopTimer(bool wait);
static void threadFuncS(void * d) {((_PITimerImp_Thread*)d)->threadFunc();}
void adjustTimes();
PIThread thread_;
PISystemTime st_time, st_inc, st_wait, st_odt;
};
#ifdef PIP_TIMER_RT
struct _PITimerImp_RT_Private_;
class _PITimerImp_RT: public _PITimerBase {
public:
_PITimerImp_RT();
virtual ~_PITimerImp_RT();
protected:
private:
virtual bool startTimer(double interval_ms);
virtual bool stopTimer(bool wait);
int ti;
_PITimerImp_RT_Private_ * priv;
};
#endif
class _PITimerImp_Pool: public _PITimerImp_Thread {
public:
_PITimerImp_Pool();
virtual ~_PITimerImp_Pool() {stop(true);}
private:
class Pool: public PIThread {
public:
static Pool * instance();
void add(_PITimerImp_Pool * t);
void remove(_PITimerImp_Pool * t);
void run();
PIVector<_PITimerImp_Pool * > timers, to_remove;
private:
explicit Pool();
virtual ~Pool();
};
virtual bool startTimer(double interval_ms);
virtual bool stopTimer(bool wait);
};
_PITimerImp_Thread::_PITimerImp_Thread() {
thread_.setName("__S__PITimerImp_Thread::thread");
wait_dt = 100;
wait_dd = 200;
wait_tick = 10;
//piCout << "_PITimerImp_Thread" << this << ", thread& =" << &thread_;
//piCout << "new _PITimerImp_Thread";
}
_PITimerImp_Thread::~_PITimerImp_Thread() {
stop(true);
}
void _PITimerImp_Thread::prepareStart(double interval_ms) {
if (interval_ms <= 0.) {
piCout << "Achtung! Start PITimer with interval <= 0!";
piCout << "Achtung! Parent" << parent;
assert(interval_ms > 0.);
}
st_inc = PISystemTime::fromMilliseconds(interval_ms);
st_odt = st_inc * 5;
if (st_odt.toSeconds() < 1.) st_odt = PISystemTime::fromSeconds(1.);
if (deferred_) {
if (!deferred_mode)
st_time = PISystemTime::current(true) + PISystemTime::fromMilliseconds(deferred_delay);
st_time -= st_inc;
} else
st_time = PISystemTime::current(true) + st_inc;
}
bool _PITimerImp_Thread::startTimer(double interval_ms) {
prepareStart(interval_ms);
thread_.setData(this);
return thread_.start(threadFuncS);
}
bool _PITimerImp_Thread::stopTimer(bool wait) {
#ifndef FREERTOS
thread_.stop(wait);
#else
thread_.stop();
if (wait)
if (!thread_.waitForFinish(10))
if (thread_.isRunning())
thread_.terminate();
#endif
return true;
}
bool _PITimerImp_Thread::threadFunc() {
if (!running_) return false;
if (deferred_) {
PISystemTime dwt;
int wth(wait_dt);
if (deferred_mode) {
dwt = deferred_datetime.toSystemTime() - PISystemTime::current();
wth = wait_dd;
} else
dwt = st_time - PISystemTime::current(true);
if (wth > 0) {
if (dwt.toMilliseconds() > wth + 1.) {
piMSleep(wth);
return false;
} else {
dwt.sleep();
deferred_ = false;
st_time = PISystemTime::current(true);
}
} else {
if (dwt.toMilliseconds() > 0.1)
return false;
}
}
st_wait = st_time - PISystemTime::current(true);
//piCout << "wait" << this << st_wait;
if (st_wait.abs() > st_odt || st_wait.seconds <= -5) {
//piCout << &thread_ << "adjust" << "...";
adjustTimes();
//piCout << &thread_ << "adjust" << "ok";
return true;
}
if (wait_tick > 0) {
if (st_wait.toMilliseconds() > wait_tick + 1.) {
piMSleep(wait_tick);
return false;
} else {
//piCout << &thread_ << "sleep for" << st_wait;
st_wait.sleep();
}
} else {
if (st_wait.toMilliseconds() > 0.1)
return false;
}
st_time += st_inc;
if (!parent->isPIObject()) {
piCout << "Achtung! PITimer \"parent\" is not PIObject!";
return false;
}
//piCout << &thread_ << "tfunc" << "...";
tfunc(parent);
//piCout << &thread_ << "tfunc" << "ok";
return true;
}
void _PITimerImp_Thread::adjustTimes() {
PISystemTime cst = PISystemTime::current(true);
if (st_time < cst) {
int rs = (cst - st_time).toSeconds() / st_inc.toSeconds();
if (rs >= 100)
st_time = cst + st_inc;
else {
while (st_time < cst)
st_time += st_inc;
}
} else {
int rs = (st_time - cst).toSeconds() / st_inc.toSeconds();
if (rs >= 100)
st_time = cst - st_inc;
else {
cst += st_inc;
while (st_time > cst)
st_time -= st_inc;
}
}
}
#ifdef PIP_TIMER_RT
void threadFuncS(sigval sv) {((_PITimerImp_RT * )sv.sival_ptr)->tfunc(((_PITimerImp_RT * )sv.sival_ptr)->parent);}
struct _PITimerImp_RT_Private_ {
itimerspec spec;
timer_t tt;
sigevent se;
};
_PITimerImp_RT::_PITimerImp_RT() {
//piCout << "new _PITimerImp_RT";
priv = new _PITimerImp_RT_Private_();
priv->tt = 0;
ti = -1;
memset(&(priv->se), 0, sizeof(priv->se));
priv->se.sigev_notify = SIGEV_THREAD;
priv->se.sigev_value.sival_ptr = this;
priv->se.sigev_notify_function = threadFuncS;
priv->se.sigev_notify_attributes = 0;
}
_PITimerImp_RT::~_PITimerImp_RT() {
stop(true);
delete priv;
}
bool _PITimerImp_RT::startTimer(double interval_ms) {
int flags(0);
priv->spec.it_interval.tv_nsec = ((int)(interval_ms * 1000) % 1000000) * 1000;
priv->spec.it_interval.tv_sec = (time_t)(interval_ms / 1000);
if (deferred_) {
if (deferred_mode) {
PISystemTime dtm = deferred_datetime.toSystemTime();
priv->spec.it_value.tv_nsec = dtm.nanoseconds;
priv->spec.it_value.tv_sec = dtm.seconds;
flags = TIMER_ABSTIME;
} else {
priv->spec.it_value.tv_nsec = ((int)(deferred_delay * 1000) % 1000000) * 1000;
priv->spec.it_value.tv_sec = (time_t)(deferred_delay / 1000);
}
} else {
priv->spec.it_value = priv->spec.it_interval;
}
ti = timer_create(CLOCK_REALTIME, &(priv->se), &(priv->tt));
//cout << "***create timer " << msecs << " msecs\n";
if (ti == -1) {
piCout << "Can`t create RT timer for " << interval_ms << " msecs: " << errorString();
return false;
}
timer_settime(priv->tt, flags, &(priv->spec), 0);
return true;
}
bool _PITimerImp_RT::stopTimer(bool wait) {
if (ti < 0) return true;
timer_delete(priv->tt);
ti = -1;
priv->tt = 0;
return true;
}
#endif
_PITimerImp_Pool::_PITimerImp_Pool(): _PITimerImp_Thread() {
wait_dt = wait_dd = wait_tick = 0;
//piCout << "new _PITimerImp_Pool";
}
_PITimerImp_Pool::Pool::Pool(): PIThread() {
setName("__S__PITimerImp_Pool::Pool");
needLockRun(true);
#ifndef FREERTOS
timers.reserve(64);
start(PIP_MIN_MSLEEP*5);
#else
start(PIP_MIN_MSLEEP);
#endif
}
_PITimerImp_Pool::Pool::~Pool() {
stop();
if (!waitForFinish(500))
terminate();
unlock();
timers.clear();
}
_PITimerImp_Pool::Pool * _PITimerImp_Pool::Pool::instance() {
static Pool pool;
return &pool;
}
void _PITimerImp_Pool::Pool::add(_PITimerImp_Pool * t) {
//piCout << "add ...";
lock();
to_remove.removeAll(t);
if (!timers.contains(t))
timers << t;
unlock();
//piCout << "add done";
}
void _PITimerImp_Pool::Pool::remove(_PITimerImp_Pool * t) {
//piCout << "remove ...";
lock();
to_remove << t;
unlock();
//piCout << "remove done";
}
void _PITimerImp_Pool::Pool::run() {
if (!to_remove.isEmpty()) {
piForeach (_PITimerImp_Pool * t, to_remove)
timers.removeAll(t);
to_remove.clear();
}
piForeach (_PITimerImp_Pool * t, timers)
t->threadFunc();
}
bool _PITimerImp_Pool::startTimer(double interval_ms) {
prepareStart(interval_ms);
Pool::instance()->add(this);
return true;
}
bool _PITimerImp_Pool::stopTimer(bool wait) {
Pool::instance()->remove(this);
return true;
}
PITimer::PITimer(): PIObject() {
#ifdef FREERTOS
imp_mode = PITimer::Thread;
#else
imp_mode = PITimer::Thread;
#endif
initFirst();
}
PITimer::PITimer(PITimer::TimerImplementation ti): PIObject() {
imp_mode = ti;
initFirst();
}
PITimer::PITimer(TimerEvent slot, void * data, PITimer::TimerImplementation ti): PIObject() {
imp_mode = ti;
initFirst();
data_t = data;
ret_func = slot;
}
PITimer::PITimer(std::function<void ()> slot, PITimer::TimerImplementation ti) {
imp_mode = ti;
initFirst();
ret_func = [slot](void *, int){slot();};
}
PITimer::PITimer(std::function<void (void *)> slot, void * data, PITimer::TimerImplementation ti) {
imp_mode = ti;
initFirst();
data_t = data;
ret_func = [slot](void *d, int){slot(d);};
}
PITimer::~PITimer() {
destroy();
}
double PITimer::interval() const {
init();
return imp->interval_;
}
void PITimer::setInterval(double ms) {
init();
setProperty("interval", ms);
imp->setInterval(ms);
}
bool PITimer::isRunning() const {
init();
return imp->running_;
}
bool PITimer::isStopped() const {
init();
return !imp->running_;
}
void PITimer::initFirst() {
lockRun = false;
callEvents = true;
data_t = 0;
ret_func = 0;
imp = 0;
setProperty("interval", 0.);
}
void PITimer::init() const {
if (imp) return;
switch (imp_mode) {
case PITimer::Pool: imp = new _PITimerImp_Pool(); break;
case PITimer::ThreadRT:
#ifdef PIP_TIMER_RT
imp = new _PITimerImp_RT();
break;
#else
piCoutObj << "Warning: \"ThreadRT\" is not available at this system! Using \"Thread\".";
#endif
case PITimer::Thread: imp = new _PITimerImp_Thread(); break;
default: piCout << "Fatal: invalid implementation() of" << this << "!"; assert(0);
}
if (!imp) return;
//piCout << this << "init" << imp;
imp->tfunc = tickImpS;
imp->parent = const_cast<PITimer*>(this);
}
void PITimer::destroy() {
if (!imp) return;
//piCout << this << "destroy" << imp;
imp->stop(false); ///BUG: WTF FreeRTOS segfault on this!
delete imp;
imp = 0;
}
void PITimer::tickImp() {
if (!isRunning()) return;
if (lockRun) lock();
if (ret_func) ret_func(data_t, 1);
tick(data_t, 1);
tickEvent(data_t, 1);
if (callEvents) maybeCallQueuedEvents();
piForeach (Delimiter & i, delims) {
if (i.delim > ++(i.tick)) continue;
i.tick = 0;
if (i.slot) i.slot(data_t, i.delim);
else if (ret_func) ret_func(data_t, i.delim);
tick(data_t, i.delim);
tickEvent(data_t, i.delim);
}
if (lockRun) unlock();
}
bool PITimer::start() {
init();
//piCout << this << "start" << imp;
return imp->start();
}
bool PITimer::start(double interval_ms_d) {
init();
//piCout << this << "start" << imp << interval_ms_d;
setProperty("interval", interval_ms_d);
return imp->start(interval_ms_d);
}
bool PITimer::start(int interval_ms_i) {
return start((double)interval_ms_i);
}
void PITimer::startDeferred(double delay_ms) {
init();
imp->startDeferred(delay_ms);
}
void PITimer::startDeferred(double interval_ms, double delay_ms) {
init();
imp->startDeferred(interval_ms, delay_ms);
}
void PITimer::startDeferred(PIDateTime start_datetime) {
startDeferred(imp->interval_, start_datetime);
}
void PITimer::startDeferred(double interval_ms, PIDateTime start_datetime) {
init();
imp->startDeferred(interval_ms, start_datetime);
}
bool PITimer::restart() {
init();
imp->stop(true);
return imp->start();
}
bool PITimer::stop() {
return stop(true);
}
bool PITimer::stop(bool wait) {
init();
//piCout << this << "stop" << imp << wait;
return imp->stop(wait);
}
bool PITimer::waitForFinish(int timeout_msecs) {
if (timeout_msecs < 0) {
while (isRunning())
msleep(PIP_MIN_MSLEEP);
return true;
}
PITimeMeasurer tm;
while (isRunning() && tm.elapsed_m() < timeout_msecs)
msleep(PIP_MIN_MSLEEP);
return tm.elapsed_m() < timeout_msecs;
}

258
libs/main/thread/pitimer.h Normal file
View File

@@ -0,0 +1,258 @@
/*! \file pitimer.h
* \brief Timer
*/
/*
PIP - Platform Independent Primitives
Timer
Ivan Pelipenko peri4ko@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PITIMER_H
#define PITIMER_H
#include "pithread.h"
#include "pitime.h"
typedef std::function<void(void *, int)> TimerEvent;
class PITimer;
class PIP_EXPORT _PITimerBase {
friend class PITimer;
public:
_PITimerBase();
virtual ~_PITimerBase() {}
double interval() const {return interval_;}
void setInterval(double i);
//! \brief Return \c true if thread is running
bool isRunning() const {return running_;}
bool isStopped() const {return !running_;}
bool start() {return start(interval_);}
bool start(double interval_ms);
void startDeferred(double delay_ms) {startDeferred(interval_, delay_ms);}
void startDeferred(double interval_ms, double delay_ms);
void startDeferred(PIDateTime start_datetime) {startDeferred(interval_, start_datetime);}
void startDeferred(double interval_ms, PIDateTime start_datetime);
bool stop(bool wait);
typedef void(*TickFunc)(PITimer*);
TickFunc tfunc;
PITimer * parent;
protected:
virtual bool startTimer(double interval_ms) = 0;
virtual bool stopTimer(bool wait) = 0;
double interval_, deferred_delay;
bool deferred_, deferred_mode; // mode: true - date, false - delay
std::atomic_bool running_;
PIDateTime deferred_datetime;
};
class PIP_EXPORT PITimer: public PIObject {
PIOBJECT_SUBCLASS(PITimer, PIObject)
public:
NO_COPY_CLASS(PITimer)
//! \brief Constructs timer with PITimer::Thread implementation
explicit PITimer();
//! \brief Timer implementations
enum TimerImplementation {
Thread /*! Timer works in his own thread. Intervals are measured by the system time */ = 0x01,
ThreadRT /*! Using POSIX timer with SIGEV_THREAD notification. \attention Doesn`t support on Windows and Mac OS! */ = 0x02,
Pool /*! Using single TimerPool for all timers with this implementation. TimerPool works as Thread implementation and
* sequentially executes all timers. \attention Use this implementation with care! */ = 0x04
};
//! \brief Constructs timer with "ti" implementation
explicit PITimer(TimerImplementation ti);
//! \brief Constructs timer with "slot" slot void(void *,int), "data" data and "ti" implementation
explicit PITimer(TimerEvent slot, void * data = 0, TimerImplementation ti = Thread);
//! \brief Constructs timer with "slot" slot void(), and "ti" implementation
explicit PITimer(std::function<void ()> slot, TimerImplementation ti = Thread);
//! \brief Constructs timer with "slot" slot void(void *), "data" data and "ti" implementation
explicit PITimer(std::function<void (void *)> slot, void * data, TimerImplementation ti = Thread);
virtual ~PITimer();
//! \brief Returns timer implementation
PITimer::TimerImplementation implementation() const {return imp_mode;}
//! \brief Returns timer loop delay in milliseconds
double interval() const;
//! \brief Set timer loop delay in milliseconds
EVENT_HANDLER1(void, setInterval, double, ms);
//! \brief Returns if timer is started
bool isRunning() const;
//! \brief Returns if timer is not started
bool isStopped() const;
EVENT_HANDLER0(bool, start);
EVENT_HANDLER1(bool, start, double, interval_ms_d);
bool start(int interval_ms_i);
EVENT_HANDLER0(bool, restart);
/** \brief Start timer with \b interval() loop delay after \b delay_msecs delay.
* \details Timer wait \b delay_msecs milliseconds and then normally starts with
* \b interval() loop delay. */
void startDeferred(double delay_ms);
/** \brief Start timer with \b interval_msecs loop delay after \b delay_msecs delay.
* \details Timer wait \b delay_msecs milliseconds and then normally starts with
* \b interval_msecs loop delay. */
void startDeferred(double interval_ms, double delay_ms);
/** \brief Start timer with \b interval() loop delay after \b start_datetime date and time.
* \details Timer wait until \b start_datetime and then normally starts with
* \b interval() loop delay. */
void startDeferred(PIDateTime start_datetime);
/** \brief Start timer with \b interval_msecs loop delay after \b start_datetime date and time.
* \details Timer wait until \b start_datetime and then normally starts with
* \b interval_msecs loop delay. */
void startDeferred(double interval_ms, PIDateTime start_datetime);
EVENT_HANDLER0(bool, stop);
EVENT_HANDLER1(bool, stop, bool, wait);
bool waitForFinish() {return waitForFinish(-1);}
bool waitForFinish(int timeout_msecs);
//! \brief Set custom data
void setData(void * data_) {data_t = data_;}
//! \brief Set timer tick function
void setSlot(TimerEvent slot) {ret_func = slot;}
//! \brief Set timer tick function
void setSlot(std::function<void ()> slot) {ret_func = [slot](void *, int){slot();};}
//! \brief Set timer tick function
void setSlot(std::function<void (void *)> slot) {ret_func = [slot](void *d, int){slot(d);};}
//! \brief Returns common data passed to tick functions
void * data() const {return data_t;}
void needLockRun(bool need) {lockRun = need;}
EVENT_HANDLER0(void, lock) {mutex_.lock();}
EVENT_HANDLER0(void, unlock) {mutex_.unlock();}
//! \brief Returns if timer should exec \a maybeCallQueuedEvents() at every tick.
//! By default \b true
bool isCallQueuedEvents() const {return callEvents;}
//! \brief If set timer exec \a maybeCallQueuedEvents() at every tick.
//! By default \b true
void setCallQueuedEvents(bool yes) {callEvents = yes;}
//! \brief Add frequency delimiter \b delim with optional delimiter slot \b slot.
void addDelimiter(int delim, TimerEvent slot = 0) {delims << Delimiter(slot, delim);}
//! \brief Add frequency delimiter \b delim with optional delimiter slot \b slot.
void addDelimiter(int delim, std::function<void ()> slot) {delims << Delimiter([slot](void *, int){slot();}, delim);}
//! \brief Add frequency delimiter \b delim with optional delimiter slot \b slot.
void addDelimiter(int delim, std::function<void (void *)> slot) {delims << Delimiter([slot](void *d, int){slot(d);}, delim);}
//! \brief Remove all frequency delimiters \b delim.
void removeDelimiter(int delim) {for (int i = 0; i < delims.size_s(); ++i) if (delims[i].delim == delim) {delims.remove(i); i--;}}
EVENT_HANDLER0(void, clearDelimiters) {delims.clear();}
EVENT2(tickEvent, void * , data_, int, delimiter)
//! \handlers
//! \{
/** \fn bool start()
* \brief Start timer with \a interval() loop delay
* \details Start execution of timer functions with frequency = 1 / msecs Hz. */
/** \fn bool start(double msecs)
* \brief Start timer with \b msecs loop delay
* \details Start execution of timer functions with frequency = 1. / msecs Hz.
* Instead of \a start(int msecs) function this variant allow start timer
* with frequencies more than 1 kHz */
//! \fn bool restart()
//! \brief Stop and start timer with \a interval() loop delay
//! \fn bool stop(bool wait = true)
//! \brief Stop timer and wait for it finish if "wait"
//! \fn void clearDelimiters()
//! \brief Remove all frequency delimiters
//! \}
//! \events
//! \{
/** \fn void tickEvent(void * data, int delimiter)
* \brief Raise on timer tick
* \details \b Data can be set with function \a setData(void * data) or from constructor.
* \b Delimiter is frequency delimiter, 1 for main loop. */
//! \}
protected:
struct PIP_EXPORT Delimiter {
Delimiter(TimerEvent slot_ = 0, int delim_ = 1) {slot = slot_; delim = delim_; tick = 0;}
TimerEvent slot;
int delim;
int tick;
};
void initFirst();
void init() const;
void destroy();
static void tickImpS(PITimer * t) {t->tickImp();}
void tickImp();
//! Virtual timer execution function, similar to "slot" or event \a void timeout(void * data, int delimiter).
//! By default is empty.
virtual void tick(void * data_, int delimiter) {}
void * data_t;
std::atomic_bool lockRun, callEvents;
PIMutex mutex_;
TimerEvent ret_func;
TimerImplementation imp_mode;
PIVector<Delimiter> delims;
mutable _PITimerBase * imp;
};
#endif // PITIMER_H