ThreadSchedule 2.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
7
#include "expected.hpp"
#include "thread_registry.hpp"
#include "thread_wrapper.hpp"
#include <algorithm>
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <random>
#include <tuple>
#include <vector>
25
26#if __cpp_lib_ranges >= 201911L
27# include <ranges>
28#endif
29
30namespace threadschedule
31{
32
33namespace detail
34{
35
36template <typename WorkerRange>
37inline auto configure_worker_threads(WorkerRange& workers, std::string const& name_prefix, SchedulingPolicy policy,
39{
40 bool success = true;
41 for (size_t i = 0; i < workers.size(); ++i)
42 {
43 std::string const thread_name = name_prefix + "_" + std::to_string(i);
44 if (!workers[i].set_name(thread_name).has_value())
45 success = false;
46 if (!workers[i].set_scheduling_policy(policy, priority).has_value())
47 success = false;
48 }
49 if (success)
50 return {};
51 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
52}
53
54template <typename WorkerRange>
55inline auto set_worker_affinity(WorkerRange& workers, ThreadAffinity const& affinity) -> expected<void, std::error_code>
56{
57 bool success = true;
58 for (auto& worker : workers)
59 {
60 if (!worker.set_affinity(affinity).has_value())
61 success = false;
62 }
63 if (success)
64 return {};
65 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
66}
67
68template <typename WorkerRange>
70{
71 auto const cpu_count = std::thread::hardware_concurrency();
72 if (cpu_count == 0)
73 return unexpected(std::make_error_code(std::errc::invalid_argument));
74
75 bool success = true;
76 for (size_t i = 0; i < workers.size(); ++i)
77 {
78 ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
79 if (!workers[i].set_affinity(affinity).has_value())
80 success = false;
81 }
82 if (success)
83 return {};
84 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
85}
86
/**
 * @brief Apply @p func to every element of [begin, end) through tasks submitted to @p pool.
 *
 * The range is split into contiguous chunks of max(1, total / (num_workers * 4))
 * elements. One task per chunk is handed to pool.submit(), and the call blocks
 * until all submitted tasks have finished.
 *
 * @param pool        Object whose submit(callable) returns a future convertible to std::future<void>.
 * @param begin       Start of the range. The range is traversed more than once, so single-pass
 *                    iterators are not suitable.
 * @param end         End of the range.
 * @param func        Callable invoked as func(*it). The tasks hold it by reference.
 * @param num_workers Worker count used to size the chunks; 0 is treated as 1.
 *
 * Exceptions: if a task throws, the first exception in submission order is rethrown,
 * but only after every task has completed. If submit() throws, the tasks already
 * submitted are awaited before the exception propagates. Both waits are needed
 * because the tasks reference @p func, which must outlive them.
 */
template <typename Pool, typename Iterator, typename F>
inline void parallel_for_each_chunked(Pool& pool, Iterator begin, Iterator end, F&& func, size_t num_workers)
{
    auto const total = static_cast<size_t>(std::distance(begin, end));
    if (total == 0)
        return;

    // Guard the divisor: a worker count of zero would otherwise divide by zero.
    size_t const workers = (std::max)(size_t(1), num_workers);
    size_t const chunk_size = (std::max)(size_t(1), total / (workers * 4));

    std::vector<std::future<void>> futures;
    futures.reserve((total + chunk_size - 1) / chunk_size);

    auto const wait_all = [&futures]() {
        for (auto& f : futures)
        {
            if (f.valid())
                f.wait();
        }
    };

    try
    {
        auto it = begin;
        // Count down instead of re-measuring distance(it, end) for each chunk, which
        // costs a full walk of the remaining range for non-random-access iterators.
        size_t remaining = total;
        while (remaining != 0)
        {
            size_t const this_chunk = (std::min)(chunk_size, remaining);
            auto chunk_end = it;
            std::advance(chunk_end, this_chunk);

            futures.push_back(pool.submit([it, chunk_end, &func]() {
                for (auto cur = it; cur != chunk_end; ++cur)
                    func(*cur);
            }));

            it = chunk_end;
            remaining -= this_chunk;
        }
    }
    catch (...)
    {
        // Tasks already queued still reference func; let them finish first.
        wait_all();
        throw;
    }

    // Wait for everything before get() can rethrow, for the same reason.
    wait_all();
    for (auto& f : futures)
        f.get();
}
116
117// ---------------------------------------------------------------------------
118// bind_args -- optimal argument binding, C++20 pack-capture or C++17 tuple
119// ---------------------------------------------------------------------------
120
/**
 * @brief Bundle a callable and its arguments into a nullary closure.
 *
 * The callable and every argument are stored by value (decay-copied or moved).
 * Invoking the closure moves the stored arguments into the call, so it is meant
 * to be invoked once.
 *
 * With C++20 pack init-captures the arguments are captured directly; otherwise
 * they are kept in a std::tuple and expanded with std::apply. Both paths go
 * through the INVOKE protocol, so pointers to member functions are accepted in
 * either language mode.
 *
 * @param f    Callable to store.
 * @param args Arguments to store and later pass to @p f.
 * @return A mutable lambda taking no parameters that performs the call.
 */
template <typename F, typename... Args>
auto bind_args(F&& f, Args&&... args)
{
#if __cpp_init_captures >= 201803L
    return [fn = std::forward<F>(f), ... a = std::forward<Args>(args)]() mutable {
        // std::invoke rather than fn(...) so member pointers work as they do with std::apply.
        return std::invoke(fn, std::move(a)...);
    };
#else
    return [fn = std::forward<F>(f), tup = std::make_tuple(std::forward<Args>(args)...)]() mutable {
        return std::apply(std::move(fn), std::move(tup));
    };
#endif
}
139
140// ---------------------------------------------------------------------------
141// SboCallable -- type-erased callable with inline small-buffer storage
142// ---------------------------------------------------------------------------
143
177template <size_t TaskSize = 64>
179{
180 static_assert(TaskSize > sizeof(void*), "TaskSize must be larger than a pointer");
181
/// Per-type table of operations on the type-erased storage.
struct VTable
{
    using InvokeFn = void (*)(void*);
    using DestroyFn = void (*)(void*);
    using RelocateFn = void (*)(void*, void*) noexcept;

    InvokeFn invoke;    ///< Invoke the callable held in the storage.
    DestroyFn destroy;  ///< Destroy the callable held in the storage.
    RelocateFn move_to; ///< Transfer the callable from the second storage to the first.
};
188
189 static constexpr size_t kBufferSize = TaskSize - sizeof(VTable const*);
190
191 template <typename F>
192 static constexpr bool fits_inline_v =
193 sizeof(F) <= kBufferSize && alignof(F) <= alignof(std::max_align_t) && std::is_nothrow_move_constructible_v<F>;
194
195 template <typename F>
196 static VTable const* vtable_for() noexcept
197 {
198 if constexpr (fits_inline_v<F>)
199 {
200 static constexpr VTable vt{[](void* s) { (*static_cast<F*>(s))(); },
201 [](void* s) { static_cast<F*>(s)->~F(); },
202 [](void* dst, void* src) noexcept {
203 ::new (dst) F(std::move(*static_cast<F*>(src)));
204 static_cast<F*>(src)->~F();
205 }};
206 return &vt;
207 }
208 else
209 {
210 static constexpr VTable vt{[](void* s) { (*(*static_cast<F**>(s)))(); },
211 [](void* s) { delete *static_cast<F**>(s); },
212 [](void* dst, void* src) noexcept {
213 *static_cast<F**>(dst) = *static_cast<F**>(src);
214 *static_cast<F**>(src) = nullptr;
215 }};
216 return &vt;
217 }
218 }
219
220 public:
221 SboCallable() = default;
222
223 template <typename F, typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, SboCallable>>>
224 SboCallable(F&& f) // NOLINT(google-explicit-constructor)
225 {
226 using Decay = std::decay_t<F>;
227 vtable_ = vtable_for<Decay>();
228 if constexpr (fits_inline_v<Decay>)
229 ::new (buffer_) Decay(std::forward<F>(f));
230 else
231 *reinterpret_cast<Decay**>(buffer_) = new Decay(std::forward<F>(f));
232 }
233
234 SboCallable(SboCallable&& other) noexcept : vtable_(other.vtable_)
235 {
236 if (vtable_)
237 {
238 vtable_->move_to(buffer_, other.buffer_);
239 other.vtable_ = nullptr;
240 }
241 }
242
243 auto operator=(SboCallable&& other) noexcept -> SboCallable&
244 {
245 if (this != &other)
246 {
247 if (vtable_)
248 vtable_->destroy(buffer_);
249 vtable_ = other.vtable_;
250 if (vtable_)
251 {
252 vtable_->move_to(buffer_, other.buffer_);
253 other.vtable_ = nullptr;
254 }
255 }
256 return *this;
257 }
258
259 SboCallable(SboCallable const&) = delete;
260 auto operator=(SboCallable const&) -> SboCallable& = delete;
261
263 {
264 if (vtable_)
265 vtable_->destroy(buffer_);
266 }
267
/// True while a callable is held, i.e. while the vtable pointer is non-null.
268 explicit operator bool() const noexcept
269 {
270 return vtable_ != nullptr;
271 }
272
274 {
275 auto* vt = vtable_;
276 vtable_ = nullptr;
277 vt->invoke(buffer_);
278 vt->destroy(buffer_);
279 }
280
281 private:
282 VTable const* vtable_ = nullptr;
283 alignas(std::max_align_t) unsigned char buffer_[kBufferSize];
284};
285
286} // namespace detail
287
318
320using TaskStartCallback = std::function<void(std::chrono::steady_clock::time_point, std::thread::id)>;
321
324 std::function<void(std::chrono::steady_clock::time_point, std::thread::id, std::chrono::microseconds elapsed)>;
325
326template <typename T>
328{
329 public:
330 static constexpr size_t CACHE_LINE_SIZE = 64;
331 static constexpr size_t DEFAULT_CAPACITY = 1024;
332
333 private:
334 struct alignas(CACHE_LINE_SIZE) AlignedItem
335 {
336 T item;
337 AlignedItem() = default;
338 AlignedItem(T&& t) : item(std::move(t))
339 {
340 }
341 AlignedItem(T const& t) : item(t)
342 {
343 }
344 };
345
346 std::unique_ptr<AlignedItem[]> buffer_;
347 size_t capacity_;
348
349 alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0}; // Owner pushes/pops here
350 alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0}; // Thieves steal here
351 alignas(CACHE_LINE_SIZE) mutable std::mutex mutex_; // For synchronization
352
353 public:
/// Allocate a ring buffer of `capacity` slots. The size is fixed: push() reports failure once
/// `capacity` items are queued.
354 explicit WorkStealingDeque(size_t capacity = DEFAULT_CAPACITY)
355 : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
356 {
357 }
358
359 [[nodiscard]] auto push(T&& item) -> bool
360 {
361 std::lock_guard<std::mutex> lock(mutex_);
362 size_t const t = top_.load(std::memory_order_relaxed);
363 size_t const b = bottom_.load(std::memory_order_relaxed);
364
365 if (t - b >= capacity_)
366 {
367 return false;
368 }
369
370 buffer_[t % capacity_] = AlignedItem(std::move(item));
371 top_.store(t + 1, std::memory_order_release);
372 return true;
373 }
374
375 [[nodiscard]] auto push(T const& item) -> bool
376 {
377 std::lock_guard<std::mutex> lock(mutex_);
378 size_t const t = top_.load(std::memory_order_relaxed);
379 size_t const b = bottom_.load(std::memory_order_relaxed);
380
381 if (t - b >= capacity_)
382 {
383 return false;
384 }
385
386 buffer_[t % capacity_] = AlignedItem(item);
387 top_.store(t + 1, std::memory_order_release);
388 return true;
389 }
390
391 [[nodiscard]] auto pop(T& item) -> bool
392 {
393 std::lock_guard<std::mutex> lock(mutex_);
394 size_t const t = top_.load(std::memory_order_relaxed);
395 size_t const b = bottom_.load(std::memory_order_relaxed);
396
397 if (t <= b)
398 {
399 return false;
400 }
401
402 size_t const new_top = t - 1;
403 item = std::move(buffer_[new_top % capacity_].item);
404 top_.store(new_top, std::memory_order_relaxed);
405 return true;
406 }
407
408 [[nodiscard]] auto steal(T& item) -> bool
409 {
410 std::lock_guard<std::mutex> lock(mutex_);
411 size_t const b = bottom_.load(std::memory_order_relaxed);
412 size_t const t = top_.load(std::memory_order_relaxed);
413
414 if (b >= t)
415 {
416 return false;
417 }
418
419 item = std::move(buffer_[b % capacity_].item);
420 bottom_.store(b + 1, std::memory_order_relaxed);
421 return true;
422 }
423
424 [[nodiscard]] auto size() const -> size_t
425 {
426 size_t const t = top_.load(std::memory_order_relaxed);
427 size_t const b = bottom_.load(std::memory_order_relaxed);
428 return t > b ? t - b : 0;
429 }
430
/// True when size() reports zero queued items; same snapshot caveats as size().
431 [[nodiscard]] auto empty() const -> bool
432 {
433 return size() == 0;
434 }
435
436 void clear()
437 {
438 std::lock_guard<std::mutex> lock(mutex_);
439 bottom_.store(0, std::memory_order_relaxed);
440 top_.store(0, std::memory_order_relaxed);
441 }
442};
443
458enum class ShutdownPolicy : uint8_t
459{
462};
463
545{
546 public:
547 using Task = std::function<void()>;
548
550 {
557 std::chrono::microseconds avg_task_time;
558 };
559
560 explicit HighPerformancePool(size_t num_threads = std::thread::hardware_concurrency(),
561 size_t deque_capacity = WorkStealingDeque<Task>::DEFAULT_CAPACITY,
562 bool register_workers = false)
563 : num_threads_(num_threads == 0 ? 1 : num_threads), register_workers_(register_workers), stop_(false),
564 next_victim_(0), start_time_(std::chrono::steady_clock::now())
565 {
566 worker_queues_.resize(num_threads_);
567 for (size_t i = 0; i < num_threads_; ++i)
568 {
569 worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>(deque_capacity);
570 }
571
572 workers_.reserve(num_threads_);
573
574 for (size_t i = 0; i < num_threads_; ++i)
575 {
576 workers_.emplace_back(&HighPerformancePool::worker_function, this, i);
577 }
578 }
579
582
587
595 {
596 {
597 std::lock_guard<std::mutex> lock(overflow_mutex_);
598 if (stop_.exchange(true, std::memory_order_acq_rel))
599 return;
600
601 if (policy == ShutdownPolicy::drop_pending)
602 {
603 std::queue<Task> empty;
604 overflow_tasks_.swap(empty);
605 for (auto& q : worker_queues_)
606 q->clear();
607 }
608 }
609
610 wakeup_condition_.notify_all();
611
612 for (auto& worker : workers_)
613 {
614 if (worker.joinable())
615 worker.join();
616 }
617
618 workers_.clear();
619 }
620
627 auto shutdown_for(std::chrono::milliseconds timeout) -> bool
628 {
629 auto const deadline = std::chrono::steady_clock::now() + timeout;
630
631 {
632 std::lock_guard<std::mutex> lock(overflow_mutex_);
633 if (stop_.load(std::memory_order_acquire))
634 return true;
635 }
636
637 std::unique_lock<std::mutex> lock(completion_mutex_);
638 bool const drained = completion_condition_.wait_until(lock, deadline, [this] {
639 return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0;
640 });
641
643 return drained;
644 }
645
662 template <typename F, typename... Args>
663 auto try_submit(F&& f, Args&&... args) -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
664 {
665 using return_type = std::invoke_result_t<F, Args...>;
666
667 auto task = std::make_shared<std::packaged_task<return_type()>>(
668 detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
669
670 std::future<return_type> result = task->get_future();
671
672 if (stop_.load(std::memory_order_acquire))
673 return unexpected(std::make_error_code(std::errc::operation_canceled));
674
675 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
676
677 if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
678 {
679 wakeup_condition_.notify_one();
680 return result;
681 }
682
683 for (size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
684 {
685 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
686 if (worker_queues_[idx]->push([task]() { (*task)(); }))
687 {
688 wakeup_condition_.notify_one();
689 return result;
690 }
691 }
692
693 {
694 std::lock_guard<std::mutex> lock(overflow_mutex_);
695 if (stop_.load(std::memory_order_relaxed))
696 return unexpected(std::make_error_code(std::errc::operation_canceled));
697 overflow_tasks_.emplace([task]() { (*task)(); });
698 }
699
700 wakeup_condition_.notify_all();
701 return result;
702 }
703
/**
 * @brief Queue f(args...) and return a future for its result.
 *
 * @throws std::runtime_error when try_submit() reports failure.
 */
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
{
    auto outcome = try_submit(std::forward<F>(f), std::forward<Args>(args)...);
    if (outcome.has_value())
        return std::move(outcome.value());
    throw std::runtime_error("HighPerformancePool is shutting down");
}
721
/**
 * @brief Queue f(args...) without returning a future (fire-and-forget).
 *
 * @throws std::runtime_error when try_post() reports failure.
 */
template <typename F, typename... Args>
void post(F&& f, Args&&... args)
{
    auto const outcome = try_post(std::forward<F>(f), std::forward<Args>(args)...);
    if (outcome.has_value())
        return;
    throw std::runtime_error("HighPerformancePool is shutting down");
}
739
746 template <typename F, typename... Args>
747 auto try_post(F&& f, Args&&... args) -> expected<void, std::error_code>
748 {
749 Task bound(detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
750
751 if (stop_.load(std::memory_order_acquire))
752 return unexpected(std::make_error_code(std::errc::operation_canceled));
753
754 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
755
756 if (worker_queues_[preferred_queue]->push(std::move(bound)))
757 {
758 wakeup_condition_.notify_one();
759 return {};
760 }
761
762 for (size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
763 {
764 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
765 if (worker_queues_[idx]->push(std::move(bound)))
766 {
767 wakeup_condition_.notify_one();
768 return {};
769 }
770 }
771
772 {
773 std::lock_guard<std::mutex> lock(overflow_mutex_);
774 if (stop_.load(std::memory_order_relaxed))
775 return unexpected(std::make_error_code(std::errc::operation_canceled));
776 overflow_tasks_.emplace(std::move(bound));
777 }
778
779 wakeup_condition_.notify_all();
780 return {};
781 }
782
783#if __cpp_lib_jthread >= 201911L
790 template <typename F, typename... Args>
791 auto submit(std::stop_token token, F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
792 {
793 return submit([token = std::move(token),
794 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]() mutable {
795 if (token.stop_requested())
796 return std::invoke_result_t<F, Args...>();
797 return bound();
798 });
799 }
800
802 template <typename F, typename... Args>
803 auto try_submit(std::stop_token token, F&& f, Args&&... args)
804 -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
805 {
806 return try_submit([token = std::move(token),
807 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]() mutable {
808 if (token.stop_requested())
809 return std::invoke_result_t<F, Args...>();
810 return bound();
811 });
812 }
813#endif
814
826 template <typename Iterator>
827 auto try_submit_batch(Iterator begin, Iterator end) -> expected<std::vector<std::future<void>>, std::error_code>
828 {
829 std::vector<std::future<void>> futures;
830 size_t const batch_size = std::distance(begin, end);
831 futures.reserve(batch_size);
832
833 if (stop_.load(std::memory_order_acquire))
834 return unexpected(std::make_error_code(std::errc::operation_canceled));
835
836 size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;
837
838 for (auto it = begin; it != end; ++it)
839 {
840 auto task = std::make_shared<std::packaged_task<void()>>(*it);
841 futures.push_back(task->get_future());
842
843 bool queued = false;
844 for (size_t attempts = 0; attempts < num_threads_; ++attempts)
845 {
846 if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
847 {
848 queued = true;
849 break;
850 }
851 queue_idx = (queue_idx + 1) % num_threads_;
852 }
853
854 if (!queued)
855 {
856 std::lock_guard<std::mutex> lock(overflow_mutex_);
857 overflow_tasks_.emplace([task]() { (*task)(); });
858 }
859 }
860
861 wakeup_condition_.notify_all();
862 return futures;
863 }
864
/**
 * @brief Queue every callable in [begin, end) and return their futures.
 *
 * @throws std::runtime_error when try_submit_batch() reports failure.
 */
template <typename Iterator>
auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
{
    auto outcome = try_submit_batch(begin, end);
    if (outcome.has_value())
        return std::move(outcome.value());
    throw std::runtime_error("HighPerformancePool is shutting down");
}
878
/// Apply `func` to every element of [begin, end) by delegating to
/// detail::parallel_for_each_chunked with this pool and its thread count.
885 template <typename Iterator, typename F>
886 void parallel_for_each(Iterator begin, Iterator end, F&& func)
887 {
888 detail::parallel_for_each_chunked(*this, begin, end, std::forward<F>(func), num_threads_);
889 }
890
891#if __cpp_lib_ranges >= 201911L
893 template <std::ranges::input_range R>
894 auto submit_batch(R&& range)
895 {
896 return submit_batch(std::ranges::begin(range), std::ranges::end(range));
897 }
898
899 template <std::ranges::input_range R>
900 auto try_submit_batch(R&& range)
901 {
902 return try_submit_batch(std::ranges::begin(range), std::ranges::end(range));
903 }
904
905 template <std::ranges::input_range R, typename F>
906 void parallel_for_each(R&& range, F&& func)
907 {
908 parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<F>(func));
909 }
911#endif
912
915
/// Number of worker threads the pool was configured with (never 0; the constructor maps 0 to 1).
917 [[nodiscard]] auto size() const noexcept -> size_t
918 {
919 return num_threads_;
920 }
921
923 [[nodiscard]] auto pending_tasks() const -> size_t
924 {
925 size_t total = 0;
926 for (auto const& queue : worker_queues_)
927 {
928 total += queue->size();
929 }
930
931 std::lock_guard<std::mutex> lock(overflow_mutex_);
932 total += overflow_tasks_.size();
933 return total;
934 }
935
938 {
939 auto const now = std::chrono::steady_clock::now();
940 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
941
942 Statistics stats;
943 stats.total_threads = num_threads_;
944 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
946 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
947 stats.stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);
948
949 if (elapsed.count() > 0)
950 {
951 stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
952 }
953 else
954 {
955 stats.tasks_per_second = 0.0;
956 }
957
958 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
959 if (stats.completed_tasks > 0)
960 {
961 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
962 }
963 else
964 {
965 stats.avg_task_time = std::chrono::microseconds(0);
966 }
967
968 return stats;
969 }
970
972
975
984 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
986 {
987 return detail::configure_worker_threads(workers_, name_prefix, policy, priority);
988 }
989
992 {
993 return detail::set_worker_affinity(workers_, affinity);
994 }
995
1001
1003
1006
1009 {
1010 std::unique_lock<std::mutex> lock(completion_mutex_);
1011 completion_condition_.wait(
1012 lock, [this] { return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0; });
1013 }
1014
1016
1019
1025 {
1026 std::lock_guard<std::mutex> lock(trace_mutex_);
1027 on_task_start_ = std::move(cb);
1028 }
1029
1036 {
1037 std::lock_guard<std::mutex> lock(trace_mutex_);
1038 on_task_end_ = std::move(cb);
1039 }
1040
1042
1043 private:
1044 size_t num_threads_;
1045 bool register_workers_;
1046 std::vector<ThreadWrapper> workers_;
1047 std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;
1048
1049 std::queue<Task> overflow_tasks_;
1050 mutable std::mutex overflow_mutex_;
1051
1052 std::atomic<bool> stop_;
1053 std::condition_variable wakeup_condition_;
1054 std::mutex wakeup_mutex_;
1055
1056 std::condition_variable completion_condition_;
1057 std::mutex completion_mutex_;
1058
1059 std::atomic<size_t> next_victim_;
1060 std::atomic<size_t> active_tasks_{0};
1061 std::atomic<size_t> completed_tasks_{0};
1062 std::atomic<size_t> stolen_tasks_{0};
1063 std::atomic<uint64_t> total_task_time_{0};
1064
1065 std::mutex trace_mutex_;
1066 TaskStartCallback on_task_start_;
1067 TaskEndCallback on_task_end_;
1068
1069 std::chrono::steady_clock::time_point start_time_;
1070
1071 // NOLINTNEXTLINE(readability-function-cognitive-complexity)
1072 void worker_function(size_t worker_id)
1073 {
1074 std::optional<AutoRegisterCurrentThread> reg_guard;
1075 if (register_workers_)
1076 reg_guard.emplace("hp_worker_" + std::to_string(worker_id), "threadschedule.pool");
1077
1078 thread_local std::mt19937 gen = []() {
1079 std::random_device device;
1080 return std::mt19937(device());
1081 }();
1082
1083 Task task;
1084 std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
1085
1086 while (true)
1087 {
1088 bool found_task = false;
1089
1090 if (worker_queues_[worker_id]->pop(task))
1091 {
1092 found_task = true;
1093 }
1094 else
1095 {
1096 size_t const max_steal_attempts = (std::min)(num_threads_, size_t(4));
1097 for (size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
1098 {
1099 size_t const victim_id = dist(gen);
1100 if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
1101 {
1102 found_task = true;
1103 stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
1104 break;
1105 }
1106 }
1107 }
1108
1109 if (!found_task)
1110 {
1111 std::lock_guard<std::mutex> lock(overflow_mutex_);
1112 if (!overflow_tasks_.empty())
1113 {
1114 task = std::move(overflow_tasks_.front());
1115 overflow_tasks_.pop();
1116 found_task = true;
1117 }
1118 }
1119
1120 if (found_task)
1121 {
1122 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1123
1124 auto const start_time = std::chrono::steady_clock::now();
1125 auto const tid = std::this_thread::get_id();
1126
1127 {
1128 std::lock_guard<std::mutex> tl(trace_mutex_);
1129 if (on_task_start_)
1130 on_task_start_(start_time, tid);
1131 }
1132
1133 try
1134 {
1135 task();
1136 }
1137 catch (...)
1138 {
1139 }
1140 auto const end_time = std::chrono::steady_clock::now();
1141
1142 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1143 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1144
1145 {
1146 std::lock_guard<std::mutex> tl(trace_mutex_);
1147 if (on_task_end_)
1148 on_task_end_(end_time, tid, task_duration);
1149 }
1150
1151 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1152 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1153
1154 completion_condition_.notify_all();
1155 }
1156 else
1157 {
1158 if (stop_.load(std::memory_order_acquire))
1159 {
1160 break;
1161 }
1162
1163 std::unique_lock<std::mutex> lock(wakeup_mutex_);
1164 wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
1165 }
1166 }
1167 }
1168};
1169
1170// ---------------------------------------------------------------------------
1171// Wait policies for ThreadPoolBase
1172// ---------------------------------------------------------------------------
1173
1181{
/// Block on @p cv until @p pred returns true. There is no timeout, so the result is always true.
template <typename Lock, typename Pred>
static auto wait(std::condition_variable& cv, Lock& lock, Pred pred) -> bool
{
    while (!pred())
        cv.wait(lock);
    return true;
}
1188};
1189
1199template <unsigned IntervalMs = 10>
1201{
1202 template <typename Lock, typename Pred>
1203 static auto wait(std::condition_variable& cv, Lock& lock, Pred pred) -> bool
1204 {
1205 return cv.wait_for(lock, std::chrono::milliseconds(IntervalMs), pred);
1206 }
1207};
1208
1209// ---------------------------------------------------------------------------
1210// ThreadPoolBase
1211// ---------------------------------------------------------------------------
1212
1265template <typename WaitPolicy>
1267{
1268 public:
1269 using Task = std::function<void()>;
1270
1272 {
1278 std::chrono::microseconds avg_task_time;
1279 };
1280
1281 explicit ThreadPoolBase(size_t num_threads = std::thread::hardware_concurrency(), bool register_workers = false)
1282 : num_threads_(num_threads == 0 ? 1 : num_threads), register_workers_(register_workers), stop_(false),
1283 start_time_(std::chrono::steady_clock::now())
1284 {
1285 workers_.reserve(num_threads_);
1286
1287 for (size_t i = 0; i < num_threads_; ++i)
1288 {
1289 workers_.emplace_back(&ThreadPoolBase::worker_function, this, i);
1290 }
1291 }
1292
1294 auto operator=(ThreadPoolBase const&) -> ThreadPoolBase& = delete;
1295
1300
1303
1309 template <typename F, typename... Args>
1310 auto try_submit(F&& f, Args&&... args) -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
1311 {
1312 using return_type = std::invoke_result_t<F, Args...>;
1313
1314 auto task = std::make_shared<std::packaged_task<return_type()>>(
1315 detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
1316
1317 std::future<return_type> result = task->get_future();
1318
1319 {
1320 std::lock_guard<std::mutex> lock(queue_mutex_);
1321 if (stop_)
1322 return unexpected(std::make_error_code(std::errc::operation_canceled));
1323 tasks_.emplace([task]() { (*task)(); });
1324 }
1325
1326 condition_.notify_one();
1327 return result;
1328 }
1329
/**
 * @brief Queue f(args...) and return a future for its result.
 *
 * @throws std::runtime_error when try_submit() reports failure.
 */
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
{
    auto outcome = try_submit(std::forward<F>(f), std::forward<Args>(args)...);
    if (outcome.has_value())
        return std::move(outcome.value());
    throw std::runtime_error("Pool is shutting down");
}
1342
/**
 * @brief Queue f(args...) without returning a future (fire-and-forget).
 *
 * @throws std::runtime_error when try_post() reports failure.
 */
template <typename F, typename... Args>
void post(F&& f, Args&&... args)
{
    auto const outcome = try_post(std::forward<F>(f), std::forward<Args>(args)...);
    if (outcome.has_value())
        return;
    throw std::runtime_error("Pool is shutting down");
}
1358
1364 template <typename F, typename... Args>
1365 auto try_post(F&& f, Args&&... args) -> expected<void, std::error_code>
1366 {
1367 {
1368 std::lock_guard<std::mutex> lock(queue_mutex_);
1369 if (stop_)
1370 return unexpected(std::make_error_code(std::errc::operation_canceled));
1371 tasks_.emplace(detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
1372 }
1373 condition_.notify_one();
1374 return {};
1375 }
1376
1377#if __cpp_lib_jthread >= 201911L
1384 template <typename F, typename... Args>
1385 auto submit(std::stop_token token, F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
1386 {
1387 return submit([token = std::move(token),
1388 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]() mutable {
1389 if (token.stop_requested())
1390 return std::invoke_result_t<F, Args...>();
1391 return bound();
1392 });
1393 }
1394
1396 template <typename F, typename... Args>
1397 auto try_submit(std::stop_token token, F&& f, Args&&... args)
1398 -> expected<std::future<std::invoke_result_t<F, Args...>>, std::error_code>
1399 {
1400 return try_submit([token = std::move(token),
1401 bound = detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...)]() mutable {
1402 if (token.stop_requested())
1403 return std::invoke_result_t<F, Args...>();
1404 return bound();
1405 });
1406 }
1407#endif
1408
1414 template <typename Iterator>
1415 auto try_submit_batch(Iterator begin, Iterator end) -> expected<std::vector<std::future<void>>, std::error_code>
1416 {
1417 std::vector<std::future<void>> futures;
1418 futures.reserve(std::distance(begin, end));
1419
1420 {
1421 std::lock_guard<std::mutex> lock(queue_mutex_);
1422 if (stop_)
1423 return unexpected(std::make_error_code(std::errc::operation_canceled));
1424
1425 for (auto it = begin; it != end; ++it)
1426 {
1427 auto task = std::make_shared<std::packaged_task<void()>>(*it);
1428 futures.push_back(task->get_future());
1429 tasks_.emplace([task]() { (*task)(); });
1430 }
1431 }
1432
1433 condition_.notify_all();
1434 return futures;
1435 }
1436
/**
 * @brief Queue every callable in [begin, end) and return their futures.
 *
 * @throws std::runtime_error when try_submit_batch() reports failure.
 */
template <typename Iterator>
auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
{
    auto outcome = try_submit_batch(begin, end);
    if (outcome.has_value())
        return std::move(outcome.value());
    throw std::runtime_error("Pool is shutting down");
}
1446
/// Apply `func` to every element of [begin, end) by delegating to
/// detail::parallel_for_each_chunked with this pool and its thread count.
1448 template <typename Iterator, typename F>
1449 void parallel_for_each(Iterator begin, Iterator end, F&& func)
1450 {
1451 detail::parallel_for_each_chunked(*this, begin, end, std::forward<F>(func), num_threads_);
1452 }
1453
1454#if __cpp_lib_ranges >= 201911L
1456 template <std::ranges::input_range R>
1457 auto submit_batch(R&& range)
1458 {
1459 return submit_batch(std::ranges::begin(range), std::ranges::end(range));
1460 }
1461
1462 template <std::ranges::input_range R>
1463 auto try_submit_batch(R&& range)
1464 {
1465 return try_submit_batch(std::ranges::begin(range), std::ranges::end(range));
1466 }
1467
1468 template <std::ranges::input_range R, typename F>
1469 void parallel_for_each(R&& range, F&& func)
1470 {
1471 parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<F>(func));
1472 }
1474#endif
1475
1477
1480
/// Number of worker threads the pool was configured with (never 0; the constructor maps 0 to 1).
1482 [[nodiscard]] auto size() const noexcept -> size_t
1483 {
1484 return num_threads_;
1485 }
1486
1488 [[nodiscard]] auto pending_tasks() const -> size_t
1489 {
1490 std::lock_guard<std::mutex> lock(queue_mutex_);
1491 return tasks_.size();
1492 }
1493
1495
1498
1503 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
1505 {
1506 return detail::configure_worker_threads(workers_, name_prefix, policy, priority);
1507 }
1508
1511 {
1512 return detail::set_worker_affinity(workers_, affinity);
1513 }
1514
1520
1522
1525
1528 {
1529 std::unique_lock<std::mutex> lock(queue_mutex_);
1530 task_finished_condition_.wait(
1531 lock, [this] { return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
1532 }
1533
1540 {
1541 {
1542 std::lock_guard<std::mutex> lock(queue_mutex_);
1543 if (stop_)
1544 return;
1545 stop_ = true;
1546 if (policy == ShutdownPolicy::drop_pending)
1547 {
1548 std::queue<Task> empty;
1549 tasks_.swap(empty);
1550 }
1551 }
1552
1553 condition_.notify_all();
1554
1555 for (auto& worker : workers_)
1556 {
1557 if (worker.joinable())
1558 worker.join();
1559 }
1560
1561 workers_.clear();
1562 }
1563
1570 auto shutdown_for(std::chrono::milliseconds timeout) -> bool
1571 {
1572 auto const deadline = std::chrono::steady_clock::now() + timeout;
1573
1574 {
1575 std::lock_guard<std::mutex> lock(queue_mutex_);
1576 if (stop_)
1577 return true;
1578 }
1579
1580 std::unique_lock<std::mutex> lock(queue_mutex_);
1581 bool const drained = task_finished_condition_.wait_until(
1582 lock, deadline, [this] { return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
1583 lock.unlock();
1584
1586 return drained;
1587 }
1588
1590
1593
1595 [[nodiscard]] auto get_statistics() const -> Statistics
1596 {
1597 auto const now = std::chrono::steady_clock::now();
1598 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
1599
1600 std::lock_guard<std::mutex> lock(queue_mutex_);
1601 Statistics stats;
1602 stats.total_threads = num_threads_;
1603 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
1604 stats.pending_tasks = tasks_.size();
1605 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
1606
1607 if (elapsed.count() > 0)
1608 {
1609 stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
1610 }
1611 else
1612 {
1613 stats.tasks_per_second = 0.0;
1614 }
1615
1616 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
1617 if (stats.completed_tasks > 0)
1618 {
1619 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
1620 }
1621 else
1622 {
1623 stats.avg_task_time = std::chrono::microseconds(0);
1624 }
1625
1626 return stats;
1627 }
1628
1630
1633
1639 {
1640 std::lock_guard<std::mutex> lock(trace_mutex_);
1641 on_task_start_ = std::move(cb);
1642 }
1643
1650 {
1651 std::lock_guard<std::mutex> lock(trace_mutex_);
1652 on_task_end_ = std::move(cb);
1653 }
1654
1656
1657 private:
1658 size_t num_threads_;
1659 bool register_workers_;
1660 std::vector<ThreadWrapper> workers_;
1661 std::queue<Task> tasks_;
1662
1663 mutable std::mutex queue_mutex_;
1664 std::condition_variable condition_;
1665 std::condition_variable task_finished_condition_;
1666 std::atomic<bool> stop_;
1667 std::atomic<size_t> active_tasks_{0};
1668 std::atomic<size_t> completed_tasks_{0};
1669 std::atomic<uint64_t> total_task_time_{0};
1670
1671 std::mutex trace_mutex_;
1672 TaskStartCallback on_task_start_;
1673 TaskEndCallback on_task_end_;
1674
1675 std::chrono::steady_clock::time_point start_time_;
1676
1677 void worker_function(size_t worker_id)
1678 {
1679 std::optional<AutoRegisterCurrentThread> reg_guard;
1680 if (register_workers_)
1681 reg_guard.emplace("pool_worker_" + std::to_string(worker_id), "threadschedule.pool");
1682
1683 while (true)
1684 {
1685 Task task;
1686 bool found_task = false;
1687
1688 {
1689 std::unique_lock<std::mutex> lock(queue_mutex_);
1690
1691 if (WaitPolicy::wait(condition_, lock, [this] { return stop_ || !tasks_.empty(); }))
1692 {
1693 if (stop_ && tasks_.empty())
1694 {
1695 return;
1696 }
1697
1698 if (!tasks_.empty())
1699 {
1700 task = std::move(tasks_.front());
1701 tasks_.pop();
1702 found_task = true;
1703 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1704 }
1705 }
1706 else if (stop_)
1707 {
1708 return;
1709 }
1710 }
1711
1712 if (found_task)
1713 {
1714 auto const start_time = std::chrono::steady_clock::now();
1715 auto const tid = std::this_thread::get_id();
1716
1717 {
1718 std::lock_guard<std::mutex> tl(trace_mutex_);
1719 if (on_task_start_)
1720 on_task_start_(start_time, tid);
1721 }
1722
1723 try
1724 {
1725 task();
1726 }
1727 catch (...)
1728 {
1729 }
1730 auto const end_time = std::chrono::steady_clock::now();
1731
1732 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1733 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1734
1735 {
1736 std::lock_guard<std::mutex> tl(trace_mutex_);
1737 if (on_task_end_)
1738 on_task_end_(end_time, tid, task_duration);
1739 }
1740
1741 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1742 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1743
1744 task_finished_condition_.notify_all();
1745 }
1746 }
1747 }
1748};
1749
1761
1772
1773// ---------------------------------------------------------------------------
1774// LightweightPoolT
1775// ---------------------------------------------------------------------------
1776
1845template <size_t TaskSize = 64>
1847{
1848 public:
1854 explicit LightweightPoolT(size_t num_threads = std::thread::hardware_concurrency())
1855 : num_threads_(num_threads == 0 ? 1 : num_threads)
1856 {
1857 workers_.reserve(num_threads_);
1858 for (size_t i = 0; i < num_threads_; ++i)
1859 workers_.emplace_back(&LightweightPoolT::worker_loop, this);
1860 }
1861
1864
1869
1872
/// @brief Post a fire-and-forget task (throwing variant).
///
/// Forwards to try_post() and converts its error result into an exception.
///
/// @param f    Callable to run on a worker.
/// @param args Arguments bound to @p f.
/// @throws std::runtime_error if try_post() reports an error.
template <typename F, typename... Args>
void post(F&& f, Args&&... args)
{
    if (try_post(std::forward<F>(f), std::forward<Args>(args)...).has_value())
        return;
    throw std::runtime_error("LightweightPool is shutting down");
}
1891
1898 template <typename F, typename... Args>
1899 auto try_post(F&& f, Args&&... args) -> expected<void, std::error_code>
1900 {
1901 detail::SboCallable<TaskSize> task(detail::bind_args(std::forward<F>(f), std::forward<Args>(args)...));
1902 {
1903 std::lock_guard<std::mutex> lock(mutex_);
1904 if (stop_)
1905 return unexpected(std::make_error_code(std::errc::operation_canceled));
1906 tasks_.push(std::move(task));
1907 }
1908 condition_.notify_one();
1909 return {};
1910 }
1911
/// @brief Post a range of callables under a single lock acquisition (throwing variant).
///
/// @param begin Iterator to the first callable.
/// @param end   Iterator one past the last callable.
/// @throws std::runtime_error if try_post_batch() reports an error.
template <typename Iterator>
void post_batch(Iterator begin, Iterator end)
{
    if (try_post_batch(begin, end).has_value())
        return;
    throw std::runtime_error("LightweightPool is shutting down");
}
1928
1933 template <typename Iterator>
1934 auto try_post_batch(Iterator begin, Iterator end) -> expected<void, std::error_code>
1935 {
1936 {
1937 std::lock_guard<std::mutex> lock(mutex_);
1938 if (stop_)
1939 return unexpected(std::make_error_code(std::errc::operation_canceled));
1940 for (auto it = begin; it != end; ++it)
1941 tasks_.push(detail::SboCallable<TaskSize>(*it));
1942 }
1943 condition_.notify_all();
1944 return {};
1945 }
1946
1947#if __cpp_lib_ranges >= 201911L
1949 template <std::ranges::input_range R>
1950 void post_batch(R&& range)
1951 {
1952 post_batch(std::ranges::begin(range), std::ranges::end(range));
1953 }
1954
1955 template <std::ranges::input_range R>
1956 auto try_post_batch(R&& range)
1957 {
1958 return try_post_batch(std::ranges::begin(range), std::ranges::end(range));
1959 }
1961#endif
1962
1964
1967
1979 {
1980 {
1981 std::lock_guard<std::mutex> lock(mutex_);
1982 if (stop_)
1983 return;
1984 stop_ = true;
1985 if (policy == ShutdownPolicy::drop_pending)
1986 {
1987 std::queue<detail::SboCallable<TaskSize>> empty;
1988 tasks_.swap(empty);
1989 }
1990 }
1991 condition_.notify_all();
1992 for (auto& w : workers_)
1993 {
1994 if (w.joinable())
1995 w.join();
1996 }
1997 workers_.clear();
1998 }
1999
2009 auto shutdown_for(std::chrono::milliseconds timeout) -> bool
2010 {
2011 auto const deadline = std::chrono::steady_clock::now() + timeout;
2012 {
2013 std::lock_guard<std::mutex> lock(mutex_);
2014 if (stop_)
2015 return true;
2016 }
2017 std::unique_lock<std::mutex> lock(mutex_);
2018 bool const drained = drain_condition_.wait_until(
2019 lock, deadline, [this] { return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
2020 lock.unlock();
2022 return drained;
2023 }
2024
2026
2029
2031 [[nodiscard]] auto size() const noexcept -> size_t
2032 {
2033 return num_threads_;
2034 }
2035
2037
2040
2046 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
2048 {
2049 return detail::configure_worker_threads(workers_, name_prefix, policy, priority);
2050 }
2051
2054 {
2055 return detail::set_worker_affinity(workers_, affinity);
2056 }
2057
2063
2065
2066 private:
2067 size_t num_threads_;
2068 std::vector<ThreadWrapper> workers_;
2069 std::queue<detail::SboCallable<TaskSize>> tasks_;
2070 std::mutex mutex_;
2071 std::condition_variable condition_;
2072 std::condition_variable drain_condition_;
2073 std::atomic<bool> stop_{false};
2074 std::atomic<size_t> active_tasks_{0};
2075
2076 void worker_loop()
2077 {
2078 while (true)
2079 {
2080 detail::SboCallable<TaskSize> task;
2081 {
2082 std::unique_lock<std::mutex> lock(mutex_);
2083 condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
2084 if (stop_ && tasks_.empty())
2085 return;
2086 if (!tasks_.empty())
2087 {
2088 task = std::move(tasks_.front());
2089 tasks_.pop();
2090 active_tasks_.fetch_add(1, std::memory_order_relaxed);
2091 }
2092 else
2093 continue;
2094 }
2095 try
2096 {
2097 task();
2098 }
2099 catch (...)
2100 {
2101 }
2102 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
2103 drain_condition_.notify_all();
2104 }
2105 }
2106};
2107
2117
2118// ---------------------------------------------------------------------------
2119// GlobalPool
2120// ---------------------------------------------------------------------------
2121
/// @brief Singleton accessor for a process-wide pool instance.
///
/// The pool is created lazily on the first call to instance(). The thread
/// count it is constructed with can be set once via init(); it defaults to
/// std::thread::hardware_concurrency().
///
/// @tparam PoolType Pool class constructible from a thread count (size_t).
template <typename PoolType>
class GlobalPool
{
  public:
    /// @brief Pre-configure the number of threads before first use.
    ///
    /// Only the first call has any effect, and only if it happens before the
    /// pool has been created by instance(); later calls are ignored.
    ///
    /// @param num_threads Thread count passed to the PoolType constructor.
    static void init(size_t num_threads)
    {
        std::call_once(init_flag_(), [num_threads] { thread_count_() = num_threads; });
    }

    /// @brief Access the singleton pool instance (created on first call).
    static auto instance() -> PoolType&
    {
        static PoolType pool(frozen_thread_count_());
        return pool;
    }

    /// @name Static forwarders to the singleton instance.
    /// @{

    template <typename F, typename... Args>
    static auto submit(F&& f, Args&&... args)
    {
        return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename F, typename... Args>
    static auto try_submit(F&& f, Args&&... args)
    {
        return instance().try_submit(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename F, typename... Args>
    static void post(F&& f, Args&&... args)
    {
        instance().post(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename F, typename... Args>
    static auto try_post(F&& f, Args&&... args)
    {
        return instance().try_post(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename Iterator>
    static auto submit_batch(Iterator begin, Iterator end)
    {
        return instance().submit_batch(begin, end);
    }

    template <typename Iterator>
    static auto try_submit_batch(Iterator begin, Iterator end)
    {
        return instance().try_submit_batch(begin, end);
    }

    template <typename Iterator, typename F>
    static void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        instance().parallel_for_each(begin, end, std::forward<F>(func));
    }

#if __cpp_lib_ranges >= 201911L
    template <std::ranges::input_range R>
    static auto submit_batch(R&& range)
    {
        return instance().submit_batch(std::forward<R>(range));
    }

    template <std::ranges::input_range R>
    static auto try_submit_batch(R&& range)
    {
        return instance().try_submit_batch(std::forward<R>(range));
    }

    template <std::ranges::input_range R, typename F>
    static void parallel_for_each(R&& range, F&& func)
    {
        instance().parallel_for_each(std::forward<R>(range), std::forward<F>(func));
    }
#endif

    /// @}

  private:
    GlobalPool() = default;

    /// Flag guarding the one-time decision about the thread count.
    static auto init_flag_() -> std::once_flag&
    {
        static std::once_flag flag;
        return flag;
    }

    /// Storage for the configured thread count.
    static auto thread_count_() -> size_t&
    {
        static size_t count = std::thread::hardware_concurrency();
        return count;
    }

    /// Read the thread count used to build the pool.
    ///
    /// Going through the same once_flag as init() does two things: a concurrent
    /// init() has finished writing before the count is read (no data race), and
    /// any init() issued after the pool exists becomes a no-op instead of
    /// silently writing a value that is never used.
    static auto frozen_thread_count_() -> size_t
    {
        std::call_once(init_flag_(), [] {});
        return thread_count_();
    }
};
2254
2260
2266
2284template <typename Container, typename F>
2285void parallel_for_each(Container& container, F&& func)
2286{
2287 GlobalThreadPool::parallel_for_each(container.begin(), container.end(), std::forward<F>(func));
2288}
2289
2290} // namespace threadschedule
Singleton accessor for a process-wide pool instance.
static auto submit_batch(Iterator begin, Iterator end)
static auto try_submit(F &&f, Args &&... args)
static auto try_post(F &&f, Args &&... args)
static void parallel_for_each(Iterator begin, Iterator end, F &&func)
static auto submit(F &&f, Args &&... args)
static void post(F &&f, Args &&... args)
static auto instance() -> PoolType &
Access the singleton pool instance (created on first call).
static auto try_submit_batch(Iterator begin, Iterator end)
static void init(size_t num_threads)
Pre-configure the number of threads before first use.
void post(F &&f, Args &&... args)
Fire-and-forget task submission (throwing variant).
auto operator=(HighPerformancePool const &) -> HighPerformancePool &=delete
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task, throwing on shutdown.
void set_on_task_end(TaskEndCallback cb)
Register a callback invoked just after each task completes.
auto size() const noexcept -> size_t
Number of worker threads in this pool.
void shutdown(ShutdownPolicy policy=ShutdownPolicy::drain)
Shut the pool down.
HighPerformancePool(size_t num_threads=std::thread::hardware_concurrency(), size_t deque_capacity=WorkStealingDeque< Task >::DEFAULT_CAPACITY, bool register_workers=false)
auto try_submit_batch(Iterator begin, Iterator end) -> expected< std::vector< std::future< void > >, std::error_code >
Submit a range of void() callables in one go (non-throwing).
auto try_submit(F &&f, Args &&... args) -> expected< std::future< std::invoke_result_t< F, Args... > >, std::error_code >
Submit a task without throwing on shutdown.
HighPerformancePool(HighPerformancePool const &)=delete
void set_on_task_start(TaskStartCallback cb)
Register a callback invoked just before each task executes.
auto try_post(F &&f, Args &&... args) -> expected< void, std::error_code >
Fire-and-forget task submission (non-throwing variant).
auto submit_batch(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Submit a range of void() callables in one go (throwing).
void wait_for_tasks()
Block until all pending and active tasks have completed.
auto set_affinity(ThreadAffinity const &affinity) -> expected< void, std::error_code >
Pin all workers to the same CPU set.
auto distribute_across_cpus() -> expected< void, std::error_code >
Pin each worker to a distinct CPU core (round-robin).
auto pending_tasks() const -> size_t
Approximate count of tasks waiting in all queues.
void parallel_for_each(Iterator begin, Iterator end, F &&func)
Apply func to every element in [begin, end) in parallel.
auto shutdown_for(std::chrono::milliseconds timeout) -> bool
Attempt a timed drain: finish as many tasks as possible within timeout, then force-stop remaining wor...
auto get_statistics() const -> Statistics
Collect approximate performance counters.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Name, schedule and prioritize all worker threads.
Ultra-lightweight fire-and-forget thread pool.
auto try_post_batch(Iterator begin, Iterator end) -> expected< void, std::error_code >
Batch post (non-throwing).
auto operator=(LightweightPoolT const &) -> LightweightPoolT &=delete
void post_batch(Iterator begin, Iterator end)
Post a range of callables under a single lock acquisition.
auto shutdown_for(std::chrono::milliseconds timeout) -> bool
Attempt a timed drain.
void shutdown(ShutdownPolicy policy=ShutdownPolicy::drain)
Shut the pool down.
LightweightPoolT(size_t num_threads=std::thread::hardware_concurrency())
Construct a lightweight pool with num_threads workers.
auto try_post(F &&f, Args &&... args) -> expected< void, std::error_code >
Post a fire-and-forget task (non-throwing variant).
auto set_affinity(ThreadAffinity const &affinity) -> expected< void, std::error_code >
Pin all workers to the same CPU set.
LightweightPoolT(LightweightPoolT const &)=delete
void post(F &&f, Args &&... args)
Post a fire-and-forget task (throwing variant).
auto size() const noexcept -> size_t
Number of worker threads.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Name, schedule and prioritize all worker threads.
auto distribute_across_cpus() -> expected< void, std::error_code >
Pin each worker to a distinct CPU core (round-robin).
Manages a set of CPU indices to which a thread may be bound.
Single-queue thread pool parameterized by its idle-wait strategy.
void parallel_for_each(Iterator begin, Iterator end, F &&func)
Apply func to [begin, end) in parallel (chunked).
auto set_affinity(ThreadAffinity const &affinity) -> expected< void, std::error_code >
Pin all workers to the same CPU set.
void set_on_task_start(TaskStartCallback cb)
Register a callback invoked just before each task executes.
auto shutdown_for(std::chrono::milliseconds timeout) -> bool
Attempt a timed drain: finish as many tasks as possible within timeout, then force-stop remaining wor...
auto operator=(ThreadPoolBase const &) -> ThreadPoolBase &=delete
auto try_submit(F &&f, Args &&... args) -> expected< std::future< std::invoke_result_t< F, Args... > >, std::error_code >
Submit a task without throwing on shutdown.
void wait_for_tasks()
Block until all pending and active tasks have completed.
auto distribute_across_cpus() -> expected< void, std::error_code >
Pin each worker to a distinct CPU core (round-robin).
void shutdown(ShutdownPolicy policy=ShutdownPolicy::drain)
ThreadPoolBase(ThreadPoolBase const &)=delete
auto try_post(F &&f, Args &&... args) -> expected< void, std::error_code >
void post(F &&f, Args &&... args)
Fire-and-forget task submission (throwing variant).
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Name, schedule and prioritize all worker threads.
std::function< void()> Task
auto submit_batch(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Submit a batch of tasks (throwing).
void set_on_task_end(TaskEndCallback cb)
Register a callback invoked just after each task completes.
auto pending_tasks() const -> size_t
Number of tasks waiting in the queue.
auto size() const noexcept -> size_t
Number of worker threads.
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task, throwing on shutdown.
ThreadPoolBase(size_t num_threads=std::thread::hardware_concurrency(), bool register_workers=false)
auto get_statistics() const -> Statistics
Collect approximate performance counters.
auto try_submit_batch(Iterator begin, Iterator end) -> expected< std::vector< std::future< void > >, std::error_code >
Submit a range of void() callables in one go (non-throwing).
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
static constexpr size_t DEFAULT_CAPACITY
auto push(T const &item) -> bool
static constexpr size_t CACHE_LINE_SIZE
WorkStealingDeque(size_t capacity=DEFAULT_CAPACITY)
Type-erased, move-only callable with configurable inline storage.
auto operator=(SboCallable const &) -> SboCallable &=delete
SboCallable(SboCallable &&other) noexcept
auto operator=(SboCallable &&other) noexcept -> SboCallable &
SboCallable(SboCallable const &)=delete
A result type that holds either a value of type T or an error of type E.
Definition expected.hpp:215
Exception thrown by expected::value() when the object is in the error state.
Definition expected.hpp:162
Polyfill for std::expected (C++23) for pre-C++23 compilers.
auto bind_args(F &&f, Args &&... args)
Bind a callable with its arguments into a nullary lambda.
auto distribute_workers_across_cpus(WorkerRange &workers) -> expected< void, std::error_code >
auto configure_worker_threads(WorkerRange &workers, std::string const &name_prefix, SchedulingPolicy policy, ThreadPriority priority) -> expected< void, std::error_code >
auto set_worker_affinity(WorkerRange &workers, ThreadAffinity const &affinity) -> expected< void, std::error_code >
void parallel_for_each_chunked(Pool &pool, Iterator begin, Iterator end, F &&func, size_t num_workers)
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
ShutdownPolicy
Controls how a pool handles pending tasks during shutdown.
@ drop_pending
Finish running tasks, discard queued ones.
@ drain
Finish all queued tasks before stopping (default).
ThreadPoolBase< IndefiniteWait > ThreadPool
General-purpose thread pool with indefinite blocking wait.
void parallel_for_each(Container &container, F &&func)
Convenience wrapper that applies a callable to every element of a container in parallel using the GlobalThreadPool.
GlobalPool< ThreadPool > GlobalThreadPool
Singleton accessor for the process-wide ThreadPool instance.
ThreadPoolBase< PollingWait<> > FastThreadPool
Thread pool with 10 ms polling wait for lower wake-up latency.
GlobalPool< HighPerformancePool > GlobalHighPerformancePool
Singleton accessor for the process-wide HighPerformancePool instance.
std::function< void(std::chrono::steady_clock::time_point, std::thread::id, std::chrono::microseconds elapsed)> TaskEndCallback
Callback invoked when a pool worker finishes executing a task.
std::function< void(std::chrono::steady_clock::time_point, std::thread::id)> TaskStartCallback
Work-stealing deque for per-thread task queues in a thread pool.
LightweightPoolT<> LightweightPool
Default lightweight pool with 64-byte task slots (56 bytes usable).
Scheduling policies, thread priority, and CPU affinity types.
Wait policy that blocks indefinitely until work is available.
static auto wait(std::condition_variable &cv, Lock &lock, Pred pred) -> bool
Wait policy that polls with a configurable timeout.
static auto wait(std::condition_variable &cv, Lock &lock, Pred pred) -> bool
Process-wide thread registry, control blocks, and composite registry.
Enhanced thread wrappers: ThreadWrapper, JThreadWrapper, and non-owning views.