version 4.7.2
PIThreadPoolLoop reworked, now threads starts on constructor and immediately run after start()
This commit is contained in:
@@ -6,7 +6,7 @@ endif()
|
|||||||
project(PIP)
|
project(PIP)
|
||||||
set(PIP_MAJOR 4)
|
set(PIP_MAJOR 4)
|
||||||
set(PIP_MINOR 7)
|
set(PIP_MINOR 7)
|
||||||
set(PIP_REVISION 1)
|
set(PIP_REVISION 2)
|
||||||
set(PIP_SUFFIX )
|
set(PIP_SUFFIX )
|
||||||
set(PIP_COMPANY SHS)
|
set(PIP_COMPANY SHS)
|
||||||
set(PIP_DOMAIN org.SHS)
|
set(PIP_DOMAIN org.SHS)
|
||||||
|
|||||||
@@ -106,16 +106,29 @@
|
|||||||
PIThreadPoolLoop::PIThreadPoolLoop(int thread_cnt) {
|
PIThreadPoolLoop::PIThreadPoolLoop(int thread_cnt) {
|
||||||
if (thread_cnt <= 0) thread_cnt = piMaxi(1, PISystemInfo::instance()->processorsCount);
|
if (thread_cnt <= 0) thread_cnt = piMaxi(1, PISystemInfo::instance()->processorsCount);
|
||||||
piForTimes(thread_cnt) {
|
piForTimes(thread_cnt) {
|
||||||
auto * t = new PIThread();
|
auto * t = new PIThread([this]() {
|
||||||
|
while (true) {
|
||||||
|
sem_exec.acquire();
|
||||||
|
if (is_destroy) return;
|
||||||
|
int cc = counter.fetch_add(1);
|
||||||
|
func(cc);
|
||||||
|
sem_done.release();
|
||||||
|
}
|
||||||
|
});
|
||||||
threads << t;
|
threads << t;
|
||||||
}
|
}
|
||||||
|
for (auto * t: threads)
|
||||||
|
t->start();
|
||||||
// piCout << "PIThreadPoolLoop" << proc_cnt << "threads";
|
// piCout << "PIThreadPoolLoop" << proc_cnt << "threads";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
PIThreadPoolLoop::~PIThreadPoolLoop() {
|
PIThreadPoolLoop::~PIThreadPoolLoop() {
|
||||||
for (auto * t: threads) {
|
is_destroy = true;
|
||||||
|
for (auto * t: threads)
|
||||||
t->stop();
|
t->stop();
|
||||||
|
sem_exec.release(threads.size());
|
||||||
|
for (auto * t: threads) {
|
||||||
if (!t->waitForFinish(100_ms)) t->terminate();
|
if (!t->waitForFinish(100_ms)) t->terminate();
|
||||||
delete t;
|
delete t;
|
||||||
}
|
}
|
||||||
@@ -127,20 +140,19 @@ void PIThreadPoolLoop::setFunction(std::function<void(int)> f) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void PIThreadPoolLoop::wait() {
|
||||||
|
// piCout << "wait" << wait_count;
|
||||||
|
if (wait_count <= 0) return;
|
||||||
|
sem_done.acquire(wait_count);
|
||||||
|
wait_count = 0;
|
||||||
|
// piCout << "wait done";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void PIThreadPoolLoop::start(int index_start, int index_count) {
|
void PIThreadPoolLoop::start(int index_start, int index_count) {
|
||||||
counter = index_start;
|
counter = index_start;
|
||||||
int end = index_start + index_count;
|
wait_count = index_count;
|
||||||
for (auto * t: threads)
|
sem_exec.release(index_count);
|
||||||
t->start([this, end, t]() {
|
|
||||||
while (1) {
|
|
||||||
int cc = counter.fetch_add(1);
|
|
||||||
if (cc >= end) {
|
|
||||||
t->stop();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
func(cc);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -154,9 +166,3 @@ void PIThreadPoolLoop::exec(int index_start, int index_count, std::function<void
|
|||||||
setFunction(f);
|
setFunction(f);
|
||||||
exec(index_start, index_count);
|
exec(index_start, index_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void PIThreadPoolLoop::wait() {
|
|
||||||
for (auto * t: threads)
|
|
||||||
t->waitForFinish();
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -26,6 +26,7 @@
|
|||||||
#ifndef PITHREADPOOLLOOP_H
|
#ifndef PITHREADPOOLLOOP_H
|
||||||
#define PITHREADPOOLLOOP_H
|
#define PITHREADPOOLLOOP_H
|
||||||
|
|
||||||
|
#include "pisemaphore.h"
|
||||||
#include "pivector.h"
|
#include "pivector.h"
|
||||||
|
|
||||||
class PIThread;
|
class PIThread;
|
||||||
@@ -83,7 +84,9 @@ public:
|
|||||||
private:
|
private:
|
||||||
PIVector<PIThread *> threads;
|
PIVector<PIThread *> threads;
|
||||||
std::function<void(int)> func;
|
std::function<void(int)> func;
|
||||||
std::atomic_int counter = {0};
|
PISemaphore sem_exec, sem_done;
|
||||||
|
std::atomic_bool is_destroy = {false};
|
||||||
|
std::atomic_int counter = {0}, wait_count = {0};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user