ThreadSchedule 1.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
thread_pool.hpp
1#pragma once
2
3#include "expected.hpp"
4#include "scheduler_policy.hpp"
5#include "thread_wrapper.hpp"
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>
15
16namespace threadschedule
17{
18
49template <typename T>
50class WorkStealingDeque
51{
52 public:
53 static constexpr size_t CACHE_LINE_SIZE = 64;
54 static constexpr size_t DEFAULT_CAPACITY = 1024;
55
56 private:
57 struct alignas(CACHE_LINE_SIZE) AlignedItem
58 {
59 T item;
60 AlignedItem() = default;
61 AlignedItem(T&& t) : item(std::move(t))
62 {
63 }
64 AlignedItem(T const& t) : item(t)
65 {
66 }
67 };
68
69 std::unique_ptr<AlignedItem[]> buffer_;
70 size_t capacity_;
71
72 alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0}; // Owner pushes/pops here
73 alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0}; // Thieves steal here
74 alignas(CACHE_LINE_SIZE) mutable std::mutex mutex_; // For synchronization
75
76 public:
77 explicit WorkStealingDeque(size_t capacity = DEFAULT_CAPACITY)
78 : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
79 {
80 }
81
82 // Thread-safe operations
83 [[nodiscard]] auto push(T&& item) -> bool
84 {
85 std::lock_guard<std::mutex> lock(mutex_);
86 size_t const t = top_.load(std::memory_order_relaxed);
87 size_t const b = bottom_.load(std::memory_order_relaxed);
88
89 if (t - b >= capacity_)
90 {
91 return false; // Queue full
92 }
93
94 buffer_[t % capacity_] = AlignedItem(std::move(item));
95 top_.store(t + 1, std::memory_order_release);
96 return true;
97 }
98
99 [[nodiscard]] auto push(T const& item) -> bool
100 {
101 std::lock_guard<std::mutex> lock(mutex_);
102 size_t const t = top_.load(std::memory_order_relaxed);
103 size_t const b = bottom_.load(std::memory_order_relaxed);
104
105 if (t - b >= capacity_)
106 {
107 return false; // Queue full
108 }
109
110 buffer_[t % capacity_] = AlignedItem(item);
111 top_.store(t + 1, std::memory_order_release);
112 return true;
113 }
114
115 [[nodiscard]] auto pop(T& item) -> bool
116 {
117 std::lock_guard<std::mutex> lock(mutex_);
118 size_t const t = top_.load(std::memory_order_relaxed);
119 size_t const b = bottom_.load(std::memory_order_relaxed);
120
121 if (t <= b)
122 {
123 return false; // Empty
124 }
125
126 size_t const new_top = t - 1;
127 item = std::move(buffer_[new_top % capacity_].item);
128 top_.store(new_top, std::memory_order_relaxed);
129 return true;
130 }
131
132 // Thief operations (other threads stealing work)
133 [[nodiscard]] auto steal(T& item) -> bool
134 {
135 std::lock_guard<std::mutex> lock(mutex_);
136 size_t const b = bottom_.load(std::memory_order_relaxed);
137 size_t const t = top_.load(std::memory_order_relaxed);
138
139 if (b >= t)
140 {
141 return false; // Empty
142 }
143
144 item = std::move(buffer_[b % capacity_].item);
145 bottom_.store(b + 1, std::memory_order_relaxed);
146 return true;
147 }
148
149 [[nodiscard]] auto size() const -> size_t
150 {
151 size_t const t = top_.load(std::memory_order_relaxed);
152 size_t const b = bottom_.load(std::memory_order_relaxed);
153 return t > b ? t - b : 0;
154 }
155
156 [[nodiscard]] auto empty() const -> bool
157 {
158 return size() == 0;
159 }
160};
161
242class HighPerformancePool
243{
244 public:
245 using Task = std::function<void()>;
246
248 {
249 size_t total_threads;
250 size_t active_threads;
251 size_t pending_tasks;
252 size_t completed_tasks;
253 size_t stolen_tasks;
254 double tasks_per_second;
255 std::chrono::microseconds avg_task_time;
256 };
257
258 explicit HighPerformancePool(size_t num_threads = std::thread::hardware_concurrency())
259 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), next_victim_(0),
260 start_time_(std::chrono::steady_clock::now())
261 {
262 // Initialize per-thread work queues
263 worker_queues_.resize(num_threads_);
264 for (size_t i = 0; i < num_threads_; ++i)
265 {
266 worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>();
267 }
268
269 workers_.reserve(num_threads_);
270
271 // Create worker threads with thread-local storage
272 for (size_t i = 0; i < num_threads_; ++i)
273 {
274 workers_.emplace_back(&HighPerformancePool::worker_function, this, i);
275 }
276 }
277
278 HighPerformancePool(HighPerformancePool const&) = delete;
279 auto operator=(HighPerformancePool const&) -> HighPerformancePool& = delete;
280
281 ~HighPerformancePool()
282 {
283 shutdown();
284 }
285
289 template <typename F, typename... Args>
290 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
291 {
292 using return_type = std::invoke_result_t<F, Args...>;
293
294 auto task = std::make_shared<std::packaged_task<return_type()>>(
295 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
296
297 std::future<return_type> result = task->get_future();
298
299 if (stop_.load(std::memory_order_acquire))
300 {
301 throw std::runtime_error("ThreadPool is shutting down");
302 }
303
304 // Try to submit to least loaded queue (round-robin with fallback)
305 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
306
307 // First try the preferred queue
308 if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
309 {
310 wakeup_condition_.notify_one();
311 return result;
312 }
313
314 // If preferred queue is full, try a few random ones
315 for (size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
316 {
317 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
318 if (worker_queues_[idx]->push([task]() { (*task)(); }))
319 {
320 wakeup_condition_.notify_one();
321 return result;
322 }
323 }
324
325 // All local queues full, use overflow queue
326 {
327 std::lock_guard<std::mutex> lock(overflow_mutex_);
328 if (stop_.load(std::memory_order_relaxed))
329 {
330 throw std::runtime_error("ThreadPool is shutting down");
331 }
332 overflow_tasks_.emplace([task]() { (*task)(); });
333 }
334
335 wakeup_condition_.notify_all();
336 return result;
337 }
338
342 template <typename Iterator>
343 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
344 {
345 std::vector<std::future<void>> futures;
346 size_t const batch_size = std::distance(begin, end);
347 futures.reserve(batch_size);
348
349 if (stop_.load(std::memory_order_acquire))
350 {
351 throw std::runtime_error("ThreadPool is shutting down");
352 }
353
354 // Distribute batch across worker queues
355 size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;
356
357 for (auto it = begin; it != end; ++it)
358 {
359 auto task = std::make_shared<std::packaged_task<void()>>(*it);
360 futures.push_back(task->get_future());
361
362 // Try to place in worker queue, round-robin style
363 bool queued = false;
364 for (size_t attempts = 0; attempts < num_threads_; ++attempts)
365 {
366 if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
367 {
368 queued = true;
369 break;
370 }
371 queue_idx = (queue_idx + 1) % num_threads_;
372 }
373
374 if (!queued)
375 {
376 // Overflow to global queue
377 std::lock_guard<std::mutex> lock(overflow_mutex_);
378 overflow_tasks_.emplace([task]() { (*task)(); });
379 }
380 }
381
382 // Wake up workers for the batch
383 wakeup_condition_.notify_all();
384 return futures;
385 }
386
390 template <typename Iterator, typename F>
391 void parallel_for_each(Iterator begin, Iterator end, F&& func)
392 {
393 size_t const total_items = std::distance(begin, end);
394 if (total_items == 0)
395 return;
396
397 // Calculate optimal chunk size for cache efficiency
398 size_t const chunk_size = (std::max)(size_t(1), total_items / (num_threads_ * 4));
399 std::vector<std::future<void>> futures;
400
401 for (auto it = begin; it < end;)
402 {
403 auto chunk_end = (std::min)(it + chunk_size, end);
404
405 futures.push_back(submit([func, it, chunk_end]() {
406 for (auto chunk_it = it; chunk_it != chunk_end; ++chunk_it)
407 {
408 func(*chunk_it);
409 }
410 }));
411
412 it = chunk_end;
413 }
414
415 // Wait for all chunks to complete
416 for (auto& future : futures)
417 {
418 future.wait();
419 }
420 }
421
422 [[nodiscard]] auto size() const noexcept -> size_t
423 {
424 return num_threads_;
425 }
426
427 [[nodiscard]] auto pending_tasks() const -> size_t
428 {
429 size_t total = 0;
430 for (auto const& queue : worker_queues_)
431 {
432 total += queue->size();
433 }
434
435 std::lock_guard<std::mutex> lock(overflow_mutex_);
436 total += overflow_tasks_.size();
437 return total;
438 }
439
443 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
444 ThreadPriority priority = ThreadPriority::normal()) -> expected<void, std::error_code>
445 {
446 bool success = true;
447
448 for (size_t i = 0; i < workers_.size(); ++i)
449 {
450 std::string const thread_name = name_prefix + "_" + std::to_string(i);
451
452 if (!workers_[i].set_name(thread_name).has_value())
453 {
454 success = false;
455 }
456
457 if (!workers_[i].set_scheduling_policy(policy, priority).has_value())
458 {
459 success = false;
460 }
461 }
462 if (success)
463 return {};
464 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
465 }
466
467 auto set_affinity(ThreadAffinity const& affinity) -> expected<void, std::error_code>
468 {
469 bool success = true;
470
471 for (auto& worker : workers_)
472 {
473 if (!worker.set_affinity(affinity).has_value())
474 {
475 success = false;
476 }
477 }
478 if (success)
479 return {};
480 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
481 }
482
483 auto distribute_across_cpus() -> expected<void, std::error_code>
484 {
485 auto const cpu_count = std::thread::hardware_concurrency();
486 if (cpu_count == 0)
487 return unexpected(std::make_error_code(std::errc::invalid_argument));
488
489 bool success = true;
490
491 for (size_t i = 0; i < workers_.size(); ++i)
492 {
493 ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
494 if (!workers_[i].set_affinity(affinity).has_value())
495 {
496 success = false;
497 }
498 }
499 if (success)
500 return {};
501 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
502 }
503
504 void wait_for_tasks()
505 {
506 std::unique_lock<std::mutex> lock(completion_mutex_);
507 completion_condition_.wait(
508 lock, [this] { return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0; });
509 }
510
511 void shutdown()
512 {
513 {
514 std::lock_guard<std::mutex> lock(overflow_mutex_);
515 if (stop_.exchange(true, std::memory_order_acq_rel))
516 {
517 return; // Already shutting down
518 }
519 }
520
521 wakeup_condition_.notify_all();
522
523 for (auto& worker : workers_)
524 {
525 if (worker.joinable())
526 {
527 worker.join();
528 }
529 }
530
531 workers_.clear();
532 }
533
538 {
539 auto const now = std::chrono::steady_clock::now();
540 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
541
542 Statistics stats;
543 stats.total_threads = num_threads_;
544 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
545 stats.pending_tasks = pending_tasks();
546 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
547 stats.stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);
548
549 if (elapsed.count() > 0)
550 {
551 stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
552 }
553 else
554 {
555 stats.tasks_per_second = 0.0;
556 }
557
558 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
559 if (stats.completed_tasks > 0)
560 {
561 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
562 }
563 else
564 {
565 stats.avg_task_time = std::chrono::microseconds(0);
566 }
567
568 return stats;
569 }
570
571 private:
572 size_t num_threads_;
573 std::vector<ThreadWrapper> workers_;
574 std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;
575
576 // Overflow queue for when worker queues are full
577 std::queue<Task> overflow_tasks_;
578 mutable std::mutex overflow_mutex_;
579
580 // Synchronization
581 std::atomic<bool> stop_;
582 std::condition_variable wakeup_condition_;
583 std::mutex wakeup_mutex_;
584
585 std::condition_variable completion_condition_;
586 std::mutex completion_mutex_;
587
588 // Load balancing and statistics
589 std::atomic<size_t> next_victim_;
590 std::atomic<size_t> active_tasks_{0};
591 std::atomic<size_t> completed_tasks_{0};
592 std::atomic<size_t> stolen_tasks_{0};
593 std::atomic<uint64_t> total_task_time_{0}; // microseconds
594
595 std::chrono::steady_clock::time_point start_time_;
596
597 // NOLINTNEXTLINE(readability-function-cognitive-complexity)
598 void worker_function(size_t worker_id)
599 {
600 // Thread-local random number generator for work stealing
601 thread_local std::mt19937 gen = []() {
602 std::random_device device;
603 return std::mt19937(device());
604 }();
605
606 Task task;
607 std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
608
609 while (true)
610 {
611 bool found_task = false;
612
613 // 1. Try to get task from own queue (fast path)
614 if (worker_queues_[worker_id]->pop(task))
615 {
616 found_task = true;
617 }
618 // 2. Try to steal from other workers (limit attempts to reduce contention)
619 else
620 {
621 size_t const max_steal_attempts = (std::min)(num_threads_, size_t(4));
622 for (size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
623 {
624 size_t const victim_id = dist(gen);
625 if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
626 {
627 found_task = true;
628 stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
629 break;
630 }
631 }
632 }
633
634 // 3. Try overflow queue
635 if (!found_task)
636 {
637 std::lock_guard<std::mutex> lock(overflow_mutex_);
638 if (!overflow_tasks_.empty())
639 {
640 task = std::move(overflow_tasks_.front());
641 overflow_tasks_.pop();
642 found_task = true;
643 }
644 }
645
646 if (found_task)
647 {
648 // Execute task with timing
649 active_tasks_.fetch_add(1, std::memory_order_relaxed);
650
651 auto const start_time = std::chrono::steady_clock::now();
652 try
653 {
654 task();
655 }
656 catch (...)
657 {
658 // Log exception or handle as needed
659 }
660 auto const end_time = std::chrono::steady_clock::now();
661
662 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
663 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
664
665 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
666 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
667
668 completion_condition_.notify_all();
669 }
670 else
671 {
672 // No work found, check if we should stop
673 if (stop_.load(std::memory_order_acquire))
674 {
675 break;
676 }
677
678 // Wait for work with adaptive timeout
679 std::unique_lock<std::mutex> lock(wakeup_mutex_);
680 wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
681 }
682 }
683 }
684};
685
747class FastThreadPool
748{
749 public:
750 using Task = std::function<void()>;
751
753 {
754 size_t total_threads;
755 size_t active_threads;
756 size_t pending_tasks;
757 size_t completed_tasks;
758 double tasks_per_second;
759 std::chrono::microseconds avg_task_time;
760 };
761
762 explicit FastThreadPool(size_t num_threads = std::thread::hardware_concurrency())
763 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), start_time_(std::chrono::steady_clock::now())
764 {
765 workers_.reserve(num_threads_);
766
767 // Create worker threads
768 for (size_t i = 0; i < num_threads_; ++i)
769 {
770 workers_.emplace_back(&FastThreadPool::worker_function, this, i);
771 }
772 }
773
774 FastThreadPool(FastThreadPool const&) = delete;
775 auto operator=(FastThreadPool const&) -> FastThreadPool& = delete;
776
777 ~FastThreadPool()
778 {
779 shutdown();
780 }
781
785 template <typename F, typename... Args>
786 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
787 {
788 using return_type = std::invoke_result_t<F, Args...>;
789
790 auto task = std::make_shared<std::packaged_task<return_type()>>(
791 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
792
793 std::future<return_type> result = task->get_future();
794
795 {
796 std::lock_guard<std::mutex> lock(queue_mutex_);
797 if (stop_)
798 {
799 throw std::runtime_error("FastThreadPool is shutting down");
800 }
801 tasks_.emplace([task]() { (*task)(); });
802 }
803
804 condition_.notify_one();
805 return result;
806 }
807
811 template <typename Iterator>
812 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
813 {
814 std::vector<std::future<void>> futures;
815 size_t const batch_size = std::distance(begin, end);
816 futures.reserve(batch_size);
817
818 {
819 std::lock_guard<std::mutex> lock(queue_mutex_);
820 if (stop_)
821 {
822 throw std::runtime_error("FastThreadPool is shutting down");
823 }
824
825 for (auto it = begin; it != end; ++it)
826 {
827 auto task = std::make_shared<std::packaged_task<void()>>(*it);
828 futures.push_back(task->get_future());
829 tasks_.emplace([task]() { (*task)(); });
830 }
831 }
832
833 // Wake up all workers for batch processing
834 condition_.notify_all();
835 return futures;
836 }
837
838 void shutdown()
839 {
840 {
841 std::lock_guard<std::mutex> lock(queue_mutex_);
842 if (stop_)
843 return;
844 stop_ = true;
845 }
846
847 condition_.notify_all();
848
849 for (auto& worker : workers_)
850 {
851 if (worker.joinable())
852 {
853 worker.join();
854 }
855 }
856
857 workers_.clear();
858 }
859
860 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
861 ThreadPriority priority = ThreadPriority::normal()) -> bool
862 {
863 bool success = true;
864
865 for (size_t i = 0; i < workers_.size(); ++i)
866 {
867 std::string const thread_name = name_prefix + "_" + std::to_string(i);
868
869 if (!workers_[i].set_name(thread_name))
870 {
871 success = false;
872 }
873
874 if (!workers_[i].set_scheduling_policy(policy, priority))
875 {
876 success = false;
877 }
878 }
879
880 return success;
881 }
882
883 auto set_affinity(ThreadAffinity const& affinity) -> bool
884 {
885 bool success = true;
886
887 for (auto& worker : workers_)
888 {
889 if (!worker.set_affinity(affinity))
890 {
891 success = false;
892 }
893 }
894
895 return success;
896 }
897
898 auto distribute_across_cpus() -> bool
899 {
900 auto const cpu_count = std::thread::hardware_concurrency();
901 if (cpu_count == 0)
902 return false;
903
904 bool success = true;
905
906 for (size_t i = 0; i < workers_.size(); ++i)
907 {
908 ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
909 if (!workers_[i].set_affinity(affinity))
910 {
911 success = false;
912 }
913 }
914
915 return success;
916 }
917
918 [[nodiscard]] auto size() const noexcept -> size_t
919 {
920 return num_threads_;
921 }
922
923 [[nodiscard]] auto pending_tasks() const -> size_t
924 {
925 std::lock_guard<std::mutex> lock(queue_mutex_);
926 return tasks_.size();
927 }
928
929 void wait_for_tasks()
930 {
931 std::unique_lock<std::mutex> lock(queue_mutex_);
932 task_finished_condition_.wait(
933 lock, [this] { return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
934 }
935
936 [[nodiscard]] auto get_statistics() const -> Statistics
937 {
938 auto const now = std::chrono::steady_clock::now();
939 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
940
941 std::lock_guard<std::mutex> lock(queue_mutex_);
942 Statistics stats;
943 stats.total_threads = num_threads_;
944 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
945 stats.pending_tasks = tasks_.size();
946 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
947
948 if (elapsed.count() > 0)
949 {
950 stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
951 }
952 else
953 {
954 stats.tasks_per_second = 0.0;
955 }
956
957 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
958 if (stats.completed_tasks > 0)
959 {
960 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
961 }
962 else
963 {
964 stats.avg_task_time = std::chrono::microseconds(0);
965 }
966
967 return stats;
968 }
969
970 private:
971 size_t num_threads_;
972 std::vector<ThreadWrapper> workers_;
973 std::queue<Task> tasks_;
974
975 mutable std::mutex queue_mutex_;
976 std::condition_variable condition_;
977 std::condition_variable task_finished_condition_;
978 std::atomic<bool> stop_;
979 std::atomic<size_t> active_tasks_{0};
980 std::atomic<size_t> completed_tasks_{0};
981 std::atomic<uint64_t> total_task_time_{0}; // microseconds
982
983 std::chrono::steady_clock::time_point start_time_;
984
985 void worker_function(size_t /* worker_id */)
986 {
987 while (true)
988 {
989 Task task;
990 bool found_task = false;
991
992 {
993 std::unique_lock<std::mutex> lock(queue_mutex_);
994
995 if (condition_.wait_for(lock, std::chrono::milliseconds(10),
996 [this] { return stop_ || !tasks_.empty(); }))
997 {
998 if (stop_ && tasks_.empty())
999 {
1000 return;
1001 }
1002
1003 if (!tasks_.empty())
1004 {
1005 task = std::move(tasks_.front());
1006 tasks_.pop();
1007 found_task = true;
1008 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1009 }
1010 }
1011 else if (stop_)
1012 {
1013 return;
1014 }
1015 }
1016
1017 if (found_task)
1018 {
1019 auto const start_time = std::chrono::steady_clock::now();
1020 try
1021 {
1022 task();
1023 }
1024 catch (...)
1025 {
1026 }
1027 auto const end_time = std::chrono::steady_clock::now();
1028
1029 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1030 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1031
1032 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1033 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1034
1035 task_finished_condition_.notify_all();
1036 }
1037 }
1038 }
1039};
1040
1105class ThreadPool
1106{
1107 public:
1108 using Task = std::function<void()>;
1109
1111 {
1112 size_t total_threads;
1113 size_t active_threads;
1114 size_t pending_tasks;
1115 size_t completed_tasks;
1116 };
1117
1118 explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
1119 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false)
1120 {
1121 workers_.reserve(num_threads_);
1122
1123 // Create worker threads
1124 for (size_t i = 0; i < num_threads_; ++i)
1125 {
1126 workers_.emplace_back(&ThreadPool::worker_function, this);
1127 }
1128 }
1129
1130 ThreadPool(ThreadPool const&) = delete;
1131 auto operator=(ThreadPool const&) -> ThreadPool& = delete;
1132
1133 ~ThreadPool()
1134 {
1135 shutdown();
1136 }
1137
1141 template <typename F, typename... Args>
1142 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
1143 {
1144 using return_type = std::invoke_result_t<F, Args...>;
1145
1146 auto task = std::make_shared<std::packaged_task<return_type()>>(
1147 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
1148
1149 std::future<return_type> result = task->get_future();
1150
1151 {
1152 std::lock_guard<std::mutex> lock(queue_mutex_);
1153
1154 if (stop_)
1155 {
1156 throw std::runtime_error("ThreadPool is shutting down");
1157 }
1158
1159 tasks_.emplace([task]() { (*task)(); });
1160 }
1161
1162 condition_.notify_one();
1163 return result;
1164 }
1165
1169 template <typename Iterator>
1170 auto submit_range(Iterator begin, Iterator end) -> std::vector<std::future<void>>
1171 {
1172 std::vector<std::future<void>> futures;
1173 futures.reserve(std::distance(begin, end));
1174
1175 for (auto it = begin; it != end; ++it)
1176 {
1177 futures.push_back(submit(*it));
1178 }
1179
1180 return futures;
1181 }
1182
1186 template <typename Iterator, typename F>
1187 void parallel_for_each(Iterator begin, Iterator end, F&& func)
1188 {
1189 std::vector<std::future<void>> futures;
1190 futures.reserve(std::distance(begin, end));
1191
1192 for (auto it = begin; it != end; ++it)
1193 {
1194 futures.push_back(submit([func, it]() { func(*it); }));
1195 }
1196
1197 // Wait for all tasks to complete
1198 for (auto& future : futures)
1199 {
1200 future.wait();
1201 }
1202 }
1203
1204 [[nodiscard]] auto size() const noexcept -> size_t
1205 {
1206 return num_threads_;
1207 }
1208
1209 [[nodiscard]] auto pending_tasks() const -> size_t
1210 {
1211 std::lock_guard<std::mutex> lock(queue_mutex_);
1212 return tasks_.size();
1213 }
1214
1218 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
1219 ThreadPriority priority = ThreadPriority::normal()) -> bool
1220 {
1221 bool success = true;
1222
1223 for (size_t i = 0; i < workers_.size(); ++i)
1224 {
1225 std::string const thread_name = name_prefix + "_" + std::to_string(i);
1226
1227 if (!workers_[i].set_name(thread_name))
1228 {
1229 success = false;
1230 }
1231
1232 if (!workers_[i].set_scheduling_policy(policy, priority))
1233 {
1234 success = false;
1235 }
1236 }
1237
1238 return success;
1239 }
1240
1241 auto set_affinity(ThreadAffinity const& affinity) -> bool
1242 {
1243 bool success = true;
1244
1245 for (auto& worker : workers_)
1246 {
1247 if (!worker.set_affinity(affinity))
1248 {
1249 success = false;
1250 }
1251 }
1252
1253 return success;
1254 }
1255
1256 auto distribute_across_cpus() -> bool
1257 {
1258 auto const cpu_count = std::thread::hardware_concurrency();
1259 if (cpu_count == 0)
1260 return false;
1261
1262 bool success = true;
1263
1264 for (size_t i = 0; i < workers_.size(); ++i)
1265 {
1266 ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
1267 if (!workers_[i].set_affinity(affinity))
1268 {
1269 success = false;
1270 }
1271 }
1272
1273 return success;
1274 }
1275
1276 void wait_for_tasks()
1277 {
1278 std::unique_lock<std::mutex> lock(queue_mutex_);
1279 task_finished_condition_.wait(lock, [this] { return tasks_.empty() && active_tasks_ == 0; });
1280 }
1281
1282 void shutdown()
1283 {
1284 {
1285 std::lock_guard<std::mutex> lock(queue_mutex_);
1286 if (stop_)
1287 return;
1288 stop_ = true;
1289 }
1290
1291 condition_.notify_all();
1292
1293 for (auto& worker : workers_)
1294 {
1295 if (worker.joinable())
1296 {
1297 worker.join();
1298 }
1299 }
1300
1301 workers_.clear();
1302 }
1303
1304 [[nodiscard]] auto get_statistics() const -> Statistics
1305 {
1306 std::lock_guard<std::mutex> lock(queue_mutex_);
1307 Statistics stats;
1308 stats.total_threads = num_threads_;
1309 stats.active_threads = active_tasks_;
1310 stats.pending_tasks = tasks_.size();
1311 stats.completed_tasks = completed_tasks_;
1312 return stats;
1313 }
1314
1315 private:
1316 size_t num_threads_;
1317 std::vector<ThreadWrapper> workers_;
1318 std::queue<Task> tasks_;
1319
1320 mutable std::mutex queue_mutex_;
1321 std::condition_variable condition_;
1322 std::condition_variable task_finished_condition_;
1323 std::atomic<bool> stop_;
1324 std::atomic<size_t> active_tasks_{0};
1325 std::atomic<size_t> completed_tasks_{0};
1326
1327 void worker_function()
1328 {
1329 while (true)
1330 {
1331 Task task;
1332
1333 {
1334 std::unique_lock<std::mutex> lock(queue_mutex_);
1335
1336 condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
1337
1338 if (stop_ && tasks_.empty())
1339 {
1340 return;
1341 }
1342
1343 task = std::move(tasks_.front());
1344 tasks_.pop();
1345 ++active_tasks_;
1346 }
1347
1348 try
1349 {
1350 task();
1351 }
1352 catch (...)
1353 {
1354 // Log exception or handle as needed
1355 }
1356
1357 {
1358 std::lock_guard<std::mutex> lock(queue_mutex_);
1359 --active_tasks_;
1360 ++completed_tasks_;
1361 }
1362
1363 task_finished_condition_.notify_all();
1364 }
1365 }
1366};
1367
1396class GlobalThreadPool
1397{
1398 public:
1399 static auto instance() -> ThreadPool&
1400 {
1401 static ThreadPool pool(std::thread::hardware_concurrency());
1402 return pool;
1403 }
1404
1405 template <typename F, typename... Args>
1406 static auto submit(F&& f, Args&&... args)
1407 {
1408 return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
1409 }
1410
1411 template <typename Iterator>
1412 static auto submit_range(Iterator begin, Iterator end)
1413 {
1414 return instance().submit_range(begin, end);
1415 }
1416
1417 template <typename Iterator, typename F>
1418 static void parallel_for_each(Iterator begin, Iterator end, F&& func)
1419 {
1420 instance().parallel_for_each(begin, end, std::forward<F>(func));
1421 }
1422
1423 private:
1424 GlobalThreadPool() = default;
1425};
1426
1456class GlobalHighPerformancePool
1457{
1458 public:
1459 static auto instance() -> HighPerformancePool&
1460 {
1461 static HighPerformancePool pool(std::thread::hardware_concurrency());
1462 return pool;
1463 }
1464
1465 template <typename F, typename... Args>
1466 static auto submit(F&& f, Args&&... args)
1467 {
1468 return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
1469 }
1470
1471 template <typename Iterator>
1472 static auto submit_batch(Iterator begin, Iterator end)
1473 {
1474 return instance().submit_batch(begin, end);
1475 }
1476
1477 template <typename Iterator, typename F>
1478 static void parallel_for_each(Iterator begin, Iterator end, F&& func)
1479 {
1480 instance().parallel_for_each(begin, end, std::forward<F>(func));
1481 }
1482
1483 private:
1484 GlobalHighPerformancePool() = default;
1485};
1486
1515template <typename Container, typename F>
1516void parallel_for_each(Container& container, F&& func)
1517{
1518 GlobalThreadPool::parallel_for_each(container.begin(), container.end(), std::forward<F>(func));
1519}
1520
1521} // namespace threadschedule
auto submit_batch(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Efficient batch processing.
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Optimized task submission with minimal locking.
High-performance thread pool optimized for high-frequency task submission.
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
High-performance task submission (optimized hot path)
auto submit_batch(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Batch task submission for maximum throughput.
void parallel_for_each(Iterator begin, Iterator end, F &&func)
Optimized parallel for_each with work distribution.
auto get_statistics() const -> Statistics
Get detailed performance statistics.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Configure all worker threads.
Manages a set of CPU indices to which a thread may be bound.
Simple, general-purpose thread pool.
auto submit_range(Iterator begin, Iterator end) -> std::vector< std::future< void > >
Submit multiple tasks.
void parallel_for_each(Iterator begin, Iterator end, F &&func)
Apply a function to a range of values in parallel.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> bool
Configure thread properties.
auto submit(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task to the thread pool.
Value-semantic wrapper for a thread scheduling priority.
A result type that holds either a value of type T or an error of type E.
Definition expected.hpp:215
Exception thrown by expected::value() when the object is in the error state.
Definition expected.hpp:162
Polyfill for std::expected (C++23) for pre-C++23 compilers.