Compare commits
5 Commits
5794eac20a
...
PIThreadPo
| Author | SHA1 | Date | |
|---|---|---|---|
| a16f629dc5 | |||
| 5868e0ec9d | |||
| 3102b985d5 | |||
| 93547beb38 | |||
| edb7189013 |
@@ -217,6 +217,7 @@ public:
|
||||
//! \~english Waits until the thread starts. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает запуска потока. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForStart(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Deprecated overload of \a waitForStart() that accepts milliseconds.
|
||||
//! \~russian Устаревшая перегрузка \a waitForStart(), принимающая миллисекунды.
|
||||
bool waitForStart(int timeout_msecs) DEPRECATEDM("use waitForStart(PISystemTime)") {
|
||||
@@ -226,6 +227,7 @@ public:
|
||||
//! \~english Waits for thread completion. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает завершения потока. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForFinish(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Deprecated overload of \a waitForFinish() that accepts milliseconds.
|
||||
//! \~russian Устаревшая перегрузка \a waitForFinish(), принимающая миллисекунды.
|
||||
bool waitForFinish(int timeout_msecs) DEPRECATEDM("use waitForFinish(PISystemTime)") {
|
||||
|
||||
@@ -71,8 +71,8 @@
|
||||
#include "pispinlock.h"
|
||||
#include "pithread.h"
|
||||
#include "pithreadnotifier.h"
|
||||
#include "pithreadpoolexecutor.h"
|
||||
#include "pithreadpoolloop.h"
|
||||
#include "pithreadpoolworker.h"
|
||||
#include "pitimer.h"
|
||||
|
||||
#endif // PITHREADMODULE_H
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
/*
|
||||
PIP - Platform Independent Primitives
|
||||
|
||||
Stephan Fomenko
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "pithreadpoolexecutor.h"
|
||||
|
||||
#include "piliterals_time.h"
|
||||
|
||||
/*! \class PIThreadPoolExecutor
|
||||
* \brief Thread pools address two different problems: they usually provide improved performance when executing large
|
||||
* numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
|
||||
* managing the resources, including threads, consumed when executing a collection of tasks.
|
||||
*/
|
||||
|
||||
|
||||
PIThreadPoolExecutor::PIThreadPoolExecutor(int corePoolSize): isShutdown_(false) {
|
||||
for (int i = 0; i < corePoolSize; ++i) {
|
||||
PIThread * thread = new PIThread([&, i]() {
|
||||
auto runnable = taskQueue.poll(100_ms, std::function<void()>());
|
||||
if (runnable) {
|
||||
runnable();
|
||||
}
|
||||
if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
|
||||
});
|
||||
threadPool.push_back(thread);
|
||||
thread->start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolExecutor::awaitTermination(PISystemTime timeout) {
|
||||
PITimeMeasurer measurer;
|
||||
for (size_t i = 0; i < threadPool.size(); ++i) {
|
||||
auto dif = timeout - measurer.elapsed();
|
||||
if (dif.isNegative()) return false;
|
||||
if (!threadPool[i]->waitForFinish(dif)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolExecutor::shutdownNow() {
|
||||
isShutdown_ = true;
|
||||
for (size_t i = 0; i < threadPool.size(); ++i)
|
||||
threadPool[i]->stop();
|
||||
}
|
||||
|
||||
|
||||
PIThreadPoolExecutor::~PIThreadPoolExecutor() {
|
||||
shutdownNow();
|
||||
while (threadPool.size() > 0)
|
||||
delete threadPool.take_back();
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolExecutor::execute(std::function<void()> runnable) {
|
||||
if (!isShutdown_) taskQueue.offer(std::move(runnable));
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolExecutor::isShutdown() const {
|
||||
return isShutdown_;
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolExecutor::shutdown() {
|
||||
isShutdown_ = true;
|
||||
}
|
||||
@@ -1,95 +1,23 @@
|
||||
//! \~\file pithreadpoolexecutor.h
|
||||
//! \~\ingroup Thread
|
||||
//! \brief
|
||||
//! \~english Thread pool executor
|
||||
//! \~russian Исполнитель пула потоков
|
||||
//!
|
||||
//! \details
|
||||
//! \~english Executes tasks in a pool of worker threads.
|
||||
//! \~russian Выполняет задачи в пуле рабочих потоков.
|
||||
/*
|
||||
PIP - Platform Independent Primitives
|
||||
|
||||
Stephan Fomenko
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef PITHREADPOOLEXECUTOR_H
|
||||
#define PITHREADPOOLEXECUTOR_H
|
||||
|
||||
#include "piblockingqueue.h"
|
||||
#include "pithread.h"
|
||||
#include "pithreadpoolworker.h"
|
||||
|
||||
#include <atomic>
|
||||
namespace {
|
||||
DEPRECATEDM("Use pithreadpoolworker.h") const bool deprecated_header_is_used = true;
|
||||
}
|
||||
|
||||
|
||||
//! \~\ingroup Thread
|
||||
//! \~\brief
|
||||
//! \~english Fixed-size pool of worker threads for fire-and-forget tasks.
|
||||
//! \~russian Фиксированный пул рабочих потоков для задач без ожидания результата.
|
||||
class PIP_EXPORT PIThreadPoolExecutor {
|
||||
class DEPRECATEDM("Use PIThreadPoolWorker") PIThreadPoolExecutor: public PIThreadPoolWorker {
|
||||
public:
|
||||
//! \~english Constructs executor with \a corePoolSize worker threads.
|
||||
//! \~russian Создает исполнитель с \a corePoolSize рабочими потоками.
|
||||
explicit PIThreadPoolExecutor(int corePoolSize);
|
||||
PIThreadPoolExecutor(int threads_count): PIThreadPoolWorker(threads_count) { start(); }
|
||||
~PIThreadPoolExecutor() { stopAndWait(); }
|
||||
|
||||
//! \~english Stops worker threads and destroys executor resources.
|
||||
//! \~russian Останавливает рабочие потоки и уничтожает ресурсы исполнителя.
|
||||
virtual ~PIThreadPoolExecutor();
|
||||
|
||||
//! \~\brief
|
||||
//! \~english Submits \a runnable for asynchronous execution by a worker thread.
|
||||
//! \~russian Передает \a runnable на асинхронное выполнение рабочим потоком.
|
||||
//! \details
|
||||
//! \~english
|
||||
//! This is a best-effort fire-and-forget call and does not report whether the task was accepted.
|
||||
//! \After shutdown requests new tasks are ignored.
|
||||
//! \~russian
|
||||
//! Это вызов по принципу best-effort без ожидания результата и без сообщения о том, была ли задача принята.
|
||||
//! \После запроса на завершение новые задачи игнорируются.
|
||||
void execute(std::function<void()> runnable);
|
||||
|
||||
//! \~english Requests immediate shutdown and stops worker threads without waiting for queued tasks to finish.
|
||||
//! \~russian Запрашивает немедленное завершение и останавливает рабочие потоки без ожидания завершения задач в очереди.
|
||||
void shutdownNow();
|
||||
|
||||
//! \~\brief
|
||||
//! \~english Requests orderly shutdown: new tasks are rejected and workers stop after the current queue is drained.
|
||||
//! \~russian Запрашивает упорядоченное завершение: новые задачи отклоняются, а рабочие потоки останавливаются после опустошения текущей
|
||||
//! очереди.
|
||||
//! \details
|
||||
//! \~english This method does not wait for worker termination.
|
||||
//! \~russian Этот метод не ожидает завершения рабочих потоков.
|
||||
void shutdown();
|
||||
|
||||
//! \~english Returns \c true after \a shutdown() or \a shutdownNow() has been requested.
|
||||
//! \~russian Возвращает \c true после запроса \a shutdown() или \a shutdownNow().
|
||||
bool isShutdown() const;
|
||||
|
||||
//! \~\brief
|
||||
//! \~english Waits up to \a timeout for all worker threads to finish.
|
||||
//! \~russian Ожидает до \a timeout завершения всех рабочих потоков.
|
||||
//! \return
|
||||
//! \~english \c false if the timeout expires first.
|
||||
//! \~russian \c false, если таймаут истек раньше.
|
||||
bool awaitTermination(PISystemTime timeout);
|
||||
|
||||
private:
|
||||
std::atomic_bool isShutdown_;
|
||||
PIBlockingQueue<std::function<void()>> taskQueue;
|
||||
PIVector<PIThread *> threadPool;
|
||||
void execute(std::function<void()> runnable) DEPRECATEDM("Use enqueueTask()") { enqueueTask(runnable); }
|
||||
void shutdownNow() DEPRECATEDM("Use stopAndWait()") { stopAndWait(); }
|
||||
void shutdown() DEPRECATEDM("Use stop()") { stop(); }
|
||||
bool isShutdown() const DEPRECATEDM("Use !isRunning()") { return !isRunning(); }
|
||||
bool awaitTermination(PISystemTime timeout) DEPRECATEDM("Use waitForFinish()") { return waitForFinish(timeout); }
|
||||
};
|
||||
|
||||
#endif // PITHREADPOOLEXECUTOR_H
|
||||
|
||||
#endif
|
||||
|
||||
238
libs/main/thread/pithreadpoolworker.cpp
Normal file
238
libs/main/thread/pithreadpoolworker.cpp
Normal file
@@ -0,0 +1,238 @@
|
||||
/*
|
||||
PIP - Platform Independent Primitives
|
||||
|
||||
Ivan Pelipenko, Stephan Fomenko
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "pithreadpoolworker.h"
|
||||
|
||||
#include "pisysteminfo.h"
|
||||
|
||||
//! \addtogroup Thread
|
||||
//! \{
|
||||
//! \class PIThreadPoolWorker pithreadpoolworker.h
|
||||
//! \~\details
|
||||
//! \~english
|
||||
//! \PIThreadPoolWorker is a class that implements a fixed-size pool of worker threads for general-purpose task execution. It
|
||||
//! allows starting threads, enqueuing tasks as functors or class member methods, and managing their execution. The class provides methods
|
||||
//! to wait for all tasks or specific tasks by ID to complete, as well as to gracefully stop the pool with timeouts. The lifecycle of each
|
||||
//! task can be monitored using the taskStarted and taskFinished events. It is a versatile tool for multithreaded asynchronous operations.
|
||||
//! \~russian
|
||||
//! PIThreadPoolWorker — это класс, реализующий фиксированный пул рабочих потоков для выполнения задач общего назначения. Он
|
||||
//! позволяет запускать потоки, добавлять в очередь задачи в виде функторов или методов класса, а также управлять их выполнением. Класс
|
||||
//! предоставляет методы для ожидания завершения всех задач или отдельных задач по их идентификатору, а также для корректной остановки пула
|
||||
//! с таймаутами. С помощью событий taskStarted и taskFinished можно отслеживать жизненный цикл каждой задачи. Это универсальный инструмент
|
||||
//! для многопоточного асинхронного выполнения операций.
|
||||
//!
|
||||
//! \}
|
||||
|
||||
|
||||
PIThreadPoolWorker::PIThreadPoolWorker(int threads_count) {
|
||||
if (threads_count < 0) threads_count = PISystemInfo::instance()->processorsCount;
|
||||
assertm(threads_count > 0, "Invalid threads count!");
|
||||
for (int i = 0; i < threads_count; ++i) {
|
||||
Worker * w = new Worker();
|
||||
w->thread.setSlot([this, w]() { threadFunc(w); });
|
||||
workers << w;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
PIThreadPoolWorker::~PIThreadPoolWorker() {
|
||||
piDeleteAllAndClear(workers);
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolWorker::start() {
|
||||
for (auto w: workers) {
|
||||
w->thread.start();
|
||||
w->notifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolWorker::stop() {
|
||||
for (auto w: workers) {
|
||||
w->thread.stop();
|
||||
w->notifier.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::stopAndWait(PISystemTime timeout) {
|
||||
stop();
|
||||
return waitForFinish(timeout);
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::waitForStart(PISystemTime timeout) {
|
||||
PITimeMeasurer tm;
|
||||
for (auto w: workers) {
|
||||
if (timeout.isNull())
|
||||
w->thread.waitForStart();
|
||||
else {
|
||||
auto remains = timeout - tm.elapsed();
|
||||
if (remains.isNegative()) return false;
|
||||
if (!w->thread.waitForStart(remains)) return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::waitForFinish(PISystemTime timeout) {
|
||||
PITimeMeasurer tm;
|
||||
for (auto w: workers) {
|
||||
if (timeout.isNull())
|
||||
w->thread.waitForFinish();
|
||||
else {
|
||||
auto remains = timeout - tm.elapsed();
|
||||
if (remains.isNegative()) return false;
|
||||
if (!w->thread.waitForFinish(remains)) return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::isRunning() const {
|
||||
return workers.every([](Worker * w) { return w->thread.isRunning(); });
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::waitForTasks(PISystemTime timeout) {
|
||||
if (!isRunning()) return tasks_queue.getRef()->isEmpty();
|
||||
auto checkWorking = [this] {
|
||||
if (tasks_queue.getRef()->isNotEmpty()) return true;
|
||||
for (auto * w: workers)
|
||||
if (w->in_work > 0) return true;
|
||||
return false;
|
||||
};
|
||||
if (timeout.isNull()) {
|
||||
for (;;) {
|
||||
if (!checkWorking()) break;
|
||||
piMinSleep();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
PITimeMeasurer tm;
|
||||
while (tm.elapsed() < timeout) {
|
||||
if (!checkWorking()) return true;
|
||||
piMinSleep();
|
||||
}
|
||||
return tm.elapsed() < timeout;
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::waitForTask(int64_t id, PISystemTime timeout) {
|
||||
if (!isRunning()) return tasks_queue.getRef()->every([id](const Task & t) { return t.id != id; });
|
||||
auto checkWorking = [this, id] {
|
||||
if (tasks_queue.getRef()->any([id](const Task & t) { return t.id == id; })) return true;
|
||||
for (auto * w: workers)
|
||||
if (w->in_work == id) return true;
|
||||
return false;
|
||||
};
|
||||
if (timeout.isNull()) {
|
||||
for (;;) {
|
||||
if (!checkWorking()) break;
|
||||
piMinSleep();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
PITimeMeasurer tm;
|
||||
while (tm.elapsed() < timeout) {
|
||||
if (!checkWorking()) return true;
|
||||
piMinSleep();
|
||||
}
|
||||
return tm.elapsed() < timeout;
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolWorker::exec() {
|
||||
start();
|
||||
waitForStart();
|
||||
waitForTasks();
|
||||
stopAndWait();
|
||||
}
|
||||
|
||||
|
||||
int64_t PIThreadPoolWorker::enqueueTask(std::function<void(int64_t)> func, PIObject * context) {
|
||||
if (context) {
|
||||
if (!contexts[context]) {
|
||||
contexts << context;
|
||||
CONNECTL(context, deleted, ([this, context](PIObject *) {
|
||||
contexts.remove(context);
|
||||
auto qref = tasks_queue.getRef();
|
||||
// auto prev_size = qref->size();
|
||||
// piCout << "deleted" << (void *)context << qref->map<void *>([](const Task & t) { return t.context; });
|
||||
qref->removeWhere([context](const Task & t) { return t.context == context; });
|
||||
// piCout << prev_size << qref->size() << qref->map<void *>([](const Task & t) { return t.context; });
|
||||
}));
|
||||
}
|
||||
}
|
||||
int64_t id = ++next_task_id;
|
||||
Task task;
|
||||
task.id = id;
|
||||
task.func = std::move(func);
|
||||
task.context = context;
|
||||
tasks_queue.getRef()->enqueue(std::move(task));
|
||||
for (auto * w: workers)
|
||||
w->notifier.notify();
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
bool PIThreadPoolWorker::removeTask(int64_t id) {
|
||||
auto qref = tasks_queue.getRef();
|
||||
auto prev_size = qref->size();
|
||||
qref->removeWhere([id](const Task & t) { return t.id == id; });
|
||||
return prev_size != qref->size();
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolWorker::clearTasks() {
|
||||
tasks_queue.getRef()->clear();
|
||||
}
|
||||
|
||||
|
||||
PIThreadPoolWorker::TaskStatus PIThreadPoolWorker::taskStatus(int64_t id) const {
|
||||
if (id <= 0) return TaskStatus::Unknown;
|
||||
auto qref = tasks_queue.getRef();
|
||||
for (auto w: workers)
|
||||
if (w->in_work == id) return TaskStatus::InProgress;
|
||||
for (const auto & t: *qref)
|
||||
if (t.id == id) return TaskStatus::Enqueued;
|
||||
return id <= next_task_id ? TaskStatus::DoneOrCancelled : TaskStatus::Unknown;
|
||||
}
|
||||
|
||||
|
||||
void PIThreadPoolWorker::threadFunc(Worker * w) {
|
||||
w->notifier.wait();
|
||||
if (w->thread.isStopping()) return;
|
||||
Task task;
|
||||
{
|
||||
auto ref = tasks_queue.getRef();
|
||||
if (ref->isEmpty()) return;
|
||||
task = ref->dequeue();
|
||||
}
|
||||
if (!task.isValid()) return;
|
||||
w->in_work = task.id;
|
||||
taskStarted(task.id);
|
||||
task.func(task.id);
|
||||
w->in_work = -1;
|
||||
taskFinished(task.id);
|
||||
w->notifier.notify();
|
||||
}
|
||||
177
libs/main/thread/pithreadpoolworker.h
Normal file
177
libs/main/thread/pithreadpoolworker.h
Normal file
@@ -0,0 +1,177 @@
|
||||
//! \~\file pithreadpoolworker.h
|
||||
//! \~\ingroup Thread
|
||||
//! \brief
|
||||
//! \~english Thread pool worker
|
||||
//! \~russian Исполнитель пула потоков
|
||||
/*
|
||||
PIP - Platform Independent Primitives
|
||||
|
||||
Ivan Pelipenko, Stephan Fomenko
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef PITHREADPOOLWORKER_H
|
||||
#define PITHREADPOOLWORKER_H
|
||||
|
||||
#include "piprotectedvariable.h"
|
||||
#include "pithread.h"
|
||||
|
||||
|
||||
//! \~\ingroup Thread
|
||||
//! \~\brief
|
||||
//! \~english Fixed-size pool of worker threads for generic-purpose tasks.
|
||||
//! \~russian Фиксированный пул рабочих потоков для задач общего назначения.
|
||||
class PIP_EXPORT PIThreadPoolWorker: public PIObject {
|
||||
PIOBJECT(PIThreadPoolWorker)
|
||||
|
||||
public:
|
||||
//! \~english Constructs executor with \a threads_count worker threads. If \a threads_count < 0 processor threads count used.
|
||||
//! \~russian Создает исполнитель с \a threads_count рабочими потоками. Если \a threads_count < 0 используется количество потоков
|
||||
//! процессора.
|
||||
explicit PIThreadPoolWorker(int threads_count = -1);
|
||||
|
||||
//! \~english Destroy worker threads. Call \a stopAndWait() before.
|
||||
//! \~russian Уничтожает рабочие потоки. Вызывайте перед этим \a stopAndWait().
|
||||
virtual ~PIThreadPoolWorker();
|
||||
|
||||
|
||||
//! \~english Task status.
|
||||
//! \~russian Статус задачи.
|
||||
enum class TaskStatus {
|
||||
Unknown /** \~english ID <= 0 or not queued yet \~russian ID <= 0 или не поставлена в очередь */,
|
||||
Enqueued /** \~english Wait for execution \~russian Ожидает выполнения */,
|
||||
InProgress /** \~english In execution now \~russian В процессе выполнения */,
|
||||
DoneOrCancelled /** \~english Done or cancelled \~russian Выполнена или отменена */
|
||||
};
|
||||
|
||||
|
||||
//! \~english Starts the threads.
|
||||
//! \~russian Запускает потоки.
|
||||
void start();
|
||||
|
||||
//! \~english Requests graceful threads shutdown.
|
||||
//! \~russian Запрашивает корректное завершение потоков.
|
||||
void stop();
|
||||
|
||||
//! \~english Requests stop and waits for threads completion. Returns \b false if the timeout expires.
|
||||
//! \~russian Запрашивает остановку и ожидает завершения потоков. Возвращает \b false, если таймаут истек.
|
||||
bool stopAndWait(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Waits until the threads starts. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает запуска потоков. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForStart(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Waits for threads completion. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает завершения потоков. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForFinish(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Returns whether the threads are currently running.
|
||||
//! \~russian Возвращает, выполняются ли потоки в данный момент.
|
||||
bool isRunning() const;
|
||||
|
||||
//! \~english Waits for all tasks completion. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает завершения всех задач. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForTasks(PISystemTime timeout = {});
|
||||
|
||||
//! \~english Waits for task with id \a id completion. Returns \b false if the timeout expires first.
|
||||
//! \~russian Ожидает завершения задачи с id \a id. Возвращает \b false, если таймаут истек раньше.
|
||||
bool waitForTask(int64_t id, PISystemTime timeout = {});
|
||||
|
||||
//! \~english Starts threads, wait for all tasks complete and threads stop.
|
||||
//! \~russian Запускает потоки, ожидает завершения всех задач и остановки потоков.
|
||||
void exec();
|
||||
|
||||
|
||||
//! \~english Queue functor to execution. Pass task ID in functor. Returns task ID.
|
||||
//! \~russian Запланировать функтор на выполнение. В функтор передастся ID задачи. Возвращает ID задачи.
|
||||
int64_t enqueueTask(std::function<void(int64_t)> func, PIObject * context = nullptr);
|
||||
|
||||
//! \~english Queue functor to execution. Returns task ID.
|
||||
//! \~russian Запланировать функтор на выполнение. Возвращает ID задачи.
|
||||
int64_t enqueueTask(std::function<void()> func, PIObject * context = nullptr) {
|
||||
return enqueueTask([func](int64_t) { func(); }, context);
|
||||
}
|
||||
|
||||
//! \~english Queue class member method to execution. Pass task ID in method. Returns task ID.
|
||||
//! \~russian Запланировать член-метод класса на выполнение. В метод передастся ID задачи. Возвращает ID задачи.
|
||||
template<typename O>
|
||||
int64_t enqueueTask(O * obj, void (O::*member_func)(int64_t)) {
|
||||
return enqueueTask([obj, member_func](int64_t id) { (obj->*member_func)(id); },
|
||||
PIObject::isPIObject(obj) ? dynamic_cast<PIObject *>(obj) : nullptr);
|
||||
}
|
||||
|
||||
//! \~english Queue class member method to execution. Returns task ID.
|
||||
//! \~russian Запланировать член-метод класса на выполнение. Возвращает ID задачи.
|
||||
template<typename O>
|
||||
int64_t enqueueTask(O * obj, void (O::*member_func)()) {
|
||||
return enqueueTask([obj, member_func](int64_t) { (obj->*member_func)(); },
|
||||
PIObject::isPIObject(obj) ? dynamic_cast<PIObject *>(obj) : nullptr);
|
||||
}
|
||||
|
||||
//! \~english Remove task with id \a id from queue. Returns if task delete.
|
||||
//! \~russian Удаляет задачу с id \a id из очереди. Возвращиает была ли задача удалена.
|
||||
bool removeTask(int64_t id);
|
||||
|
||||
//! \~english Remove all queued tasks.
|
||||
//! \~russian Удаляет все задачи из очереди.
|
||||
void clearTasks();
|
||||
|
||||
//! \~english Returns task status with id \a id.
|
||||
//! \~russian Возвращиает статус задачи с id \a id.
|
||||
TaskStatus taskStatus(int64_t id) const;
|
||||
|
||||
|
||||
//! \events
|
||||
//! \{
|
||||
|
||||
//! \fn void taskStarted(int64_t id)
|
||||
//! \brief
|
||||
//! \~english Raised on start execution task with id \a id.
|
||||
//! \~russian Вызывается при старте выполнения задачи с id \a id.
|
||||
EVENT1(taskStarted, int64_t, id);
|
||||
|
||||
//! \fn void taskFinished(int64_t id)
|
||||
//! \brief
|
||||
//! \~english Raised on finish execution task with id \a id.
|
||||
//! \~russian Вызывается при завершении выполнения задачи с id \a id.
|
||||
EVENT1(taskFinished, int64_t, id);
|
||||
|
||||
//! \}
|
||||
|
||||
|
||||
private:
|
||||
struct Worker {
|
||||
PIThread thread;
|
||||
PIThreadNotifier notifier;
|
||||
std::atomic_int64_t in_work = {-1};
|
||||
};
|
||||
struct Task {
|
||||
bool isValid() const { return func != nullptr; }
|
||||
PIObject * context = nullptr;
|
||||
std::function<void(int64_t)> func = nullptr;
|
||||
int64_t id = -1;
|
||||
};
|
||||
|
||||
void threadFunc(Worker * w);
|
||||
|
||||
mutable PIVector<Worker *> workers;
|
||||
mutable PIProtectedVariable<PIQueue<Task>> tasks_queue;
|
||||
|
||||
PISet<PIObject *> contexts;
|
||||
std::atomic_int64_t next_task_id = {0};
|
||||
};
|
||||
|
||||
|
||||
#endif // PITHREADPOOLWORKER_H
|
||||
@@ -274,6 +274,18 @@ void PITimer::stopAndWait(PISystemTime timeout) {
|
||||
}
|
||||
|
||||
|
||||
void PITimer::setSlot(std::function<void()> func) {
|
||||
ret_func_delim = nullptr;
|
||||
ret_func = std::move(func);
|
||||
}
|
||||
|
||||
|
||||
void PITimer::setSlot(std::function<void(int)> func) {
|
||||
ret_func = nullptr;
|
||||
ret_func_delim = std::move(func);
|
||||
}
|
||||
|
||||
|
||||
void PITimer::addDelimiter(int delim, std::function<void(int)> func) {
|
||||
delims << Delimiter(std::move(func), delim);
|
||||
}
|
||||
|
||||
@@ -110,11 +110,6 @@ public:
|
||||
//! \~english Deprecated overload of \a start() that accepts milliseconds.
|
||||
//! \~russian Устаревшая перегрузка \a start(), принимающая миллисекунды.
|
||||
bool start(double interval_ms) DEPRECATEDM("use start(PISystemTime)") { return start(PISystemTime::fromMilliseconds(interval_ms)); }
|
||||
EVENT_HANDLER0(bool, start);
|
||||
|
||||
EVENT_HANDLER0(bool, restart);
|
||||
|
||||
EVENT_HANDLER0(void, stop);
|
||||
|
||||
//! \~english Deprecated overload of \a stopAndWait() that accepts milliseconds.
|
||||
//! \~russian Устаревшая перегрузка \a stopAndWait(), принимающая миллисекунды.
|
||||
@@ -126,23 +121,15 @@ public:
|
||||
|
||||
//! \~english Sets a tick callback that ignores the delimiter value.
|
||||
//! \~russian Устанавливает обратный вызов тика, игнорирующий значение делителя.
|
||||
void setSlot(std::function<void()> func) {
|
||||
ret_func_delim = nullptr;
|
||||
ret_func = std::move(func);
|
||||
}
|
||||
void setSlot(std::function<void()> func);
|
||||
|
||||
//! \~english Sets a tick callback that receives the current delimiter value.
|
||||
//! \~russian Устанавливает обратный вызов тика, принимающий текущее значение делителя.
|
||||
void setSlot(std::function<void(int)> func) {
|
||||
ret_func = nullptr;
|
||||
ret_func_delim = std::move(func);
|
||||
}
|
||||
void setSlot(std::function<void(int)> func);
|
||||
|
||||
//! \~english Enables locking of the internal mutex around tick processing.
|
||||
//! \~russian Включает блокировку внутреннего мьютекса вокруг обработки тиков.
|
||||
void needLockRun(bool need) { lockRun = need; }
|
||||
EVENT_HANDLER0(void, lock) { mutex_.lock(); }
|
||||
EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
|
||||
|
||||
//! \~english Returns whether the timer drains queued delivery for itself as performer on each main tick. By default \b true.
|
||||
//! \~russian Возвращает, должен ли таймер обрабатывать отложенную доставку для себя как исполнителя на каждом основном тике. По
|
||||
@@ -169,9 +156,6 @@ public:
|
||||
//! \~russian Удаляет все делители со значением \a delim.
|
||||
void removeDelimiter(int delim);
|
||||
|
||||
EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
|
||||
|
||||
EVENT1(tickEvent, int, delimiter);
|
||||
|
||||
//! \handlers
|
||||
//! \{
|
||||
@@ -180,31 +164,37 @@ public:
|
||||
//! \brief
|
||||
//! \~english Starts the timer with the current \a interval().
|
||||
//! \~russian Запускает таймер с текущим значением \a interval().
|
||||
EVENT_HANDLER0(bool, start);
|
||||
|
||||
//! \fn bool restart()
|
||||
//! \brief
|
||||
//! \~english Stops the timer, then starts it again with the current \a interval().
|
||||
//! \~russian Останавливает таймер, затем снова запускает его с текущим значением \a interval().
|
||||
EVENT_HANDLER0(bool, restart);
|
||||
|
||||
//! \fn bool stop()
|
||||
//! \brief
|
||||
//! \~english Requests stop and wakes the timer thread, but does not wait for completion.
|
||||
//! \~russian Запрашивает остановку и пробуждает поток таймера, но не ожидает завершения.
|
||||
EVENT_HANDLER0(void, stop);
|
||||
|
||||
//! \fn void clearDelimiters()
|
||||
//! \brief
|
||||
//! \~english Remove all frequency delimiters
|
||||
//! \~russian Удаляет все делители частоты
|
||||
EVENT_HANDLER0(void, clearDelimiters) { delims.clear(); }
|
||||
|
||||
//! \fn void lock()
|
||||
//! \brief
|
||||
//! \~english Locks the internal timer mutex.
|
||||
//! \~russian Блокирует внутренний мьютекс таймера.
|
||||
EVENT_HANDLER0(void, lock) { mutex_.lock(); }
|
||||
|
||||
//! \fn void unlock()
|
||||
//! \brief
|
||||
//! \~english Unlocks the internal timer mutex.
|
||||
//! \~russian Разблокирует внутренний мьютекс таймера.
|
||||
EVENT_HANDLER0(void, unlock) { mutex_.unlock(); }
|
||||
|
||||
//! \}
|
||||
//! \events
|
||||
@@ -219,7 +209,7 @@ public:
|
||||
//! "delimiter" is the frequency delimiter, 1 for the main loop.
|
||||
//! \~russian
|
||||
//! "delimiter" - делитель частоты, 1 для основного цикла.
|
||||
|
||||
EVENT1(tickEvent, int, delimiter);
|
||||
|
||||
//! \}
|
||||
|
||||
|
||||
168
main.cpp
168
main.cpp
@@ -1,143 +1,53 @@
|
||||
#include "libs/http_client/curl_thread_pool_p.h"
|
||||
#include "picodeparser.h"
|
||||
#include "pidigest.h"
|
||||
#include "pihttpclient.h"
|
||||
#include "piliterals.h"
|
||||
#include "pip.h"
|
||||
#include "piunits.h"
|
||||
#include "pivaluetree_conversions.h"
|
||||
#include "pithreadpoolexecutor.h"
|
||||
|
||||
using namespace PICoutManipulators;
|
||||
using namespace PIHTTP;
|
||||
using namespace PIUnits::Class;
|
||||
|
||||
int rcnt = 0, scnt = 0;
|
||||
|
||||
inline PIByteArray SMBusTypeInfo_genHash(PIString n) {
|
||||
PICrypt c;
|
||||
return piSerialize(c.shorthash(n.removeAll(" "), PIString("SMBusDataHashKey").toByteArray()));
|
||||
}
|
||||
class A: public PIObject {
|
||||
PIOBJECT(A)
|
||||
|
||||
public:
|
||||
A(PIString n = {}) { setName(n); }
|
||||
void foo() {
|
||||
100_ms .sleep();
|
||||
piCoutObj << "foo!";
|
||||
100_ms .sleep();
|
||||
}
|
||||
};
|
||||
|
||||
int main(int argc, char * argv[]) {
|
||||
PICrypt _crypt;
|
||||
// auto ba = PIFile::readAll("logo.png");
|
||||
PIString str = "hello!"_a;
|
||||
PIByteArray ba = str.toAscii();
|
||||
PIByteArray key = PIString("SMBusDataHashKey").toByteArray();
|
||||
PIVector<A *> objects;
|
||||
objects << new A("1") << new A("2") << new A("3");
|
||||
|
||||
const int times = 1000000;
|
||||
PITimeMeasurer tm;
|
||||
PISystemTime el;
|
||||
PITimer status_timer;
|
||||
PIThreadPoolWorker pool(2);
|
||||
pool.start();
|
||||
|
||||
tm.reset();
|
||||
piForTimes(times) {
|
||||
PIDigest::calculateWithKey(ba, key, PIDigest::Type::SipHash_2_4_128);
|
||||
}
|
||||
el = tm.elapsed();
|
||||
piCout << "PIDigest" << el.toString();
|
||||
// int64_t id = -1;
|
||||
// status_timer.start(10_Hz, [&id, &pool] { piCout << "[timer] status" << (int)pool.taskStatus(id); });
|
||||
|
||||
tm.reset();
|
||||
piForTimes(times) {
|
||||
_crypt.shorthash(str, key);
|
||||
}
|
||||
el = tm.elapsed();
|
||||
piCout << " sodium" << el.toString();
|
||||
100_ms .sleep();
|
||||
// pool.enqueueTask([](int64_t id) {
|
||||
// piCout << "[task ] start, id" << id;
|
||||
// // 500_ms .sleep();
|
||||
// piCout << "[task ] done";
|
||||
// });
|
||||
pool.enqueueTask(objects[0], &A::foo);
|
||||
pool.enqueueTask(objects[1], &A::foo);
|
||||
pool.enqueueTask(objects[1], &A::foo);
|
||||
pool.enqueueTask(objects[2], &A::foo);
|
||||
|
||||
tm.reset();
|
||||
piForTimes(times) {
|
||||
PIDigest::calculateWithKey(ba, key, PIDigest::Type::BLAKE2b_128);
|
||||
}
|
||||
el = tm.elapsed();
|
||||
piCout << " blake" << el.toString();
|
||||
10_ms .sleep();
|
||||
delete objects[1];
|
||||
objects.remove(1);
|
||||
// piCout << "[main ]" << "enqueued, id" << id;
|
||||
|
||||
return 0;
|
||||
// 200_ms .sleep();
|
||||
piCout << "[main ]" << "wait ...";
|
||||
piCout << "[main ]" << "wait done";
|
||||
|
||||
1000_ms .sleep();
|
||||
|
||||
PIEthernet *eth_r, *eth_s;
|
||||
eth_r = PIIODevice::createFromFullPath("eth://udp: 192.168.1.25 :10000")->cast<PIEthernet>();
|
||||
eth_s = PIIODevice::createFromFullPath("eth://udp: : : 192.168.1.25:10000")->cast<PIEthernet>();
|
||||
|
||||
eth_r->setReadBufferSize(1_MiB);
|
||||
CONNECTL(eth_r, threadedReadEvent, [](const uchar * readed, ssize_t size) {
|
||||
// piCout << "rec";
|
||||
piMSleep(1);
|
||||
++rcnt;
|
||||
});
|
||||
eth_r->startThreadedRead();
|
||||
|
||||
PIByteArray _ba(1400);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
eth_s->write(_ba);
|
||||
++scnt;
|
||||
}
|
||||
|
||||
0.2_s .sleep();
|
||||
|
||||
piCout << "snd" << scnt;
|
||||
piCout << "rec" << rcnt;
|
||||
|
||||
piDeleteSafety(eth_r);
|
||||
piDeleteSafety(eth_s);
|
||||
return 0;
|
||||
|
||||
PITranslator::loadLang("ru");
|
||||
/*auto ucl = PIUnits::allClasses();
|
||||
for (auto c: ucl) {
|
||||
piCout << (c->className() + ":");
|
||||
for (auto t: c->allTypes()) {
|
||||
piCout << " " << c->name(t) << "->" << c->unit(t);
|
||||
}
|
||||
}*/
|
||||
|
||||
// PIUnits::Value(1);
|
||||
// piCout << PIUnits::name(PIUnits::Class::Information::Bit);
|
||||
// piCout << PIUnits::name(PIUnits::Class::Information::Byte);
|
||||
// piCout << PIUnits::name(PIUnits::Class::Information::_LastType);
|
||||
// piCout << PIUnits::name((int)PIUnits::Class::Angle::Degree);
|
||||
|
||||
// piCout << PIUnits::unit(PIUnits::Class::Information::Bit);
|
||||
// piCout << PIUnits::unit(PIUnits::Class::Information::Byte);
|
||||
// piCout << PIUnits::unit(PIUnits::Class::Information::_LastType);
|
||||
// piCout << PIUnits::unit((int)PIUnits::Class::Angle::Degree);
|
||||
|
||||
// for (int i = -10; i < 10; ++i)
|
||||
// piCout << PIUnits::Value(pow10(i * 0.99), PIUnits::Class::Distance::Meter).toString();
|
||||
|
||||
auto v = PIUnits::Value(M_PI, Angle::Radian);
|
||||
piCout << v << "=" << v.converted(Angle::Degree);
|
||||
|
||||
v = PIUnits::Value(45, Angle::Degree);
|
||||
piCout << v << "=" << v.converted(Angle::Radian);
|
||||
|
||||
piCout << PIUnits::Value(5E-5, Time::Second);
|
||||
piCout << PIUnits::Value(3E-3, Time::Second);
|
||||
piCout << PIUnits::Value(0.8, Time::Second);
|
||||
piCout << PIUnits::Value(1.2, Time::Second);
|
||||
piCout << PIUnits::Value(1001, Time::Second);
|
||||
piCout << PIUnits::Value(1000001, Time::Second);
|
||||
|
||||
piCout << PIUnits::Value(1_KB, Information::Byte);
|
||||
piCout << PIUnits::Value(1_MB, Information::Byte);
|
||||
piCout << PIUnits::Value(1_MiB, Information::Byte);
|
||||
piCout << PIUnits::Value(1_MB, Information::Byte).converted(Information::Bit);
|
||||
piCout << PIUnits::Value(1_MiB, Information::Byte).converted(Information::Bit);
|
||||
|
||||
piCout << PIUnits::Value(0., Temperature::Celsius).converted(Temperature::Kelvin);
|
||||
piCout << PIUnits::Value(0., Temperature::Celsius).converted(Temperature::Fahrenheit);
|
||||
piCout << PIUnits::Value(100., Temperature::Celsius).converted(Temperature::Fahrenheit);
|
||||
|
||||
piCout << PIUnits::Value(1., Pressure::Atmosphere).converted(Pressure::Pascal);
|
||||
piCout << PIUnits::Value(1., Pressure::Atmosphere).converted(Pressure::MillimetreOfMercury);
|
||||
piCout << PIUnits::Value(766., Pressure::MillimetreOfMercury).converted(Pressure::Atmosphere);
|
||||
|
||||
piCout << PIUnits::Value(5E-5, Time::Second).converted(Time::Hertz);
|
||||
piCout << PIUnits::Value(3E-3, Time::Second).converted(Time::Hertz);
|
||||
piCout << PIUnits::Value(0.8, Time::Second).converted(Time::Hertz);
|
||||
piCout << PIUnits::Value(1.2, Time::Second).converted(Time::Hertz);
|
||||
piCout << PIUnits::Value(1001, Time::Second).converted(Time::Hertz);
|
||||
piCout << PIUnits::Value(1000001, Time::Second).converted(Time::Hertz);
|
||||
// piCout << PIUnits::Value(0.2, Time::Second).converted(Time::Hertz);
|
||||
// piCout << PIUnits::Value(5E-5, Time::Second).converted(Time::Hertz);
|
||||
pool.stopAndWait();
|
||||
status_timer.stopAndWait();
|
||||
piDeleteAll(objects);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ FetchContent_Declare(
|
||||
)
|
||||
# For Windows: Prevent overriding the parent project's compiler/linker settings
|
||||
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
|
||||
set(BUILD_GMOCK OFF CACHE BOOL "Build Google Mock" FORCE)
|
||||
FetchContent_MakeAvailable(googletest)
|
||||
|
||||
enable_testing()
|
||||
@@ -32,7 +33,6 @@ macro(pip_test NAME)
|
||||
set(PIP_TESTS_LIST ${PIP_TESTS_LIST} PARENT_SCOPE)
|
||||
endmacro()
|
||||
|
||||
#pip_test(concurrent)
|
||||
pip_test(math)
|
||||
pip_test(core)
|
||||
pip_test(piobject)
|
||||
|
||||
@@ -1,262 +0,0 @@
|
||||
#include "piblockingqueue.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
class MockConditionVar: public PIConditionVariable {
|
||||
public:
|
||||
bool isWaitCalled = false;
|
||||
bool isWaitForCalled = false;
|
||||
bool isTrueCondition = false;
|
||||
int timeout = -1;
|
||||
|
||||
void wait(PIMutex & lk) override { isWaitCalled = true; }
|
||||
|
||||
void wait(PIMutex & lk, const std::function<bool()> & condition) override {
|
||||
isWaitCalled = true;
|
||||
isTrueCondition = condition();
|
||||
}
|
||||
|
||||
bool waitFor(PIMutex & lk, int timeoutMs) override {
|
||||
isWaitForCalled = true;
|
||||
timeout = timeoutMs;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool waitFor(PIMutex & lk, int timeoutMs, const std::function<bool()> & condition) override {
|
||||
isWaitForCalled = true;
|
||||
isTrueCondition = condition();
|
||||
timeout = timeoutMs;
|
||||
return isTrueCondition;
|
||||
}
|
||||
};
|
||||
|
||||
TEST(BlockingDequeueUnitTest, put_is_block_when_capacity_reach) {
|
||||
size_t capacity = 0;
|
||||
auto conditionVarAdd = new MockConditionVar();
|
||||
auto conditionVarRem = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVarAdd, conditionVarRem);
|
||||
dequeue.put(11);
|
||||
ASSERT_TRUE(conditionVarRem->isWaitCalled);
|
||||
ASSERT_FALSE(conditionVarRem->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, offer_timedout_is_false_when_capacity_reach) {
|
||||
size_t capacity = 0;
|
||||
int timeout = 11;
|
||||
auto conditionVarAdd = new MockConditionVar();
|
||||
auto conditionVarRem = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVarAdd, conditionVarRem);
|
||||
ASSERT_FALSE(dequeue.offer(11, timeout));
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, offer_timedout_is_block_when_capacity_reach) {
|
||||
size_t capacity = 0;
|
||||
int timeout = 11;
|
||||
auto conditionVarAdd = new MockConditionVar();
|
||||
auto conditionVarRem = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVarAdd, conditionVarRem);
|
||||
dequeue.offer(11, timeout);
|
||||
EXPECT_TRUE(conditionVarRem->isWaitForCalled);
|
||||
EXPECT_EQ(timeout, conditionVarRem->timeout);
|
||||
ASSERT_FALSE(conditionVarRem->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, offer_is_true_before_capacity_reach) {
|
||||
size_t capacity = 1;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
ASSERT_TRUE(dequeue.offer(10));
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, offer_is_false_when_capacity_reach) {
|
||||
size_t capacity = 1;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
dequeue.offer(11);
|
||||
ASSERT_FALSE(dequeue.offer(10));
|
||||
}
|
||||
|
||||
// TODO change take_is_block_when_empty to prevent segfault
|
||||
TEST(DISABLED_BlockingDequeueUnitTest, take_is_block_when_empty) {
|
||||
size_t capacity = 1;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
// May cause segfault because take front of empty queue
|
||||
dequeue.take();
|
||||
EXPECT_TRUE(conditionVar->isWaitCalled);
|
||||
ASSERT_FALSE(conditionVar->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, take_is_not_block_when_not_empty) {
|
||||
size_t capacity = 1;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.offer(111);
|
||||
dequeue.take();
|
||||
|
||||
EXPECT_TRUE(conditionVar->isWaitCalled);
|
||||
ASSERT_TRUE(conditionVar->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, take_is_value_eq_to_offer_value) {
|
||||
size_t capacity = 1;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.take(), 111);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, take_is_last) {
|
||||
size_t capacity = 10;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
EXPECT_TRUE(dequeue.offer(111));
|
||||
EXPECT_TRUE(dequeue.offer(222));
|
||||
ASSERT_EQ(dequeue.take(), 111);
|
||||
ASSERT_EQ(dequeue.take(), 222);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_is_not_block_when_empty) {
|
||||
size_t capacity = 1;
|
||||
bool isOk;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.poll(0, 111, &isOk);
|
||||
EXPECT_FALSE(conditionVar->isWaitForCalled);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_is_default_value_when_empty) {
|
||||
size_t capacity = 1;
|
||||
bool isOk;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
ASSERT_EQ(dequeue.poll(0, 111, &isOk), 111);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_is_offer_value_when_not_empty) {
|
||||
size_t capacity = 1;
|
||||
bool isOk;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.poll(0, -1, &isOk), 111);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_timeouted_is_block_when_empty) {
|
||||
size_t capacity = 1;
|
||||
int timeout = 11;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.poll(timeout, 111);
|
||||
EXPECT_TRUE(conditionVar->isWaitForCalled);
|
||||
EXPECT_EQ(timeout, conditionVar->timeout);
|
||||
ASSERT_FALSE(conditionVar->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_timeouted_is_default_value_when_empty) {
|
||||
size_t capacity = 1;
|
||||
int timeout = 11;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
ASSERT_EQ(dequeue.poll(timeout, 111), 111);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_timeouted_is_not_block_when_not_empty) {
|
||||
size_t capacity = 1;
|
||||
int timeout = 11;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.offer(111);
|
||||
dequeue.poll(timeout, -1);
|
||||
|
||||
EXPECT_TRUE(conditionVar->isWaitForCalled);
|
||||
ASSERT_TRUE(conditionVar->isTrueCondition);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_timeouted_is_offer_value_when_not_empty) {
|
||||
size_t capacity = 1;
|
||||
int timeout = 11;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.poll(timeout, -1), 111);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, poll_timeouted_is_last) {
|
||||
size_t capacity = 10;
|
||||
auto conditionVar = new MockConditionVar();
|
||||
PIBlockingQueue<int> dequeue(capacity, conditionVar);
|
||||
dequeue.offer(111);
|
||||
dequeue.offer(222);
|
||||
ASSERT_EQ(dequeue.poll(10, -1), 111);
|
||||
ASSERT_EQ(dequeue.poll(10, -1), 222);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, capacity_is_eq_constructor_capacity) {
|
||||
size_t capacity = 10;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
ASSERT_EQ(dequeue.capacity(), capacity);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, remainingCapacity_is_dif_of_capacity_and_size) {
|
||||
size_t capacity = 2;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
ASSERT_EQ(dequeue.remainingCapacity(), capacity);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.remainingCapacity(), capacity - 1);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, remainingCapacity_is_zero_when_capacity_reach) {
|
||||
size_t capacity = 1;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
dequeue.offer(111);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.remainingCapacity(), 0);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, size_is_eq_to_num_of_elements) {
|
||||
size_t capacity = 1;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
ASSERT_EQ(dequeue.size(), 0);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.size(), 1);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, size_is_eq_to_capacity_when_capacity_reach) {
|
||||
size_t capacity = 1;
|
||||
PIBlockingQueue<int> dequeue(capacity);
|
||||
dequeue.offer(111);
|
||||
dequeue.offer(111);
|
||||
ASSERT_EQ(dequeue.size(), capacity);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, drainTo_is_elements_moved) {
|
||||
size_t capacity = 10;
|
||||
PIDeque<int> refDeque;
|
||||
for (size_t i = 0; i < capacity / 2; ++i)
|
||||
refDeque.push_back(i * 10);
|
||||
PIBlockingQueue<int> blockingDequeue(refDeque);
|
||||
PIDeque<int> deque;
|
||||
blockingDequeue.drainTo(deque);
|
||||
ASSERT_EQ(blockingDequeue.size(), 0);
|
||||
ASSERT_TRUE(deque == refDeque);
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_size_when_all_moved) {
|
||||
size_t capacity = 10;
|
||||
PIDeque<int> refDeque;
|
||||
for (size_t i = 0; i < capacity / 2; ++i)
|
||||
refDeque.push_back(i * 10);
|
||||
PIBlockingQueue<int> blockingDequeue(refDeque);
|
||||
PIDeque<int> deque;
|
||||
ASSERT_EQ(blockingDequeue.drainTo(deque), refDeque.size());
|
||||
}
|
||||
|
||||
TEST(BlockingDequeueUnitTest, drainTo_is_ret_eq_to_maxCount) {
|
||||
size_t capacity = 10;
|
||||
PIDeque<int> refDeque;
|
||||
for (size_t i = 0; i < capacity / 2; ++i)
|
||||
refDeque.push_back(i * 10);
|
||||
PIBlockingQueue<int> blockingDequeue(refDeque);
|
||||
PIDeque<int> deque;
|
||||
ASSERT_EQ(blockingDequeue.drainTo(deque, refDeque.size() - 1), refDeque.size() - 1);
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
#include "piconditionvar.h"
|
||||
#include "pithread.h"
|
||||
#include "testutil.h"
|
||||
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
class ConditionLock
|
||||
: public ::testing::Test
|
||||
, public TestUtil {
|
||||
public:
|
||||
PIMutex * m = new PIMutex();
|
||||
bool isProtect;
|
||||
bool isReleased;
|
||||
};
|
||||
|
||||
|
||||
TEST_F(ConditionLock, DISABLED_lock_is_protect) {
|
||||
m->lock();
|
||||
isProtect = true;
|
||||
createThread([&]() {
|
||||
m->lock();
|
||||
isProtect = false;
|
||||
});
|
||||
EXPECT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
ASSERT_TRUE(isProtect);
|
||||
}
|
||||
|
||||
TEST_F(ConditionLock, DISABLED_unlock_is_release) {
|
||||
m->lock();
|
||||
isReleased = false;
|
||||
m->unlock();
|
||||
|
||||
createThread([&]() {
|
||||
m->lock();
|
||||
isReleased = true;
|
||||
m->unlock();
|
||||
});
|
||||
ASSERT_TRUE(isReleased);
|
||||
}
|
||||
|
||||
TEST_F(ConditionLock, tryLock_is_false_when_locked) {
|
||||
createThread([&]() {
|
||||
m->lock();
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
});
|
||||
ASSERT_FALSE(m->tryLock());
|
||||
}
|
||||
|
||||
TEST_F(ConditionLock, tryLock_is_true_when_unlocked) {
|
||||
ASSERT_TRUE(m->tryLock());
|
||||
}
|
||||
|
||||
TEST_F(ConditionLock, tryLock_is_recursive_lock_enable) {
|
||||
m->lock();
|
||||
ASSERT_TRUE(m->tryLock());
|
||||
}
|
||||
@@ -1,209 +0,0 @@
|
||||
#include "piconditionvar.h"
|
||||
#include "pithread.h"
|
||||
#include "testutil.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
class ConditionVariable
|
||||
: public ::testing::Test
|
||||
, public TestUtil {
|
||||
public:
|
||||
~ConditionVariable() { delete variable; }
|
||||
PIMutex m;
|
||||
PIConditionVariable * variable;
|
||||
|
||||
protected:
|
||||
void SetUp() override {
|
||||
variable = new PIConditionVariable();
|
||||
adapterFunctionDefault = [&]() {
|
||||
m.lock();
|
||||
variable->wait(m);
|
||||
m.unlock();
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_block) {
|
||||
createThread();
|
||||
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_block_when_notifyOne_before_wait) {
|
||||
variable->notifyOne();
|
||||
createThread();
|
||||
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_block_when_notifyAll_before_wait) {
|
||||
variable->notifyAll();
|
||||
createThread();
|
||||
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_unblock_when_notifyOne_after_wait) {
|
||||
createThread();
|
||||
variable->notifyOne();
|
||||
ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_unblock_when_notifyAll_after_wait) {
|
||||
PIVector<PIThread *> threads;
|
||||
|
||||
for (int i = 0; i < THREAD_COUNT; ++i) {
|
||||
threads.push_back(new PIThread([=]() { adapterFunctionDefault(); }));
|
||||
}
|
||||
|
||||
piForeach(PIThread * thread, threads)
|
||||
thread->startOnce();
|
||||
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
|
||||
variable->notifyAll();
|
||||
PITimeMeasurer measurer;
|
||||
piForeach(PIThread * thread, threads) {
|
||||
int timeout = WAIT_THREAD_TIME_MS * THREAD_COUNT - (int)measurer.elapsed_m();
|
||||
thread->waitForFinish(timeout > 0 ? timeout : 0);
|
||||
}
|
||||
for (size_t i = 0; i < threads.size(); ++i)
|
||||
EXPECT_FALSE(threads[i]->isRunning()) << "Thread " << i << " still running";
|
||||
piForeach(PIThread * thread, threads)
|
||||
delete thread;
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_one_unblock_when_notifyOne) {
|
||||
PIVector<PIThread *> threads;
|
||||
|
||||
for (int i = 0; i < THREAD_COUNT; ++i) {
|
||||
threads.push_back(new PIThread(adapterFunctionDefault));
|
||||
}
|
||||
|
||||
piForeach(PIThread * thread, threads)
|
||||
thread->startOnce();
|
||||
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
|
||||
variable->notifyOne();
|
||||
piMSleep(WAIT_THREAD_TIME_MS * THREAD_COUNT);
|
||||
int runningThreadCount = 0;
|
||||
piForeach(PIThread * thread, threads)
|
||||
if (thread->isRunning()) runningThreadCount++;
|
||||
ASSERT_EQ(runningThreadCount, THREAD_COUNT - 1);
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_is_protected_unblock_when_notifyOne) {
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->wait(m);
|
||||
piMSleep(2 * WAIT_THREAD_TIME_MS);
|
||||
// Missing unlock
|
||||
});
|
||||
variable->notifyOne();
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
ASSERT_FALSE(m.tryLock());
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_condition_is_block) {
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->wait(m, []() { return false; });
|
||||
m.unlock();
|
||||
});
|
||||
ASSERT_FALSE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_condition_is_check_condition_before_block) {
|
||||
bool isConditionChecked = false;
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->wait(m, [&]() {
|
||||
isConditionChecked = true;
|
||||
return false;
|
||||
});
|
||||
m.unlock();
|
||||
});
|
||||
m.lock();
|
||||
ASSERT_TRUE(isConditionChecked);
|
||||
m.unlock();
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_condition_is_check_condition_when_notifyOne) {
|
||||
bool isConditionChecked;
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->wait(m, [&]() {
|
||||
isConditionChecked = true;
|
||||
return false;
|
||||
});
|
||||
m.unlock();
|
||||
});
|
||||
m.lock();
|
||||
isConditionChecked = false;
|
||||
m.unlock();
|
||||
variable->notifyOne();
|
||||
piMSleep(threadStartTime + 1);
|
||||
m.lock();
|
||||
ASSERT_TRUE(isConditionChecked);
|
||||
m.unlock();
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, wait_condition_is_unblock_when_condition_and_notifyOne) {
|
||||
bool condition = false;
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->wait(m, [&]() { return condition; });
|
||||
m.unlock();
|
||||
});
|
||||
m.lock();
|
||||
condition = true;
|
||||
m.unlock();
|
||||
variable->notifyOne();
|
||||
ASSERT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, DISABLED_waitFor_is_block_before_timeout) {
|
||||
createThread([&]() {
|
||||
PITimeMeasurer measurer;
|
||||
m.lock();
|
||||
variable->waitFor(m, WAIT_THREAD_TIME_MS * 2);
|
||||
m.unlock();
|
||||
// Not reliable because spurious wakeup may happen
|
||||
ASSERT_GE(measurer.elapsed_m(), WAIT_THREAD_TIME_MS);
|
||||
});
|
||||
EXPECT_TRUE(thread->waitForFinish(WAIT_THREAD_TIME_MS * 3));
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, waitFor_is_unblock_when_timeout) {
|
||||
std::atomic_bool isUnblock(false);
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->waitFor(m, WAIT_THREAD_TIME_MS);
|
||||
isUnblock = true;
|
||||
m.unlock();
|
||||
});
|
||||
// Test failed if suspend forever
|
||||
EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS));
|
||||
ASSERT_TRUE(isUnblock);
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, waitFor_is_false_when_timeout) {
|
||||
bool waitRet = true;
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
waitRet = variable->waitFor(m, WAIT_THREAD_TIME_MS);
|
||||
m.unlock();
|
||||
});
|
||||
EXPECT_TRUE(thread->waitForFinish(2 * WAIT_THREAD_TIME_MS));
|
||||
ASSERT_FALSE(waitRet);
|
||||
}
|
||||
|
||||
TEST_F(ConditionVariable, waitFor_is_unblock_when_condition_and_notifyOne) {
|
||||
bool condition = false;
|
||||
createThread([&]() {
|
||||
m.lock();
|
||||
variable->waitFor(m, 3 * WAIT_THREAD_TIME_MS, [&]() { return condition; });
|
||||
m.unlock();
|
||||
});
|
||||
EXPECT_TRUE(thread->isRunning());
|
||||
m.lock();
|
||||
condition = true;
|
||||
m.unlock();
|
||||
variable->notifyOne();
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
ASSERT_FALSE(thread->isRunning());
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
#include "pimutex.h"
|
||||
#include "pithreadpoolexecutor.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
const int WAIT_THREAD_TIME_MS = 30;
|
||||
|
||||
TEST(ExcutorIntegrationTest, execute_is_runnable_invoke) {
|
||||
PIMutex m;
|
||||
int invokedRunnables = 0;
|
||||
PIThreadPoolExecutor executorService(1);
|
||||
executorService.execute([&]() {
|
||||
m.lock();
|
||||
invokedRunnables++;
|
||||
m.unlock();
|
||||
});
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
ASSERT_EQ(invokedRunnables, 1);
|
||||
}
|
||||
|
||||
TEST(ExcutorIntegrationTest, execute_is_not_execute_after_shutdown) {
|
||||
bool isRunnableInvoke = false;
|
||||
PIThreadPoolExecutor executorService(1);
|
||||
executorService.shutdown();
|
||||
executorService.execute([&]() { isRunnableInvoke = true; });
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
ASSERT_FALSE(isRunnableInvoke);
|
||||
}
|
||||
|
||||
TEST(ExcutorIntegrationTest, execute_is_execute_before_shutdown) {
|
||||
bool isRunnableInvoke = false;
|
||||
PIThreadPoolExecutor executorService(1);
|
||||
executorService.execute([&]() {
|
||||
piMSleep(WAIT_THREAD_TIME_MS);
|
||||
isRunnableInvoke = true;
|
||||
});
|
||||
executorService.shutdown();
|
||||
piMSleep(2 * WAIT_THREAD_TIME_MS);
|
||||
ASSERT_TRUE(isRunnableInvoke);
|
||||
}
|
||||
|
||||
TEST(ExcutorIntegrationTest, execute_is_awaitTermination_wait) {
|
||||
PIThreadPoolExecutor executorService(1);
|
||||
executorService.execute([&]() { piMSleep(2 * WAIT_THREAD_TIME_MS); });
|
||||
executorService.shutdown();
|
||||
PITimeMeasurer measurer;
|
||||
ASSERT_TRUE(executorService.awaitTermination(3 * WAIT_THREAD_TIME_MS));
|
||||
double waitTime = measurer.elapsed_m();
|
||||
ASSERT_GE(waitTime, WAIT_THREAD_TIME_MS);
|
||||
ASSERT_LE(waitTime, 4 * WAIT_THREAD_TIME_MS);
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
#include "pithreadnotifier.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
|
||||
TEST(PIThreadNotifierTest, One) {
|
||||
PIThreadNotifier n;
|
||||
int cnt = 0;
|
||||
PIThread t1(
|
||||
[&n, &cnt]() {
|
||||
n.wait();
|
||||
cnt++;
|
||||
},
|
||||
true);
|
||||
piMSleep(10);
|
||||
n.notifyOnce();
|
||||
piMSleep(10);
|
||||
ASSERT_EQ(cnt, 1);
|
||||
n.notifyOnce();
|
||||
piMSleep(10);
|
||||
ASSERT_EQ(cnt, 2);
|
||||
}
|
||||
|
||||
|
||||
TEST(PIThreadNotifierTest, Two) {
|
||||
PIThreadNotifier n;
|
||||
int cnt1 = 0;
|
||||
int cnt2 = 0;
|
||||
int cnt3 = 0;
|
||||
PIThread t1(
|
||||
[&n, &cnt1]() {
|
||||
n.wait();
|
||||
cnt1++;
|
||||
piMSleep(2);
|
||||
},
|
||||
true);
|
||||
PIThread t2(
|
||||
[&n, &cnt2]() {
|
||||
n.wait();
|
||||
cnt2++;
|
||||
piMSleep(2);
|
||||
},
|
||||
true);
|
||||
PIThread t3(
|
||||
[&n, &cnt3]() {
|
||||
n.notifyOnce();
|
||||
cnt3++;
|
||||
piMSleep(1);
|
||||
},
|
||||
true);
|
||||
piMSleep(20);
|
||||
t3.stop(true);
|
||||
piMSleep(100);
|
||||
t1.stop();
|
||||
t2.stop();
|
||||
ASSERT_EQ(cnt1 + cnt2, cnt3);
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
#ifndef AWRCANFLASHER_TESTUTIL_H
|
||||
#define AWRCANFLASHER_TESTUTIL_H
|
||||
|
||||
#include "pithread.h"
|
||||
|
||||
#include <atomic>
|
||||
|
||||
/**
|
||||
* Minimum wait thread start, switch context or another interthread communication action time. Increase it if tests
|
||||
* write "Start thread timeout reach!" message. You can reduce it if you want increase test performance.
|
||||
*/
|
||||
const int WAIT_THREAD_TIME_MS = 400;
|
||||
|
||||
const int THREAD_COUNT = 5;
|
||||
|
||||
class TestUtil: public PIObject {
|
||||
PIOBJECT(TestUtil)
|
||||
|
||||
public:
|
||||
double threadStartTime;
|
||||
PIThread * thread = new PIThread();
|
||||
std::atomic_bool isRunning;
|
||||
std::function<void()> adapterFunctionDefault;
|
||||
|
||||
TestUtil(): isRunning(false) {}
|
||||
|
||||
bool createThread(const std::function<void()> & fun = nullptr, PIThread * thread_ = nullptr) {
|
||||
std::function<void()> actualFun = fun == nullptr ? adapterFunctionDefault : fun;
|
||||
if (thread_ == nullptr) thread_ = thread;
|
||||
thread_->startOnce([=](void *) {
|
||||
isRunning = true;
|
||||
actualFun();
|
||||
});
|
||||
return waitThread(thread_);
|
||||
}
|
||||
|
||||
bool waitThread(PIThread * thread_, bool runningStatus = true) {
|
||||
PITimeMeasurer measurer;
|
||||
bool isTimeout = !thread_->waitForStart(WAIT_THREAD_TIME_MS);
|
||||
while (!isRunning) {
|
||||
isTimeout = WAIT_THREAD_TIME_MS <= measurer.elapsed_m();
|
||||
if (isTimeout) break;
|
||||
piUSleep(100);
|
||||
}
|
||||
|
||||
threadStartTime = measurer.elapsed_m();
|
||||
|
||||
if (isTimeout) piCout << "Start thread timeout reach!";
|
||||
|
||||
if (threadStartTime > 1) {
|
||||
piCout << "Start time" << threadStartTime << "ms";
|
||||
} else if (threadStartTime > 0.001) {
|
||||
piCout << "Start time" << threadStartTime * 1000 << "mcs";
|
||||
} else {
|
||||
piCout << "Start time" << threadStartTime * 1000 * 1000 << "ns";
|
||||
}
|
||||
|
||||
return !isTimeout;
|
||||
}
|
||||
};
|
||||
|
||||
#endif // AWRCANFLASHER_TESTUTIL_H
|
||||
Reference in New Issue
Block a user