Files
pip/lib/main/thread/piexecutor.h
Stepan 3cfdda7365 PIThreadPoolExecutor & PIBlockingDequeue improvements
- add support move & copy semantic
- introduce submit method for executor with future result
2020-08-05 22:59:33 +03:00

189 lines
5.9 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
PIP - Platform Independent Primitives
Stephan Fomenko
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef PIEXECUTOR_H
#define PIEXECUTOR_H
#include "piblockingdequeue.h"
#include <atomic>
#include <future>
/**
 * @brief Move-only, type-erased holder for any callable that can be invoked with no arguments.
 *
 * The callable is kept behind a heap-allocated polymorphic implementation object, so the wrapper itself can be
 * moved around (e.g. through a task queue) regardless of the concrete callable type. Copying is disabled.
 *
 * @note An rvalue callable is moved into the wrapper. An lvalue callable is NOT copied: the template parameter is
 * then deduced as a reference type and the wrapper only keeps a reference, so the referenced object must outlive
 * every call. Pass `std::move(x)` or a temporary when the wrapper has to own the callable.
 * @note Calling operator() on an empty (default-constructed or moved-from) wrapper dereferences a null pointer;
 * check with `operator bool` first.
 * @note Adapted from: Anthony Williams, "C++ Concurrency in Action: Practical Multithreading" (Russian edition,
 * translated by A. A. Slinkin, DMK Press, Moscow, 2012, 672 pp.), page 387.
 */
class FunctionWrapper {
    /// Type-erased interface through which the stored callable is invoked and destroyed.
    struct ImplBase {
        virtual void call() = 0;
        virtual ~ImplBase() = default;
    };

    std::unique_ptr<ImplBase> impl;

    /// Concrete holder for a callable of type F (F is a reference type when an lvalue was wrapped).
    template<typename F>
    struct ImplType: ImplBase {
        F f;
        explicit ImplType(F&& f): f(std::forward<F>(f)) {}
        void call() final { f(); }
    };

public:
    /**
     * @brief Wraps an arbitrary callable.
     *
     * The constraint removes this overload whenever the argument is itself a FunctionWrapper (after stripping
     * references and cv-qualifiers). Without it a non-const lvalue FunctionWrapper would bind here more closely than
     * to the deleted copy constructor and would silently be wrapped by reference instead of being rejected.
     *
     * @param f callable invocable as `f()`
     */
    template<typename F,
             typename = typename std::enable_if<
                 !std::is_same<typename std::decay<F>::type, FunctionWrapper>::value>::type>
    explicit FunctionWrapper(F&& f): impl(new ImplType<F>(std::forward<F>(f))) {}

    /// Invokes the stored callable. Precondition: the wrapper is not empty.
    void operator()() { impl->call(); }

    /// @return true when a callable is stored, false for an empty or moved-from wrapper.
    explicit operator bool() const noexcept { return static_cast<bool>(impl); }

    /// Creates an empty wrapper.
    FunctionWrapper() = default;

    /// Takes over the callable of @p other, leaving @p other empty.
    FunctionWrapper(FunctionWrapper&& other) noexcept : impl(std::move(other.impl)) {}

    /// Replaces the stored callable with the one of @p other, leaving @p other empty.
    FunctionWrapper& operator=(FunctionWrapper&& other) noexcept {
        impl = std::move(other.impl);
        return *this;
    }

    FunctionWrapper(const FunctionWrapper& other) = delete;
    FunctionWrapper& operator=(const FunctionWrapper&) = delete;
};
/**
 * @brief Fixed-size pool of worker threads that run tasks taken from a shared queue.
 *
 * @tparam Thread_ worker thread type. It is constructed from a callable and must provide start(), stop() and
 * waitForFinish(int).
 * @tparam Dequeue_ queue template instantiated with FunctionWrapper. It must provide offer(), poll(int) and size().
 */
template <typename Thread_, template<typename> class Dequeue_>
class PIThreadPoolExecutorTemplate {
public:
    NO_COPY_CLASS(PIThreadPoolExecutorTemplate)

    /**
     * @brief Creates the pool and starts all of its threads.
     * @param corePoolSize number of worker threads to create
     */
    explicit PIThreadPoolExecutorTemplate(size_t corePoolSize = 1) : isShutdown_(false) { makePool(corePoolSize); }

    /**
     * @brief Requests every worker to stop and deletes the thread objects.
     *
     * NOTE(review): the threads are deleted right after stop() without waitForFinish(); this relies on the
     * destructor of Thread_ coping with a thread that may still be running - confirm.
     */
    virtual ~PIThreadPoolExecutorTemplate() {
        shutdownNow();
        while (threadPool.size() > 0) delete threadPool.take_back();
    }

    /**
     * @brief Queues a callable and returns a future for its result.
     *
     * @param callable object invocable with no arguments
     * @return future bound to the result of the callable, or a default-constructed (invalid) future when the
     * executor has already been shut down and the task was therefore not queued
     */
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType&& callable) {
        typedef typename std::result_of<FunctionType()>::type ResultType;
        if (isShutdown_) {
            return std::future<ResultType>();
        }
        std::packaged_task<ResultType()> callable_task(std::forward<FunctionType>(callable));
        auto future = callable_task.get_future();
        // The packaged task has to be moved into the wrapper. Passing it as an lvalue would make the wrapper keep
        // only a reference to this local object, which is destroyed when submit() returns: the promise would be
        // broken and the worker thread would later invoke a dangling reference.
        FunctionWrapper functionWrapper(std::move(callable_task));
        taskQueue.offer(std::move(functionWrapper));
        return future;
    }

    /**
     * @brief Queues a callable for execution without reporting a result.
     *
     * The task is dropped silently when the executor has already been shut down.
     *
     * @param runnable object invocable with no arguments. An lvalue argument is copied, so it has to be
     * copy-constructible; pass `std::move(x)` for move-only callables.
     */
    template<typename FunctionType>
    void execute(FunctionType&& runnable) {
        if (isShutdown_) {
            return;
        }
        // Take ownership of the callable before wrapping it. Forwarding an lvalue straight into FunctionWrapper
        // would store a reference to the caller's object, which may be gone by the time a worker runs the task.
        typedef typename std::decay<FunctionType>::type OwnedTask;
        OwnedTask owned_task(std::forward<FunctionType>(runnable));
        FunctionWrapper function_wrapper(std::move(owned_task));
        taskQueue.offer(std::move(function_wrapper));
    }

    /**
     * @brief Stops accepting new tasks. Each worker stops itself once it observes the flag with an empty queue.
     */
    void shutdown() {
        isShutdown_ = true;
    }

    /**
     * @brief Stops accepting new tasks and calls stop() on every worker thread right away.
     */
    void shutdownNow() {
        isShutdown_ = true;
        for (size_t i = 0; i < threadPool.size(); ++i) threadPool[i]->stop();
    }

    /// @return true once shutdown() or shutdownNow() has been called.
    bool isShutdown() const {
        return isShutdown_;
    }

    /**
     * @brief Waits for all worker threads to finish within a shared time budget.
     *
     * @param timeoutMs total budget for the whole pool (milliseconds, judging by the name - the unit is defined by
     * PITimeMeasurer::elapsed_m() and Thread_::waitForFinish())
     * @return true when every thread finished in time, false when the budget ran out or a wait failed
     */
    bool awaitTermination(int timeoutMs) {
        PITimeMeasurer measurer;
        for (size_t i = 0; i < threadPool.size(); ++i) {
            // Each thread only gets what is left of the overall budget.
            const int remaining = timeoutMs - static_cast<int>(measurer.elapsed_m());
            if (remaining < 0) return false;
            if (!threadPool[i]->waitForFinish(remaining)) return false;
        }
        return true;
    }

protected:
    std::atomic_bool isShutdown_;
    Dequeue_<FunctionWrapper> taskQueue;
    PIVector<Thread_*> threadPool;

    /**
     * @brief Creates the pool and lets a derived class configure each thread before it is started.
     * @param corePoolSize number of worker threads to create
     * @param onBeforeStart callable invoked with every new thread pointer just before start()
     */
    template<typename Function>
    PIThreadPoolExecutorTemplate(size_t corePoolSize, Function&& onBeforeStart) : isShutdown_(false) {
        makePool(corePoolSize, std::forward<Function>(onBeforeStart));
    }

    /**
     * @brief Creates, registers and starts @p corePoolSize worker threads.
     * @param corePoolSize number of worker threads to create
     * @param onBeforeStart hook invoked with each thread after it was added to the pool and before start()
     */
    void makePool(size_t corePoolSize, std::function<void(Thread_*)>&& onBeforeStart = [](Thread_*){}) {
        for (size_t i = 0; i < corePoolSize; ++i) {
            // Worker body: handle at most one task per invocation (presumably Thread_ calls it repeatedly until
            // stop() - the loop itself is not part of this file). Only `this` and the index are captured, so no
            // reference to a local of makePool() escapes into the thread.
            auto* thread = new Thread_([this, i](){
                // poll(100): presumably waits up to 100 ms for a task - confirm against the queue implementation.
                auto runnable = taskQueue.poll(100);
                if (runnable) {
                    runnable();
                }
                // After shutdown the worker retires as soon as it sees an empty queue.
                if (isShutdown_ && taskQueue.size() == 0) threadPool[i]->stop();
            });
            threadPool.push_back(thread);
            onBeforeStart(thread);
            thread->start();
        }
    }
};
/// Default executor: PIThread workers taking their tasks from a PIBlockingDequeue.
using PIThreadPoolExecutor = PIThreadPoolExecutorTemplate<PIThread, PIBlockingDequeue>;
#ifdef DOXYGEN
/**
 * @brief Thread pools address two different problems: they usually provide improved performance when executing large
 * numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and
 * managing the resources, including threads, consumed when executing a collection of tasks.
 *
 * @note This class is a documentation-only stub. The real PIThreadPoolExecutor is an instantiation of
 * PIThreadPoolExecutorTemplate, whose execute() is a template accepting any callable invocable with no arguments and
 * which additionally offers submit(), returning a std::future for the task result.
 *
 * TODO adapt documentation to template
 */
class PIThreadPoolExecutor {
public:
/**
 * @brief Creates the pool and starts its worker threads.
 *
 * @param corePoolSize number of worker threads to create
 */
explicit PIThreadPoolExecutor(size_t corePoolSize);
/**
 * @brief Performs shutdownNow() and deletes the worker threads.
 */
virtual ~PIThreadPoolExecutor();
/**
 * @brief Executes the given task sometime in the future. The task is executed in an existing pooled thread. If the
 * executor has already been shut down, the task is silently discarded: nothing is queued and no error is reported.
 * NOTE(review): the result of the queue's offer() is not checked, so whether a task can also be lost because the
 * queue capacity has been reached depends on the queue implementation - confirm.
 *
 * @param runnable not empty function for thread pool execution
 */
void execute(const std::function<void()> & runnable);
/**
 * @brief Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be
 * accepted. Invocation has no additional effect if already shut down. This method does not wait for previously
 * submitted tasks to complete execution. Use awaitTermination to do that.
 */
void shutdown();
/**
 * @brief Stops accepting new tasks and immediately requests every worker thread to stop. This method does not wait
 * for the threads to finish. Use awaitTermination to do that.
 */
void shutdownNow();
/**
 * @brief Tells whether shutdown() or shutdownNow() has been called.
 *
 * @return true if this executor has been shut down
 */
bool isShutdown() const;
/**
 * @brief Waits until all worker threads have finished or the timeout elapses, whichever happens first. The timeout
 * is a total budget shared by all threads, not a per-thread limit.
 *
 * @param timeoutMs maximum total time to wait (milliseconds, judging by the parameter name)
 * @return true if every worker thread finished in time, false otherwise
 */
bool awaitTermination(int timeoutMs);
};
#endif //DOXYGEN
#endif //PIEXECUTOR_H