#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iterator>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

// Simple thread pool, largely inspired by github.com/progschj/ThreadPool.
class ThreadPool {
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;    // signals workers that a task is available
    std::condition_variable waitingLast;  // signals that the last pending task finished
    std::atomic<int> currentTasks{0};     // tasks enqueued but not yet completed
    size_t nthreads = 0;
    bool stop = false;

public:
    // Spawn n worker threads; each waits for tasks until the pool is stopped.
    void setNbThreads(size_t n) {
        nthreads = n;
        for (size_t i = 0; i < nthreads; ++i)
            workers.emplace_back([this] {
                for (;;) {
                    std::unique_lock<std::mutex> qlock(this->queue_mutex);
                    this->condition.wait(qlock,
                        [this] { return this->stop || !this->tasks.empty(); });
                    if (this->stop && this->tasks.empty()) return;
                    auto task(std::move(this->tasks.front()));
                    this->tasks.pop();
                    qlock.unlock();  // run the task without holding the queue lock
                    task();
                    --currentTasks;
                    if (currentTasks == 0) waitingLast.notify_all();
                }
            });
    }
    // Block until every task enqueued so far has finished
    // (method name reconstructed; only its body appears in the fragment).
    void waitForLastJobs() {
        std::unique_lock<std::mutex> wlock(this->queue_mutex);
        while (currentTasks > 0)
            this->waitingLast.wait(wlock, [this]() { return currentTasks == 0; });
    }
    // Fire-and-forget: queue a callable and wake one worker.
    template <typename F>
    void enqueue(F&& f) {
        ++currentTasks;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace([func = std::forward<F>(f)]() { std::move(func)(); });
        }
        condition.notify_one();
    }
    // Queue a callable and return a std::future for its result; runs it
    // synchronously when the pool has no workers.
    template <typename F>
    auto enqueueWithFuture(F&& f) {
        using R = decltype(f());
        if (nthreads > 0) {
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            // std::function needs a copyable callable, so the move-only
            // packaged_task is held via a raw pointer and freed by the worker.
            auto taskPtr = new std::packaged_task<R()>(
                [func = std::forward<F>(f)]() -> R { return std::move(func)(); });
            // Take the future before queuing: the worker deletes taskPtr when done.
            auto fut = taskPtr->get_future();
            ++currentTasks;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                tasks.emplace([taskPtr]() {
                    (*taskPtr)();
                    delete taskPtr;
                });
            }
            condition.notify_one();
            return fut;
        }
        // No workers: execute immediately on the calling thread.
        std::packaged_task<R()> task(
            [func = std::forward<F>(f)]() -> R { return std::move(func)(); });
        task();
        return task.get_future();
    }
    // Split the container into contiguous chunks and queue one task per chunk;
    // chunk size targets avgTasksPerThread tasks per worker, never below minChunkSize.
    template <typename Container, typename F>
    void autoChunks(Container& v, size_t minChunkSize, double avgTasksPerThread, F f) {
        if (nthreads > 0 && v.size() > 2 * minChunkSize) {
            size_t chunkSize = std::max(
                minChunkSize,
                static_cast<size_t>(static_cast<double>(v.size()) /
                                    (static_cast<double>(nthreads) *
                                     static_cast<double>(avgTasksPerThread))));
            auto prevIt = v.begin();
            auto nextIt = v.begin();
            size_t prevId = 0;
            size_t nextId = 0;
            do {
                nextId = std::min(prevId + chunkSize, v.size());
                nextIt = std::next(prevIt,
                                   static_cast<long>(nextId) - static_cast<long>(prevId));
                enqueue([prevIt, nextIt, f]() {
                    for (auto i = prevIt; i != nextIt; ++i) f(*i);
                });
                prevIt = nextIt;
                prevId = nextId;
            } while (nextId < v.size());
        } else {
            // Pool disabled or container too small: process sequentially.
            for (auto& e : v) f(e);
        }
    }
    // Stop the pool: mark it stopped, wake every worker, and join them.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) worker.join();
    }
};
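A minimal usage sketch of the pool above (the header name ThreadPool.h, the
thread count, and the example lambdas are illustrative assumptions; the
waitForLastJobs name follows the reconstruction above):

#include <atomic>
#include <iostream>

#include "ThreadPool.h"  // assumed header name for the class above

int main() {
    ThreadPool pool;
    pool.setNbThreads(4);  // spawn four workers

    // Asynchronous call with a result, read back through the future.
    auto fut = pool.enqueueWithFuture([] { return 6 * 7; });
    std::cout << "answer: " << fut.get() << "\n";

    // Fire-and-forget tasks, then block until all of them have finished.
    std::atomic<int> sum{0};
    for (int i = 1; i <= 10; ++i)
        pool.enqueue([i, &sum] { sum += i; });
    pool.waitForLastJobs();
    std::cout << "sum: " << sum.load() << "\n";  // prints 55
    return 0;
}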
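autoChunks queues one task per chunk but does not itself block, so a wait is
still needed before the results are read. Another illustrative sketch (the data
size, chunk parameters, and per-element lambda are assumptions):

#include <cmath>
#include <iostream>
#include <vector>

#include "ThreadPool.h"  // assumed header name for the class above

int main() {
    std::vector<double> data(1000000, 2.0);

    ThreadPool pool;
    pool.setNbThreads(8);

    // One task per chunk of at least 1024 elements, aiming at roughly
    // 4 tasks per worker; each element is transformed in place.
    pool.autoChunks(data, 1024, 4.0, [](double& x) { x = std::sqrt(x); });
    pool.waitForLastJobs();  // the chunked tasks run on the workers

    std::cout << data.front() << "\n";  // ~1.41421
    return 0;
}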