15#include <condition_variable>
26#if __cpp_lib_ranges >= 201911L
36template <
typename WorkerRange>
41 for (
size_t i = 0; i < workers.size(); ++i)
43 std::string
const thread_name = name_prefix +
"_" + std::to_string(i);
44 if (!workers[i].set_name(thread_name).has_value())
46 if (!workers[i].set_scheduling_policy(policy, priority).has_value())
51 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
54template <
typename WorkerRange>
58 for (
auto& worker : workers)
60 if (!worker.set_affinity(affinity).has_value())
65 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
68template <
typename WorkerRange>
71 auto const cpu_count = std::thread::hardware_concurrency();
73 return unexpected(std::make_error_code(std::errc::invalid_argument));
76 for (
size_t i = 0; i < workers.size(); ++i)
79 if (!workers[i].set_affinity(affinity).has_value())
84 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
87template <
typename Pool,
typename Iterator,
typename F>
90 auto const total =
static_cast<size_t>(std::distance(begin, end));
94 size_t const chunk_size = (std::max)(
size_t(1), total / (num_workers * 4));
95 std::vector<std::future<void>> futures;
100 auto remaining =
static_cast<size_t>(std::distance(it, end));
101 auto this_chunk = (std::min)(chunk_size, remaining);
103 std::advance(chunk_end, this_chunk);
105 futures.push_back(pool.submit([it, chunk_end, &func]() {
106 for (auto cur = it; cur != chunk_end; ++cur)
113 for (
auto& f : futures)
128template <
typename F,
typename... Args>
131#if __cpp_init_captures >= 201803L
132 return [fn = std::forward<F>(f), ... a = std::forward<Args>(args)]()
mutable {
return fn(std::move(a)...); };
134 return [fn = std::forward<F>(f), tup = std::make_tuple(std::forward<Args>(args)...)]()
mutable {
135 return std::apply(std::move(fn), std::move(tup));
177template <
size_t TaskSize = 64>
180 static_assert(TaskSize >
sizeof(
void*),
"TaskSize must be larger than a pointer");
184 void (*invoke)(
void* storage);
185 void (*destroy)(
void* storage);
186 void (*move_to)(
void* dst,
void* src)
noexcept;
189 static constexpr size_t kBufferSize = TaskSize -
sizeof(VTable
const*);
191 template <
typename F>
192 static constexpr bool fits_inline_v =
193 sizeof(F) <= kBufferSize &&
alignof(F) <=
alignof(std::max_align_t) && std::is_nothrow_move_constructible_v<F>;
195 template <
typename F>
196 static VTable
const* vtable_for()
noexcept
198 if constexpr (fits_inline_v<F>)
200 static constexpr VTable vt{[](
void* s) { (*
static_cast<F*
>(s))(); },
201 [](
void* s) {
static_cast<F*
>(s)->~F(); },
202 [](
void* dst,
void* src)
noexcept {
203 ::new (dst) F(std::move(*
static_cast<F*
>(src)));
204 static_cast<F*
>(src)->~F();
210 static constexpr VTable vt{[](
void* s) { (*(*
static_cast<F**
>(s)))(); },
211 [](
void* s) {
delete *
static_cast<F**
>(s); },
212 [](
void* dst,
void* src)
noexcept {
213 *
static_cast<F**
>(dst) = *
static_cast<F**
>(src);
214 *
static_cast<F**
>(src) =
nullptr;
223 template <
typename F,
typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, SboCallable>>>
226 using Decay = std::decay_t<F>;
227 vtable_ = vtable_for<Decay>();
228 if constexpr (fits_inline_v<Decay>)
229 ::new (buffer_) Decay(std::forward<F>(f));
231 *
reinterpret_cast<Decay**
>(buffer_) =
new Decay(std::forward<F>(f));
238 vtable_->move_to(buffer_, other.buffer_);
239 other.vtable_ =
nullptr;
248 vtable_->destroy(buffer_);
249 vtable_ = other.vtable_;
252 vtable_->move_to(buffer_, other.buffer_);
253 other.vtable_ =
nullptr;
265 vtable_->destroy(buffer_);
268 explicit operator bool() const noexcept
270 return vtable_ !=
nullptr;
278 vt->destroy(buffer_);
282 VTable
const* vtable_ =
nullptr;
283 alignas(std::max_align_t)
unsigned char buffer_[kBufferSize];
320using TaskStartCallback = std::function<void(std::chrono::steady_clock::time_point, std::thread::id)>;
324 std::function<void(std::chrono::steady_clock::time_point, std::thread::id, std::chrono::microseconds elapsed)>;
334 struct alignas(CACHE_LINE_SIZE) AlignedItem
337 AlignedItem() =
default;
338 AlignedItem(T&& t) : item(std::move(t))
341 AlignedItem(T
const& t) : item(t)
346 std::unique_ptr<AlignedItem[]> buffer_;
349 alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0};
350 alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0};
351 alignas(CACHE_LINE_SIZE)
mutable std::mutex mutex_;
355 : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
359 [[nodiscard]]
auto push(T&& item) ->
bool
361 std::lock_guard<std::mutex> lock(mutex_);
362 size_t const t = top_.load(std::memory_order_relaxed);
363 size_t const b = bottom_.load(std::memory_order_relaxed);
365 if (t - b >= capacity_)
370 buffer_[t % capacity_] = AlignedItem(std::move(item));
371 top_.store(t + 1, std::memory_order_release);
375 [[nodiscard]]
auto push(T
const& item) ->
bool
377 std::lock_guard<std::mutex> lock(mutex_);
378 size_t const t = top_.load(std::memory_order_relaxed);
379 size_t const b = bottom_.load(std::memory_order_relaxed);
381 if (t - b >= capacity_)
386 buffer_[t % capacity_] = AlignedItem(item);
387 top_.store(t + 1, std::memory_order_release);
391 [[nodiscard]]
auto pop(T& item) ->
bool
393 std::lock_guard<std::mutex> lock(mutex_);
394 size_t const t = top_.load(std::memory_order_relaxed);
395 size_t const b = bottom_.load(std::memory_order_relaxed);
402 size_t const new_top = t - 1;
403 item = std::move(buffer_[new_top % capacity_].item);
404 top_.store(new_top, std::memory_order_relaxed);
408 [[nodiscard]]
auto steal(T& item) ->
bool
410 std::lock_guard<std::mutex> lock(mutex_);
411 size_t const b = bottom_.load(std::memory_order_relaxed);
412 size_t const t = top_.load(std::memory_order_relaxed);
419 item = std::move(buffer_[b % capacity_].item);
420 bottom_.store(b + 1, std::memory_order_relaxed);
424 [[nodiscard]]
auto size() const ->
size_t
426 size_t const t = top_.load(std::memory_order_relaxed);
427 size_t const b = bottom_.load(std::memory_order_relaxed);
428 return t > b ? t - b : 0;
431 [[nodiscard]]
auto empty() const ->
bool
438 std::lock_guard<std::mutex> lock(mutex_);
439 bottom_.store(0, std::memory_order_relaxed);
440 top_.store(0, std::memory_order_relaxed);
547 using Task = std::function<void()>;
562 bool register_workers =
false)
563 : num_threads_(num_threads == 0 ? 1 : num_threads), register_workers_(register_workers), stop_(false),
564 next_victim_(0), start_time_(std::chrono::steady_clock::now())
566 worker_queues_.resize(num_threads_);
567 for (
size_t i = 0; i < num_threads_; ++i)
569 worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>(deque_capacity);
572 workers_.reserve(num_threads_);
574 for (
size_t i = 0; i < num_threads_; ++i)
576 workers_.emplace_back(&HighPerformancePool::worker_function,
this, i);
597 std::lock_guard<std::mutex> lock(overflow_mutex_);
598 if (stop_.exchange(
true, std::memory_order_acq_rel))
603 std::queue<Task> empty;
604 overflow_tasks_.swap(empty);
605 for (
auto& q : worker_queues_)
610 wakeup_condition_.notify_all();
612 for (
auto& worker : workers_)
614 if (worker.joinable())
629 auto const deadline = std::chrono::steady_clock::now() + timeout;
632 std::lock_guard<std::mutex> lock(overflow_mutex_);
633 if (stop_.load(std::memory_order_acquire))
637 std::unique_lock<std::mutex> lock(completion_mutex_);
638 bool const drained = completion_condition_.wait_until(lock, deadline, [
this] {
639 return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0;
662 template <
typename F,
typename... Args>
663 auto try_submit(F&& f, Args&&... args) ->
expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
665 using return_type = std::invoke_result_t<F, Args...>;
667 auto task = std::make_shared<std::packaged_task<return_type()>>(
670 std::future<return_type> result = task->get_future();
672 if (stop_.load(std::memory_order_acquire))
673 return unexpected(std::make_error_code(std::errc::operation_canceled));
675 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
677 if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
679 wakeup_condition_.notify_one();
683 for (
size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
685 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
686 if (worker_queues_[idx]->push([task]() { (*task)(); }))
688 wakeup_condition_.notify_one();
694 std::lock_guard<std::mutex> lock(overflow_mutex_);
695 if (stop_.load(std::memory_order_relaxed))
696 return unexpected(std::make_error_code(std::errc::operation_canceled));
697 overflow_tasks_.emplace([task]() { (*task)(); });
700 wakeup_condition_.notify_all();
713 template <
typename F,
typename... Args>
714 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
716 auto result =
try_submit(std::forward<F>(f), std::forward<Args>(args)...);
717 if (!result.has_value())
718 throw std::runtime_error(
"HighPerformancePool is shutting down");
719 return std::move(result.value());
732 template <
typename F,
typename... Args>
733 void post(F&& f, Args&&... args)
735 auto r =
try_post(std::forward<F>(f), std::forward<Args>(args)...);
737 throw std::runtime_error(
"HighPerformancePool is shutting down");
746 template <
typename F,
typename... Args>
751 if (stop_.load(std::memory_order_acquire))
752 return unexpected(std::make_error_code(std::errc::operation_canceled));
754 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
756 if (worker_queues_[preferred_queue]->push(std::move(bound)))
758 wakeup_condition_.notify_one();
762 for (
size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
764 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
765 if (worker_queues_[idx]->push(std::move(bound)))
767 wakeup_condition_.notify_one();
773 std::lock_guard<std::mutex> lock(overflow_mutex_);
774 if (stop_.load(std::memory_order_relaxed))
775 return unexpected(std::make_error_code(std::errc::operation_canceled));
776 overflow_tasks_.emplace(std::move(bound));
779 wakeup_condition_.notify_all();
783#if __cpp_lib_jthread >= 201911L
790 template <
typename F,
typename... Args>
791 auto submit(std::stop_token token, F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
793 return submit([token = std::move(token),
794 bound =
detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]()
mutable {
795 if (token.stop_requested())
796 return std::invoke_result_t<F, Args...>();
802 template <
typename F,
typename... Args>
803 auto try_submit(std::stop_token token, F&& f, Args&&... args)
804 -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
806 return try_submit([token = std::move(token),
807 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]()
mutable {
808 if (token.stop_requested())
809 return std::invoke_result_t<F, Args...>();
826 template <
typename Iterator>
829 std::vector<std::future<void>> futures;
830 size_t const batch_size = std::distance(begin, end);
831 futures.reserve(batch_size);
833 if (stop_.load(std::memory_order_acquire))
834 return unexpected(std::make_error_code(std::errc::operation_canceled));
836 size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;
838 for (
auto it = begin; it != end; ++it)
840 auto task = std::make_shared<std::packaged_task<void()>>(*it);
841 futures.push_back(task->get_future());
844 for (
size_t attempts = 0; attempts < num_threads_; ++attempts)
846 if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
851 queue_idx = (queue_idx + 1) % num_threads_;
856 std::lock_guard<std::mutex> lock(overflow_mutex_);
857 overflow_tasks_.emplace([task]() { (*task)(); });
861 wakeup_condition_.notify_all();
870 template <
typename Iterator>
871 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
874 if (!result.has_value())
875 throw std::runtime_error(
"HighPerformancePool is shutting down");
876 return std::move(result.value());
885 template <
typename Iterator,
typename F>
891#if __cpp_lib_ranges >= 201911L
893 template <std::ranges::input_range R>
894 auto submit_batch(R&& range)
896 return submit_batch(std::ranges::begin(range), std::ranges::end(range));
899 template <std::ranges::input_range R>
900 auto try_submit_batch(R&& range)
902 return try_submit_batch(std::ranges::begin(range), std::ranges::end(range));
905 template <std::ranges::input_range R,
typename F>
908 parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<F>(func));
917 [[nodiscard]]
auto size() const noexcept ->
size_t
926 for (
auto const& queue : worker_queues_)
928 total += queue->size();
931 std::lock_guard<std::mutex> lock(overflow_mutex_);
932 total += overflow_tasks_.size();
939 auto const now = std::chrono::steady_clock::now();
940 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
944 stats.
active_threads = active_tasks_.load(std::memory_order_acquire);
946 stats.
completed_tasks = completed_tasks_.load(std::memory_order_acquire);
947 stats.
stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);
949 if (elapsed.count() > 0)
958 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
1010 std::unique_lock<std::mutex> lock(completion_mutex_);
1011 completion_condition_.wait(
1012 lock, [
this] {
return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0; });
1026 std::lock_guard<std::mutex> lock(trace_mutex_);
1027 on_task_start_ = std::move(cb);
1037 std::lock_guard<std::mutex> lock(trace_mutex_);
1038 on_task_end_ = std::move(cb);
1044 size_t num_threads_;
1045 bool register_workers_;
1046 std::vector<ThreadWrapper> workers_;
1047 std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;
1049 std::queue<Task> overflow_tasks_;
1050 mutable std::mutex overflow_mutex_;
1052 std::atomic<bool> stop_;
1053 std::condition_variable wakeup_condition_;
1054 std::mutex wakeup_mutex_;
1056 std::condition_variable completion_condition_;
1057 std::mutex completion_mutex_;
1059 std::atomic<size_t> next_victim_;
1060 std::atomic<size_t> active_tasks_{0};
1061 std::atomic<size_t> completed_tasks_{0};
1062 std::atomic<size_t> stolen_tasks_{0};
1063 std::atomic<uint64_t> total_task_time_{0};
1065 std::mutex trace_mutex_;
1069 std::chrono::steady_clock::time_point start_time_;
1072 void worker_function(
size_t worker_id)
1074 std::optional<AutoRegisterCurrentThread> reg_guard;
1075 if (register_workers_)
1076 reg_guard.emplace(
"hp_worker_" + std::to_string(worker_id),
"threadschedule.pool");
1078 thread_local std::mt19937 gen = []() {
1079 std::random_device device;
1080 return std::mt19937(device());
1084 std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
1088 bool found_task =
false;
1090 if (worker_queues_[worker_id]->pop(task))
1096 size_t const max_steal_attempts = (std::min)(num_threads_,
size_t(4));
1097 for (
size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
1099 size_t const victim_id = dist(gen);
1100 if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
1103 stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
1111 std::lock_guard<std::mutex> lock(overflow_mutex_);
1112 if (!overflow_tasks_.empty())
1114 task = std::move(overflow_tasks_.front());
1115 overflow_tasks_.pop();
1122 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1124 auto const start_time = std::chrono::steady_clock::now();
1125 auto const tid = std::this_thread::get_id();
1128 std::lock_guard<std::mutex> tl(trace_mutex_);
1130 on_task_start_(start_time, tid);
1140 auto const end_time = std::chrono::steady_clock::now();
1142 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1143 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1146 std::lock_guard<std::mutex> tl(trace_mutex_);
1148 on_task_end_(end_time, tid, task_duration);
1151 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1152 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1154 completion_condition_.notify_all();
1158 if (stop_.load(std::memory_order_acquire))
1163 std::unique_lock<std::mutex> lock(wakeup_mutex_);
1164 wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
1182 template <
typename Lock,
typename Pred>
1183 static auto wait(std::condition_variable& cv, Lock& lock, Pred pred) ->
bool
1185 cv.wait(lock, pred);
1199template <
unsigned IntervalMs = 10>
1202 template <
typename Lock,
typename Pred>
1203 static auto wait(std::condition_variable& cv, Lock& lock, Pred pred) ->
bool
1205 return cv.wait_for(lock, std::chrono::milliseconds(IntervalMs), pred);
1265template <
typename WaitPolicy>
1269 using Task = std::function<void()>;
1281 explicit ThreadPoolBase(
size_t num_threads = std::thread::hardware_concurrency(),
bool register_workers =
false)
1282 : num_threads_(num_threads == 0 ? 1 : num_threads), register_workers_(register_workers), stop_(false),
1283 start_time_(std::chrono::steady_clock::now())
1285 workers_.reserve(num_threads_);
1287 for (
size_t i = 0; i < num_threads_; ++i)
1289 workers_.emplace_back(&ThreadPoolBase::worker_function,
this, i);
1309 template <
typename F,
typename... Args>
1310 auto try_submit(F&& f, Args&&... args) ->
expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
1312 using return_type = std::invoke_result_t<F, Args...>;
1314 auto task = std::make_shared<std::packaged_task<return_type()>>(
1317 std::future<return_type> result = task->get_future();
1320 std::lock_guard<std::mutex> lock(queue_mutex_);
1322 return unexpected(std::make_error_code(std::errc::operation_canceled));
1323 tasks_.emplace([task]() { (*task)(); });
1326 condition_.notify_one();
1334 template <
typename F,
typename... Args>
1335 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
1337 auto result =
try_submit(std::forward<F>(f), std::forward<Args>(args)...);
1338 if (!result.has_value())
1339 throw std::runtime_error(
"Pool is shutting down");
1340 return std::move(result.value());
1351 template <
typename F,
typename... Args>
1354 auto r =
try_post(std::forward<F>(f), std::forward<Args>(args)...);
1356 throw std::runtime_error(
"Pool is shutting down");
1364 template <
typename F,
typename... Args>
1368 std::lock_guard<std::mutex> lock(queue_mutex_);
1370 return unexpected(std::make_error_code(std::errc::operation_canceled));
1371 tasks_.emplace(
detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
1373 condition_.notify_one();
1377#if __cpp_lib_jthread >= 201911L
1384 template <
typename F,
typename... Args>
1385 auto submit(std::stop_token token, F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
1387 return submit([token = std::move(token),
1388 bound =
detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]()
mutable {
1389 if (token.stop_requested())
1390 return std::invoke_result_t<F, Args...>();
1396 template <
typename F,
typename... Args>
1397 auto try_submit(std::stop_token token, F&& f, Args&&... args)
1398 -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
1400 return try_submit([token = std::move(token),
1401 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]()
mutable {
1402 if (token.stop_requested())
1403 return std::invoke_result_t<F, Args...>();
1414 template <
typename Iterator>
1417 std::vector<std::future<void>> futures;
1418 futures.reserve(std::distance(begin, end));
1421 std::lock_guard<std::mutex> lock(queue_mutex_);
1423 return unexpected(std::make_error_code(std::errc::operation_canceled));
1425 for (
auto it = begin; it != end; ++it)
1427 auto task = std::make_shared<std::packaged_task<void()>>(*it);
1428 futures.push_back(task->get_future());
1429 tasks_.emplace([task]() { (*task)(); });
1433 condition_.notify_all();
1438 template <
typename Iterator>
1439 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
1442 if (!result.has_value())
1443 throw std::runtime_error(
"Pool is shutting down");
1444 return std::move(result.value());
1448 template <
typename Iterator,
typename F>
1454#if __cpp_lib_ranges >= 201911L
1456 template <std::ranges::input_range R>
1457 auto submit_batch(R&& range)
1459 return submit_batch(std::ranges::begin(range), std::ranges::end(range));
1462 template <std::ranges::input_range R>
1463 auto try_submit_batch(R&& range)
1465 return try_submit_batch(std::ranges::begin(range), std::ranges::end(range));
1468 template <std::ranges::input_range R,
typename F>
1471 parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<F>(func));
1482 [[nodiscard]]
auto size() const noexcept ->
size_t
1484 return num_threads_;
1490 std::lock_guard<std::mutex> lock(queue_mutex_);
1491 return tasks_.size();
1529 std::unique_lock<std::mutex> lock(queue_mutex_);
1530 task_finished_condition_.wait(
1531 lock, [
this] {
return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
1542 std::lock_guard<std::mutex> lock(queue_mutex_);
1548 std::queue<Task> empty;
1553 condition_.notify_all();
1555 for (
auto& worker : workers_)
1557 if (worker.joinable())
1572 auto const deadline = std::chrono::steady_clock::now() + timeout;
1575 std::lock_guard<std::mutex> lock(queue_mutex_);
1580 std::unique_lock<std::mutex> lock(queue_mutex_);
1581 bool const drained = task_finished_condition_.wait_until(
1582 lock, deadline, [
this] {
return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
1597 auto const now = std::chrono::steady_clock::now();
1598 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
1600 std::lock_guard<std::mutex> lock(queue_mutex_);
1602 stats.total_threads = num_threads_;
1603 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
1604 stats.pending_tasks = tasks_.size();
1605 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
1607 if (elapsed.count() > 0)
1609 stats.tasks_per_second =
static_cast<double>(stats.completed_tasks) / elapsed.count();
1613 stats.tasks_per_second = 0.0;
1616 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
1617 if (stats.completed_tasks > 0)
1619 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
1623 stats.avg_task_time = std::chrono::microseconds(0);
1640 std::lock_guard<std::mutex> lock(trace_mutex_);
1641 on_task_start_ = std::move(cb);
1651 std::lock_guard<std::mutex> lock(trace_mutex_);
1652 on_task_end_ = std::move(cb);
1658 size_t num_threads_;
1659 bool register_workers_;
1660 std::vector<ThreadWrapper> workers_;
1661 std::queue<Task> tasks_;
1663 mutable std::mutex queue_mutex_;
1664 std::condition_variable condition_;
1665 std::condition_variable task_finished_condition_;
1666 std::atomic<bool> stop_;
1667 std::atomic<size_t> active_tasks_{0};
1668 std::atomic<size_t> completed_tasks_{0};
1669 std::atomic<uint64_t> total_task_time_{0};
1671 std::mutex trace_mutex_;
1675 std::chrono::steady_clock::time_point start_time_;
1677 void worker_function(
size_t worker_id)
1679 std::optional<AutoRegisterCurrentThread> reg_guard;
1680 if (register_workers_)
1681 reg_guard.emplace(
"pool_worker_" + std::to_string(worker_id),
"threadschedule.pool");
1686 bool found_task =
false;
1689 std::unique_lock<std::mutex> lock(queue_mutex_);
1691 if (WaitPolicy::wait(condition_, lock, [
this] {
return stop_ || !tasks_.empty(); }))
1693 if (stop_ && tasks_.empty())
1698 if (!tasks_.empty())
1700 task = std::move(tasks_.front());
1703 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1714 auto const start_time = std::chrono::steady_clock::now();
1715 auto const tid = std::this_thread::get_id();
1718 std::lock_guard<std::mutex> tl(trace_mutex_);
1720 on_task_start_(start_time, tid);
1730 auto const end_time = std::chrono::steady_clock::now();
1732 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1733 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1736 std::lock_guard<std::mutex> tl(trace_mutex_);
1738 on_task_end_(end_time, tid, task_duration);
1741 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1742 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1744 task_finished_condition_.notify_all();
1845template <
size_t TaskSize = 64>
1855 : num_threads_(num_threads == 0 ? 1 : num_threads)
1857 workers_.reserve(num_threads_);
1858 for (
size_t i = 0; i < num_threads_; ++i)
1859 workers_.emplace_back(&LightweightPoolT::worker_loop,
this);
1884 template <
typename F,
typename... Args>
1887 auto r =
try_post(std::forward<F>(f), std::forward<Args>(args)...);
1889 throw std::runtime_error(
"LightweightPool is shutting down");
1898 template <
typename F,
typename... Args>
1903 std::lock_guard<std::mutex> lock(mutex_);
1905 return unexpected(std::make_error_code(std::errc::operation_canceled));
1906 tasks_.push(std::move(task));
1908 condition_.notify_one();
1921 template <
typename Iterator>
1926 throw std::runtime_error(
"LightweightPool is shutting down");
1933 template <
typename Iterator>
1937 std::lock_guard<std::mutex> lock(mutex_);
1939 return unexpected(std::make_error_code(std::errc::operation_canceled));
1940 for (
auto it = begin; it != end; ++it)
1943 condition_.notify_all();
1947#if __cpp_lib_ranges >= 201911L
1949 template <std::ranges::input_range R>
1950 void post_batch(R&& range)
1952 post_batch(std::ranges::begin(range), std::ranges::end(range));
1955 template <std::ranges::input_range R>
1956 auto try_post_batch(R&& range)
1958 return try_post_batch(std::ranges::begin(range), std::ranges::end(range));
1981 std::lock_guard<std::mutex> lock(mutex_);
1987 std::queue<detail::SboCallable<TaskSize>> empty;
1991 condition_.notify_all();
1992 for (
auto& w : workers_)
2011 auto const deadline = std::chrono::steady_clock::now() + timeout;
2013 std::lock_guard<std::mutex> lock(mutex_);
2017 std::unique_lock<std::mutex> lock(mutex_);
2018 bool const drained = drain_condition_.wait_until(
2019 lock, deadline, [
this] {
return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
2031 [[nodiscard]]
auto size() const noexcept ->
size_t
2033 return num_threads_;
2067 size_t num_threads_;
2068 std::vector<ThreadWrapper> workers_;
2069 std::queue<detail::SboCallable<TaskSize>> tasks_;
2071 std::condition_variable condition_;
2072 std::condition_variable drain_condition_;
2073 std::atomic<bool> stop_{
false};
2074 std::atomic<size_t> active_tasks_{0};
2080 detail::SboCallable<TaskSize> task;
2082 std::unique_lock<std::mutex> lock(mutex_);
2083 condition_.wait(lock, [
this] {
return stop_ || !tasks_.empty(); });
2084 if (stop_ && tasks_.empty())
2086 if (!tasks_.empty())
2088 task = std::move(tasks_.front());
2090 active_tasks_.fetch_add(1, std::memory_order_relaxed);
2102 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
2103 drain_condition_.notify_all();
2149template <
typename PoolType>
2159 static void init(
size_t num_threads)
2161 std::call_once(init_flag_(), [num_threads] { thread_count_() = num_threads; });
2167 static PoolType pool(thread_count_());
2175 template <
typename F,
typename... Args>
2178 return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
2181 template <
typename F,
typename... Args>
2184 return instance().try_submit(std::forward<F>(f), std::forward<Args>(args)...);
2187 template <
typename F,
typename... Args>
2188 static void post(F&& f, Args&&... args)
2190 instance().post(std::forward<F>(f), std::forward<Args>(args)...);
2193 template <
typename F,
typename... Args>
2196 return instance().try_post(std::forward<F>(f), std::forward<Args>(args)...);
2199 template <
typename Iterator>
2202 return instance().submit_batch(begin, end);
2205 template <
typename Iterator>
2208 return instance().try_submit_batch(begin, end);
2211 template <
typename Iterator,
typename F>
2214 instance().parallel_for_each(begin, end, std::forward<F>(func));
2217#if __cpp_lib_ranges >= 201911L
2218 template <std::ranges::input_range R>
2219 static auto submit_batch(R&& range)
2221 return instance().submit_batch(std::forward<R>(range));
2224 template <std::ranges::input_range R>
2225 static auto try_submit_batch(R&& range)
2227 return instance().try_submit_batch(std::forward<R>(range));
2230 template <std::ranges::input_range R,
typename F>
2233 instance().parallel_for_each(std::forward<R>(range), std::forward<F>(func));
2240 GlobalPool() =
default;
2242 static auto init_flag_() -> std::once_flag&
2244 static std::once_flag flag;
2248 static auto thread_count_() ->
size_t&
2250 static size_t count = std::thread::hardware_concurrency();
2284template <
typename Container,
typename F>
Singleton accessor for a process-wide pool instance.
static auto submit_batch(Iterator begin, Iterator end)
static auto try_submit(F &&f, Args &&... args)
static auto try_post(F &&f, Args &&... args)
static void parallel_for_each(Iterator begin, Iterator end, F &&func)
static auto submit(F &&f, Args &&... args)
static void post(F &&f, Args &&... args)
static auto instance() -> PoolType &
Access the singleton pool instance (created on first call).
static auto try_submit_batch(Iterator begin, Iterator end)
static void init(size_t num_threads)
Pre-configure the number of threads before first use.
Ultra-lightweight fire-and-forget thread pool.
auto try_post_batch(Iterator begin, Iterator end) -> expected< void, std::error_code >
Batch post (non-throwing).
auto operator=(LightweightPoolT const &) -> LightweightPoolT &=delete
void post_batch(Iterator begin, Iterator end)
Post a range of callables under a single lock acquisition.
auto shutdown_for(std::chrono::milliseconds timeout) -> bool
Attempt a timed drain.
void shutdown(ShutdownPolicy policy=ShutdownPolicy::drain)
Shut the pool down.
LightweightPoolT(size_t num_threads=std::thread::hardware_concurrency())
Construct a lightweight pool with num_threads workers.
auto try_post(F &&f, Args &&... args) -> expected< void, std::error_code >
Post a fire-and-forget task (non-throwing variant).
auto set_affinity(ThreadAffinity const &affinity) -> expected< void, std::error_code >
Pin all workers to the same CPU set.
LightweightPoolT(LightweightPoolT const &)=delete
void post(F &&f, Args &&... args)
Post a fire-and-forget task (throwing variant).
auto size() const noexcept -> size_t
Number of worker threads.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Name, schedule and prioritize all worker threads.
auto distribute_across_cpus() -> expected< void, std::error_code >
Pin each worker to a distinct CPU core (round-robin).
Manages a set of CPU indices to which a thread may be bound.
Single-queue thread pool parameterized by its idle-wait strategy.
void parallel_for_each(Iterator begin, Iterator end, F &&func)
Apply func to [begin, end) in parallel (chunked).
auto set_affinity(ThreadAffinity const &affinity) -> expected< void, std::error_code >
Pin all workers to the same CPU set.
void set_on_task_start(TaskStartCallback cb)
Register a callback invoked just before each task executes.
auto shutdown_for(std::chrono::milliseconds timeout) -> bool
Attempt a timed drain: finish as many tasks as possible within timeout, then force-stop remaining wor...
auto operator=(ThreadPoolBase const &) -> ThreadPoolBase &=delete
auto try_submit(F &&f, Args &&... args) -> expected< std::future< std::invoke_result_t< F, Args... > >, std::error_code >
Submit a task without throwing on shutdown.
void wait_for_tasks()
Block until all pending and active tasks have completed.
auto distribute_across_cpus() -> expected< void, std::error_code >
Pin each worker to a distinct CPU core (round-robin).
void shutdown(ShutdownPolicy policy=ShutdownPolicy::drain)
ThreadPoolBase(ThreadPoolBase const &)=delete
auto try_post(F &&f, Args &&... args) -> expected< void, std::error_code >
void post(F &&f, Args &&... args)
Fire-and-forget task submission (throwing variant).
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Name, schedule and prioritize all worker threads.
std::function< void()> Task
auto submit_batch(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Submit a batch of tasks (throwing).
void set_on_task_end(TaskEndCallback cb)
Register a callback invoked just after each task completes.
auto pending_tasks() const -> size_t
Number of tasks waiting in the queue.
auto size() const noexcept -> size_t
Number of worker threads.
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task, throwing on shutdown.
ThreadPoolBase(size_t num_threads=std::thread::hardware_concurrency(), bool register_workers=false)
auto get_statistics() const -> Statistics
Collect approximate performance counters.
auto try_submit_batch(Iterator begin, Iterator end) -> expected< std::vector< std::future< void > >, std::error_code >
Submit a range of void() callables in one go (non-throwing).
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
auto steal(T &item) -> bool
static constexpr size_t DEFAULT_CAPACITY
auto pop(T &item) -> bool
auto size() const -> size_t
auto push(T const &item) -> bool
static constexpr size_t CACHE_LINE_SIZE
auto push(T &&item) -> bool
auto empty() const -> bool
WorkStealingDeque(size_t capacity=DEFAULT_CAPACITY)
Type-erased, move-only callable with configurable inline storage.
auto operator=(SboCallable const &) -> SboCallable &=delete
SboCallable(SboCallable &&other) noexcept
auto operator=(SboCallable &&other) noexcept -> SboCallable &
SboCallable(SboCallable const &)=delete
A result type that holds either a value of type T or an error of type E.
Exception thrown by expected::value() when the object is in the error state.
Polyfill for std::expected (C++23) for pre-C++23 compilers.
auto bind_args(F &&f, Args &&... args)
Bind a callable with its arguments into a nullary lambda.
auto distribute_workers_across_cpus(WorkerRange &workers) -> expected< void, std::error_code >
auto configure_worker_threads(WorkerRange &workers, std::string const &name_prefix, SchedulingPolicy policy, ThreadPriority priority) -> expected< void, std::error_code >
auto set_worker_affinity(WorkerRange &workers, ThreadAffinity const &affinity) -> expected< void, std::error_code >
void parallel_for_each_chunked(Pool &pool, Iterator begin, Iterator end, F &&func, size_t num_workers)
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
ShutdownPolicy
Controls how a pool handles pending tasks during shutdown.
@ drop_pending
Finish running tasks, discard queued ones.
@ drain
Finish all queued tasks before stopping (default).
ThreadPoolBase< IndefiniteWait > ThreadPool
General-purpose thread pool with indefinite blocking wait.
void parallel_for_each(Container &container, F &&func)
Convenience wrapper that applies a callable to every element of a container in parallel using the global thread pool.
GlobalPool< ThreadPool > GlobalThreadPool
Singleton accessor for the process-wide ThreadPool instance.
ThreadPoolBase< PollingWait<> > FastThreadPool
Thread pool with 10 ms polling wait for lower wake-up latency.
GlobalPool< HighPerformancePool > GlobalHighPerformancePool
Singleton accessor for the process-wide HighPerformancePool instance.
std::function< void(std::chrono::steady_clock::time_point, std::thread::id, std::chrono::microseconds elapsed)> TaskEndCallback
Callback invoked when a pool worker finishes executing a task.
std::function< void(std::chrono::steady_clock::time_point, std::thread::id)> TaskStartCallback
Work-stealing deque for per-thread task queues in a thread pool.
LightweightPoolT<> LightweightPool
Default lightweight pool with 64-byte task slots (56 bytes usable).
Scheduling policies, thread priority, and CPU affinity types.
Wait policy that blocks indefinitely until work is available.
static auto wait(std::condition_variable &cv, Lock &lock, Pred pred) -> bool
Wait policy that polls with a configurable timeout.
static auto wait(std::condition_variable &cv, Lock &lock, Pred pred) -> bool
std::chrono::microseconds avg_task_time
Process-wide thread registry, control blocks, and composite registry.
Enhanced thread wrappers: ThreadWrapper, JThreadWrapper, and non-owning views.