Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/detail/intrusive.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <condition_variable>
15 : #include <cstdio>
16 : #include <mutex>
17 : #include <stop_token>
18 : #include <thread>
19 : #include <vector>
20 :
21 : /*
22 : Thread pool implementation using a shared work queue.
23 :
24 : Work items are coroutine handles wrapped in intrusive list nodes, stored
25 : in a single queue protected by a mutex. Worker threads wait on a
26 : condition_variable_any that integrates with std::stop_token for clean
27 : shutdown.
28 :
29 : Threads are started lazily on first post() via std::call_once to avoid
30 : spawning threads for pools that are constructed but never used. Each
31 : thread is named with a configurable prefix plus index for debugger
32 : visibility.
33 :
34 : Shutdown sequence: stop() requests all threads to stop via their stop
35 : tokens, then the destructor joins threads and destroys any remaining
36 : queued work without executing it.
37 : */
38 :
39 : namespace boost {
40 : namespace capy {
41 :
42 : //------------------------------------------------------------------------------
43 :
44 : class thread_pool::impl
45 : {
46 : struct work : detail::intrusive_queue<work>::node
47 : {
48 : coro h_;
49 :
50 124 : explicit work(coro h) noexcept
51 124 : : h_(h)
52 : {
53 124 : }
54 :
55 124 : void run()
56 : {
57 124 : auto h = h_;
58 124 : delete this;
59 124 : h.resume();
60 124 : }
61 :
62 0 : void destroy()
63 : {
64 0 : delete this;
65 0 : }
66 : };
67 :
68 : std::mutex mutex_;
69 : std::condition_variable_any cv_;
70 : detail::intrusive_queue<work> q_;
71 : std::vector<std::jthread> threads_;
72 : std::size_t num_threads_;
73 : char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74 : std::once_flag start_flag_;
75 :
76 : public:
77 56 : ~impl()
78 : {
79 56 : stop();
80 56 : threads_.clear();
81 :
82 56 : while(auto* w = q_.pop())
83 0 : w->destroy();
84 56 : }
85 :
86 56 : impl(std::size_t num_threads, std::string_view thread_name_prefix)
87 56 : : num_threads_(num_threads)
88 : {
89 56 : if(num_threads_ == 0)
90 1 : num_threads_ = std::thread::hardware_concurrency();
91 56 : if(num_threads_ == 0)
92 0 : num_threads_ = 1;
93 :
94 : // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
95 56 : auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
96 56 : thread_name_prefix_[n] = '\0';
97 56 : }
98 :
99 : void
100 124 : post(coro h)
101 : {
102 124 : ensure_started();
103 124 : auto* w = new work(h);
104 : {
105 124 : std::lock_guard<std::mutex> lock(mutex_);
106 124 : q_.push(w);
107 124 : }
108 124 : cv_.notify_one();
109 124 : }
110 :
111 : void
112 56 : stop() noexcept
113 : {
114 90 : for (auto& t : threads_)
115 34 : t.request_stop();
116 56 : cv_.notify_all();
117 56 : }
118 :
119 : private:
120 : void
121 124 : ensure_started()
122 : {
123 124 : std::call_once(start_flag_, [this]{
124 22 : threads_.reserve(num_threads_);
125 56 : for(std::size_t i = 0; i < num_threads_; ++i)
126 34 : threads_.emplace_back(
127 68 : [this, i](std::stop_token st){ run(st, i); });
128 22 : });
129 124 : }
130 :
131 : void
132 34 : run(std::stop_token st, std::size_t index)
133 : {
134 : // Build name; set_current_thread_name truncates to platform limits.
135 : char name[16];
136 34 : std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
137 34 : set_current_thread_name(name);
138 :
139 : for(;;)
140 : {
141 158 : work* w = nullptr;
142 : {
143 158 : std::unique_lock<std::mutex> lock(mutex_);
144 380 : if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
145 68 : return;
146 124 : w = q_.pop();
147 158 : }
148 124 : w->run();
149 124 : }
150 : }
151 : };
152 :
153 : //------------------------------------------------------------------------------
154 :
155 56 : thread_pool::
156 : ~thread_pool()
157 : {
158 56 : shutdown();
159 56 : destroy();
160 56 : delete impl_;
161 56 : }
162 :
163 56 : thread_pool::
164 56 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
165 56 : : impl_(new impl(num_threads, thread_name_prefix))
166 : {
167 56 : }
168 :
169 : void
170 0 : thread_pool::
171 : stop() noexcept
172 : {
173 0 : impl_->stop();
174 0 : }
175 :
176 : //------------------------------------------------------------------------------
177 :
178 : void
179 124 : thread_pool::executor_type::
180 : post(coro h) const
181 : {
182 124 : pool_->impl_->post(h);
183 124 : }
184 :
185 : } // capy
186 : } // boost
|