CellModules
threadpool.hpp
Go to the documentation of this file.
1#ifndef THREAD_POOL_H
2#define THREAD_POOL_H
3
4#include <condition_variable>
5#include <functional>
6#include <future>
7#include <memory>
8#include <mutex>
9#include <queue>
10#include <stdexcept>
11#include <thread>
12#include <type_traits>
13#include <vector>
14#ifdef _WIN32
16#include "mingw/mingw.mutex.h"
17#include "mingw/mingw.thread.h"
18#endif
19
25 private:
26 size_t nthreads = 0;
28 std::queue<std::function<void()>> tasks;
29 std::mutex queue_mutex;
30 std::condition_variable condition, waitingLast;
31 bool stop = false;
32 std::atomic<int> currentTasks;
33
34 public:
35 size_t getNbThreads() { return nthreads; }
36 ThreadPool(size_t nt) : stop(false), currentTasks(0) { setNbThreads(nt); }
37
38 void setNbThreads(size_t n) {
39 workers.clear();
40 nthreads = n;
41 for (size_t i = 0; i < nthreads; ++i)
42 workers.emplace_back([this] {
43 for (;;) {
44 std::unique_lock<std::mutex> qlock(this->queue_mutex);
45 this->condition.wait(qlock,
46 [this] { return this->stop || !this->tasks.empty(); });
47 if (this->stop && this->tasks.empty()) return;
48 auto task(std::move(this->tasks.front()));
49 this->tasks.pop();
50 qlock.unlock();
51 task();
52 qlock.lock();
53 --currentTasks;
54 if (currentTasks == 0) waitingLast.notify_all();
55 }
56 });
57 }
58
60 std::unique_lock<std::mutex> wlock(this->queue_mutex);
61 while (currentTasks > 0)
62 this->waitingLast.wait(wlock, [this]() { return currentTasks == 0; });
63 }
64
65 template <class F> auto enqueue(F&& f) {
66 if (nthreads > 0) {
67 std::unique_lock<std::mutex> lock(queue_mutex);
68 tasks.emplace([func = std::forward<F>(f)]() { std::move(func)(); });
69 ++currentTasks;
70 condition.notify_one();
71 } else
72 std::forward<F>(f)();
73 }
74
75 template <class F> auto enqueueWithFuture(F&& f) {
76 using R = decltype(f());
77 if (nthreads > 0) {
78 ++currentTasks;
79 if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
80 auto taskPtr = new std::packaged_task<R()>([func = std::forward<F>(f)]()->R {
81 return std::move(func)();
82 });
83 std::unique_lock<std::mutex> lock(queue_mutex);
84 tasks.emplace([taskPtr]() {
85 (*taskPtr)();
86 delete taskPtr;
87 });
88 condition.notify_one();
89 return taskPtr->get_future();
90 } else {
91 std::packaged_task<R()> task([func = std::forward<F>(f)]()->R {
92 return std::move(func)();
93 });
94 task();
95 return task.get_future();
96 }
97 }
98 template <typename Container, typename F>
99 void autoChunks(Container& v, size_t minChunkSize, double avgTasksPerThread, F f) {
100 if (nthreads > 0 && v.size() > 2 * minChunkSize) {
101 size_t chunkSize = std::max(
102 minChunkSize, static_cast<size_t>(static_cast<double>(v.size()) /
103 (static_cast<double>(nthreads) *
104 static_cast<double>(avgTasksPerThread))));
105 auto prevIt = v.begin();
106 auto nextIt = v.begin();
107 size_t prevId = 0;
108 size_t nextId = 0;
109 do {
110 nextId = std::min(prevId + chunkSize, v.size());
111 nextIt = std::next(prevIt, static_cast<long>(nextId) - static_cast<long>(prevId));
112 enqueue([prevIt, nextIt, f, &v]() {
113 for (auto i = prevIt; i != nextIt; ++i) f(*i);
114 });
115 prevId = nextId;
116 prevIt = nextIt;
117 } while (nextId < v.size());
118 } else {
119 for (auto& e : v) f(e);
120 }
121 }
122
124 {
125 std::unique_lock<std::mutex> lock(queue_mutex);
126 stop = true;
127 }
128 condition.notify_all();
129 for (std::thread& worker : workers) worker.join();
130 }
131};
132
133#endif
Simple threadpool, largely inspired by github.com/progschj/ThreadPool.
Definition: threadpool.hpp:24
std::condition_variable waitingLast
Definition: threadpool.hpp:30
size_t nthreads
Definition: threadpool.hpp:26
void setNbThreads(size_t n)
Definition: threadpool.hpp:38
std::condition_variable condition
Definition: threadpool.hpp:30
void waitUntilLast()
Definition: threadpool.hpp:59
auto enqueue(F &&f)
Definition: threadpool.hpp:65
size_t getNbThreads()
Definition: threadpool.hpp:35
std::vector< std::thread > workers
Definition: threadpool.hpp:27
auto enqueueWithFuture(F &&f)
Definition: threadpool.hpp:75
ThreadPool(size_t nt)
Definition: threadpool.hpp:36
std::atomic< int > currentTasks
Definition: threadpool.hpp:32
std::mutex queue_mutex
Definition: threadpool.hpp:29
std::queue< std::function< void()> > tasks
Definition: threadpool.hpp:28
void autoChunks(Container &v, size_t minChunkSize, double avgTasksPerThread, F f)
Definition: threadpool.hpp:99
void clear()
Clears the contents of the vector.
Definition: std.hpp:354
std::mutex et al implementation for MinGW (c) 2013-2016 by Mega Limited, Auckland,...
std::thread implementation for MinGW (c) 2013-2016 by Mega Limited, Auckland, New Zealand