LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 89.0 % 73 65
Test Date: 2026-01-24 00:02:10 Functions: 87.5 % 16 14

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : // Copyright (c) 2026 Michael Vandeberg
       4              : //
       5              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7              : //
       8              : // Official repository: https://github.com/boostorg/capy
       9              : //
      10              : 
      11              : #include <boost/capy/ex/thread_pool.hpp>
      12              : #include <boost/capy/detail/intrusive.hpp>
      13              : #include <boost/capy/test/thread_name.hpp>
      14              : #include <condition_variable>
      15              : #include <cstdio>
      16              : #include <mutex>
      17              : #include <stop_token>
      18              : #include <thread>
      19              : #include <vector>
      20              : 
      21              : /*
      22              :     Thread pool implementation using a shared work queue.
      23              : 
      24              :     Work items are coroutine handles wrapped in intrusive list nodes, stored
      25              :     in a single queue protected by a mutex. Worker threads wait on a
      26              :     condition_variable_any that integrates with std::stop_token for clean
      27              :     shutdown.
      28              : 
      29              :     Threads are started lazily on first post() via std::call_once to avoid
      30              :     spawning threads for pools that are constructed but never used. Each
      31              :     thread is named with a configurable prefix plus index for debugger
      32              :     visibility.
      33              : 
      34              :     Shutdown sequence: stop() requests all threads to stop via their stop
      35              :     tokens, then the destructor joins threads and destroys any remaining
      36              :     queued work without executing it.
      37              : */
      38              : 
      39              : namespace boost {
      40              : namespace capy {
      41              : 
      42              : //------------------------------------------------------------------------------
      43              : 
      44              : class thread_pool::impl
      45              : {
      46              :     struct work : detail::intrusive_queue<work>::node
      47              :     {
      48              :         coro h_;
      49              : 
      50          124 :         explicit work(coro h) noexcept
      51          124 :             : h_(h)
      52              :         {
      53          124 :         }
      54              : 
      55          124 :         void run()
      56              :         {
      57          124 :             auto h = h_;
      58          124 :             delete this;
      59          124 :             h.resume();
      60          124 :         }
      61              : 
      62            0 :         void destroy()
      63              :         {
      64            0 :             delete this;
      65            0 :         }
      66              :     };
      67              : 
      68              :     std::mutex mutex_;
      69              :     std::condition_variable_any cv_;
      70              :     detail::intrusive_queue<work> q_;
      71              :     std::vector<std::jthread> threads_;
      72              :     std::size_t num_threads_;
      73              :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      74              :     std::once_flag start_flag_;
      75              : 
      76              : public:
      77           56 :     ~impl()
      78              :     {
      79           56 :         stop();
      80           56 :         threads_.clear();
      81              : 
      82           56 :         while(auto* w = q_.pop())
      83            0 :             w->destroy();
      84           56 :     }
      85              : 
      86           56 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      87           56 :         : num_threads_(num_threads)
      88              :     {
      89           56 :         if(num_threads_ == 0)
      90            1 :             num_threads_ = std::thread::hardware_concurrency();
      91           56 :         if(num_threads_ == 0)
      92            0 :             num_threads_ = 1;
      93              : 
      94              :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
      95           56 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
      96           56 :         thread_name_prefix_[n] = '\0';
      97           56 :     }
      98              : 
      99              :     void
     100          124 :     post(coro h)
     101              :     {
     102          124 :         ensure_started();
     103          124 :         auto* w = new work(h);
     104              :         {
     105          124 :             std::lock_guard<std::mutex> lock(mutex_);
     106          124 :             q_.push(w);
     107          124 :         }
     108          124 :         cv_.notify_one();
     109          124 :     }
     110              : 
     111              :     void
     112           56 :     stop() noexcept
     113              :     {
     114           90 :         for (auto& t : threads_)
     115           34 :             t.request_stop();
     116           56 :         cv_.notify_all();
     117           56 :     }
     118              : 
     119              : private:
     120              :     void
     121          124 :     ensure_started()
     122              :     {
     123          124 :         std::call_once(start_flag_, [this]{
     124           22 :             threads_.reserve(num_threads_);
     125           56 :             for(std::size_t i = 0; i < num_threads_; ++i)
     126           34 :                 threads_.emplace_back(
     127           68 :                     [this, i](std::stop_token st){ run(st, i); });
     128           22 :         });
     129          124 :     }
     130              : 
     131              :     void
     132           34 :     run(std::stop_token st, std::size_t index)
     133              :     {
     134              :         // Build name; set_current_thread_name truncates to platform limits.
     135              :         char name[16];
     136           34 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     137           34 :         set_current_thread_name(name);
     138              : 
     139              :         for(;;)
     140              :         {
     141          158 :             work* w = nullptr;
     142              :             {
     143          158 :                 std::unique_lock<std::mutex> lock(mutex_);
     144          380 :                 if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
     145           68 :                     return;
     146          124 :                 w = q_.pop();
     147          158 :             }
     148          124 :             w->run();
     149          124 :         }
     150              :     }
     151              : };
     152              : 
     153              : //------------------------------------------------------------------------------
     154              : 
     155           56 : thread_pool::
     156              : ~thread_pool()
     157              : {
     158           56 :     shutdown();
     159           56 :     destroy();
     160           56 :     delete impl_;
     161           56 : }
     162              : 
     163           56 : thread_pool::
     164           56 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     165           56 :     : impl_(new impl(num_threads, thread_name_prefix))
     166              : {
     167           56 : }
     168              : 
     169              : void
     170            0 : thread_pool::
     171              : stop() noexcept
     172              : {
     173            0 :     impl_->stop();
     174            0 : }
     175              : 
     176              : //------------------------------------------------------------------------------
     177              : 
     178              : void
     179          124 : thread_pool::executor_type::
     180              : post(coro h) const
     181              : {
     182          124 :     pool_->impl_->post(h);
     183          124 : }
     184              : 
     185              : } // capy
     186              : } // boost
        

Generated by: LCOV version 2.3