一个简单的 C++ 线程池

C++17 实现一个简单的线程池

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include <future>
#include <mutex>
#include <condition_variable>
#include <queue>

class ThreadPool
{
using Task = std::packaged_task<void()>;
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
ThreadPool(unsigned int threadCount) : m_thread_count(threadCount)
{

}
~ThreadPool()
{
Stop();
}
template<class F, class... Args>
auto CommitTask(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
{
using ReturnType = std::invoke_result_t<F, Args...>;
std::packaged_task<ReturnType()> packagedTask([f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
return std::apply(std::move(f), std::move(args));
});
std::future<ReturnType> taskReturn = packagedTask.get_future();
{
std::unique_lock<std::mutex> lock(m_task_mutex);
m_tasks.emplace([task = std::move(packagedTask)]() mutable {
task();
});
m_task_count++;
}
m_task_cv.notify_one();
return taskReturn;
}
void Start()
{
std::lock_guard<std::mutex> lifecycleLock(m_lifecycle_mutex);
if(m_threads.empty())
{
m_stop = false;
for(unsigned int i = 0; i < m_thread_count; ++i)
{
std::string threadName = "WorkThread [" + std::to_string(i) + "]";
std::thread thread([this, threadName = std::move(threadName)]() {
pthread_setname_np(pthread_self(), threadName.c_str());
while(true)
{
Task task;
{
std::unique_lock<std::mutex> lock(m_task_mutex);
m_waiting_thread_count++;
m_task_cv.wait(lock, [this]() {
return m_stop || (!m_pause && !m_tasks.empty());
});
m_waiting_thread_count--;
if(m_stop && m_tasks.empty())
{
break;
}
if(!m_tasks.empty())
{
task = std::move(m_tasks.front());
m_tasks.pop();
m_task_count--;
}
}
if(task.valid())
{
m_running_thread_count++;
task();
m_running_thread_count--;
}
}
});
m_threads.emplace_back(std::move(thread));
}
}
}
void Stop()
{
std::lock_guard<std::mutex> lifecycleLock(m_lifecycle_mutex);
if(!m_threads.empty())
{
{
std::lock_guard<std::mutex> lock(m_task_mutex);
m_stop = true;
m_pause = false;
}
m_task_cv.notify_all();
for(auto&& t : m_threads)
{
if(t.joinable())
{
t.join();
}
}
m_threads.clear();
m_running_thread_count = 0;
m_waiting_thread_count = 0;
m_task_count = 0;
}

}
void Pause()
{
std::lock_guard<std::mutex> lifecycleLock(m_lifecycle_mutex);
{
std::lock_guard<std::mutex> lock(m_task_mutex);
m_pause = true;
}
m_task_cv.notify_all();
}
void Resume()
{
std::lock_guard<std::mutex> lifecycleLock(m_lifecycle_mutex);
{
std::lock_guard<std::mutex> lock(m_task_mutex);
m_pause = false;
}
m_task_cv.notify_all();
}

unsigned int GetRunningThreadCount() const
{
return m_running_thread_count.load();
}

unsigned int GetWaitingThreadCount() const
{
return m_waiting_thread_count.load();
}

unsigned int GetTaskCount() const
{
return m_task_count.load();
}
private:
std::vector<std::thread> m_threads;
std::mutex m_lifecycle_mutex;
std::mutex m_task_mutex;
std::condition_variable m_task_cv;
std::queue<Task> m_tasks;
bool m_stop = false;
bool m_pause = false;
unsigned int m_thread_count = 0;
std::atomic<unsigned int> m_running_thread_count = 0;
std::atomic<unsigned int> m_waiting_thread_count = 0;
std::atomic<unsigned int> m_task_count = 0;
};

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "ThreadPool.h"
#include <iostream>

unsigned int WorkTask(unsigned int value)
{
unsigned int totalValue = 0;
for(unsigned int i = 0; i < value + 1; ++i)
{
totalValue +=i;
}
return totalValue;
}

int main()
{
ThreadPool threadPool;
constexpr int taskACount = 50000;

std::vector<std::future<unsigned int>> taskRet;
for(unsigned i = 0; i < taskACount; ++i)
{
auto ret = threadPool.CommitTask(WorkTask, 9999999);
taskRet.emplace_back(std::move(ret));
}
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
threadPool.Start();
threadPool.Stop();
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Time taken: " << duration.count() << " milliseconds" << std::endl;
return 0;
}