A simple thread pool based on, highly inspired by Easy3D
Implement a simple thread pool
This thread pool is a consumer/producer queue without the consume API. Users produce task
with Push
API, which will be consumed by the threads(workers) of the pool.
The code is as follows:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
class ThreadPool {
using task_type = std::function<void()>;
public:
ThreadPool(size_t num = std:🧵:hardware_concurrency()) {
for (size_t i = 0; i < num; ++i) {
workers_.emplace_back(std::thread([this] {
while (true) {
task_type task;
{
std::unique_lock<std::mutex> lock(task_mutex_);
task_cond_.wait(lock, [this] { return !tasks_.empty(); });
task = std::move(tasks_.front());
tasks_.pop();
}
if (!task) {
std::cout << "worker #" << std::this_thread::get_id() << " exited" << std::endl;
push_stop_task();
return;
}
task();
}
}));
std::cout << "worker #" << workers_.back().get_id() << " started" << std::endl;
}
}
~ThreadPool() {
Stop();
}
void Stop() {
push_stop_task();
for (auto& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
// clear all pending tasks
std::queue<task_type> empty{};
std::swap(tasks_, empty);
}
template <typename F, typename... Args>
auto Push(F&& f, Args&&... args) {
using return_type = std::invoke_result_t<F, Args...>;
auto task
= std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto res = task->get_future();
{
std::lock_guard<std::mutex> lock(task_mutex_);
tasks_.emplace([task] { (*task)(); });
}
task_cond_.notify_one();
return res;
}
private:
void push_stop_task() {
std::lock_guard<std::mutex> lock(task_mutex_);
tasks_.push(task_type{});
task_cond_.notify_one();
}
std::vector<std::thread> workers_;
std::queue<task_type> tasks_;
std::mutex task_mutex_;
std::condition_variable task_cond_;
};
|
wrap the task and support arbitrary return type and arguments
We have to wrap all the functions users provide to one type before we can put the task into the queue.
So the key here is how to wrap arbitrary tasks which may have different return type and arguments.
here are the steps:
- use
std::invoke_result_t
to get the return type
- use std::bind to erase argument types
- use lambda to erase the return type
How can use get the return value
Use std::future
returned by Push