ThreadSchedule 1.0.0
Modern C++ thread management library
thread_pool.hpp
#pragma once

#include "expected.hpp"
#include "scheduler_policy.hpp"
#include "thread_wrapper.hpp"
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

namespace threadschedule
{

template <typename T>
class WorkStealingDeque
{
  public:
    static constexpr size_t CACHE_LINE_SIZE = 64;
    static constexpr size_t DEFAULT_CAPACITY = 1024;

  private:
    struct alignas(CACHE_LINE_SIZE) AlignedItem
    {
        T item;
        AlignedItem() = default;
        AlignedItem(T&& t) : item(std::move(t))
        {
        }
        AlignedItem(T const& t) : item(t)
        {
        }
    };

    std::unique_ptr<AlignedItem[]> buffer_;
    size_t capacity_;

    alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0};    // Owner pushes/pops here
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0}; // Thieves steal here
    alignas(CACHE_LINE_SIZE) mutable std::mutex mutex_;      // For synchronization

  public:
    explicit WorkStealingDeque(size_t capacity = DEFAULT_CAPACITY)
        : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
    {
    }

    // Thread-safe operations
    auto push(T&& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t - b >= capacity_)
        {
            return false; // Queue full
        }

        buffer_[t % capacity_] = AlignedItem(std::move(item));
        top_.store(t + 1, std::memory_order_release);
        return true;
    }

    auto push(T const& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t - b >= capacity_)
        {
            return false; // Queue full
        }

        buffer_[t % capacity_] = AlignedItem(item);
        top_.store(t + 1, std::memory_order_release);
        return true;
    }

    auto pop(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t <= b)
        {
            return false; // Empty
        }

        size_t const new_top = t - 1;
        item = std::move(buffer_[new_top % capacity_].item);
        top_.store(new_top, std::memory_order_relaxed);
        return true;
    }

    // Thief operations (other threads stealing work)
    auto steal(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        size_t const t = top_.load(std::memory_order_relaxed);

        if (b >= t)
        {
            return false; // Empty
        }

        item = std::move(buffer_[b % capacity_].item);
        bottom_.store(b + 1, std::memory_order_relaxed);
        return true;
    }

    auto size() const -> size_t
    {
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        return t > b ? t - b : 0;
    }

    auto empty() const -> bool
    {
        return size() == 0;
    }
};
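
// Usage sketch (illustrative only, not shipped with the library): the owning
// worker pushes and pops at the top end (LIFO), while other workers steal from
// the bottom end (FIFO). Values here are arbitrary.
//
//   WorkStealingDeque<int> dq;
//   dq.push(1);
//   dq.push(2);
//
//   int task = 0;
//   dq.pop(task);    // owner side: task == 2, the most recently pushed item
//   dq.steal(task);  // thief side: task == 1, the oldest remaining item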

class HighPerformancePool
{
  public:
    using Task = std::function<void()>;

    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
        size_t stolen_tasks;
        double tasks_per_second;
        std::chrono::microseconds avg_task_time;
    };

    explicit HighPerformancePool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), next_victim_(0),
          start_time_(std::chrono::steady_clock::now())
    {
        // Initialize per-thread work queues
        worker_queues_.resize(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i)
        {
            worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>();
        }

        workers_.reserve(num_threads_);

        // Create worker threads with thread-local storage
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&HighPerformancePool::worker_function, this, i);
        }
    }

    HighPerformancePool(HighPerformancePool const&) = delete;
    auto operator=(HighPerformancePool const&) -> HighPerformancePool& = delete;

    ~HighPerformancePool()
    {
        shutdown();
    }

    /// High-performance task submission (optimized hot path).
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        if (stop_.load(std::memory_order_acquire))
        {
            throw std::runtime_error("ThreadPool is shutting down");
        }

        // Try to submit to least loaded queue (round-robin with fallback)
        size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;

        // First try the preferred queue
        if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
        {
            wakeup_condition_.notify_one();
            return result;
        }

        // If the preferred queue is full, probe a few neighboring queues
        for (size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
        {
            size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
            if (worker_queues_[idx]->push([task]() { (*task)(); }))
            {
                wakeup_condition_.notify_one();
                return result;
            }
        }

        // All local queues full, use overflow queue
        {
            std::lock_guard<std::mutex> lock(overflow_mutex_);
            if (stop_.load(std::memory_order_relaxed))
            {
                throw std::runtime_error("ThreadPool is shutting down");
            }
            overflow_tasks_.emplace([task]() { (*task)(); });
        }

        wakeup_condition_.notify_all();
        return result;
    }

    /// Batch task submission for maximum throughput.
    template <typename Iterator>
    auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        size_t const batch_size = std::distance(begin, end);
        futures.reserve(batch_size);

        if (stop_.load(std::memory_order_acquire))
        {
            throw std::runtime_error("ThreadPool is shutting down");
        }

        // Distribute batch across worker queues
        size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;

        for (auto it = begin; it != end; ++it)
        {
            auto task = std::make_shared<std::packaged_task<void()>>(*it);
            futures.push_back(task->get_future());

            // Try to place in worker queue, round-robin style
            bool queued = false;
            for (size_t attempts = 0; attempts < num_threads_; ++attempts)
            {
                if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
                {
                    queued = true;
                    break;
                }
                queue_idx = (queue_idx + 1) % num_threads_;
            }

            if (!queued)
            {
                // Overflow to global queue
                std::lock_guard<std::mutex> lock(overflow_mutex_);
                overflow_tasks_.emplace([task]() { (*task)(); });
            }
        }

        // Wake up workers for the batch
        wakeup_condition_.notify_all();
        return futures;
    }

    /// Optimized parallel for_each with work distribution.
    template <typename Iterator, typename F>
    void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        size_t const total_items = std::distance(begin, end);
        if (total_items == 0)
            return;

        // Calculate optimal chunk size for cache efficiency
        size_t const chunk_size = (std::max)(size_t(1), total_items / (num_threads_ * 4));
        std::vector<std::future<void>> futures;

        for (auto it = begin; it != end;)
        {
            // Advance by at most chunk_size without stepping past end
            // (iterator arithmetic beyond end is undefined behavior).
            size_t const remaining = static_cast<size_t>(std::distance(it, end));
            auto const chunk_end = std::next(it, static_cast<std::ptrdiff_t>((std::min)(chunk_size, remaining)));

            futures.push_back(submit([func, it, chunk_end]() {
                for (auto chunk_it = it; chunk_it != chunk_end; ++chunk_it)
                {
                    func(*chunk_it);
                }
            }));

            it = chunk_end;
        }

        // Wait for all chunks to complete
        for (auto& future : futures)
        {
            future.wait();
        }
    }

    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        size_t total = 0;
        for (auto const& queue : worker_queues_)
        {
            total += queue->size();
        }

        std::lock_guard<std::mutex> lock(overflow_mutex_);
        total += overflow_tasks_.size();
        return total;
    }

    /// Configure all worker threads.
    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> expected<void, std::error_code>
    {
        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name).has_value())
            {
                success = false;
            }

            if (!workers_[i].set_scheduling_policy(policy, priority).has_value())
            {
                success = false;
            }
        }
        if (success)
            return {};
        return unexpected(std::make_error_code(std::errc::operation_not_permitted));
    }

    auto set_affinity(ThreadAffinity const& affinity) -> expected<void, std::error_code>
    {
        bool success = true;

        for (auto& worker : workers_)
        {
            if (!worker.set_affinity(affinity).has_value())
            {
                success = false;
            }
        }
        if (success)
            return {};
        return unexpected(std::make_error_code(std::errc::operation_not_permitted));
    }

    auto distribute_across_cpus() -> expected<void, std::error_code>
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
            return unexpected(std::make_error_code(std::errc::invalid_argument));

        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity).has_value())
            {
                success = false;
            }
        }
        if (success)
            return {};
        return unexpected(std::make_error_code(std::errc::operation_not_permitted));
    }

    void wait_for_tasks()
    {
        std::unique_lock<std::mutex> lock(completion_mutex_);
        completion_condition_.wait(
            lock, [this] { return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0; });
    }

    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(overflow_mutex_);
            if (stop_.exchange(true, std::memory_order_acq_rel))
            {
                return; // Already shutting down
            }
        }

        wakeup_condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }

        workers_.clear();
    }

    /// Get detailed performance statistics.
    auto get_statistics() const -> Statistics
    {
        auto const now = std::chrono::steady_clock::now();
        auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);

        Statistics stats;
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_.load(std::memory_order_acquire);
        stats.pending_tasks = pending_tasks();
        stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
        stats.stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);

        if (elapsed.count() > 0)
        {
            stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
        }
        else
        {
            stats.tasks_per_second = 0.0;
        }

        auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
        if (stats.completed_tasks > 0)
        {
            stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
        }
        else
        {
            stats.avg_task_time = std::chrono::microseconds(0);
        }

        return stats;
    }

  private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;

    // Overflow queue for when worker queues are full
    std::queue<Task> overflow_tasks_;
    mutable std::mutex overflow_mutex_;

    // Synchronization
    std::atomic<bool> stop_;
    std::condition_variable wakeup_condition_;
    std::mutex wakeup_mutex_;

    std::condition_variable completion_condition_;
    std::mutex completion_mutex_;

    // Load balancing and statistics
    std::atomic<size_t> next_victim_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};
    std::atomic<size_t> stolen_tasks_{0};
    std::atomic<uint64_t> total_task_time_{0}; // microseconds

    std::chrono::steady_clock::time_point start_time_;

    // NOLINTNEXTLINE(readability-function-cognitive-complexity)
    void worker_function(size_t worker_id)
    {
        // Thread-local random number generator for work stealing
        thread_local std::mt19937 gen = []() {
            std::random_device device;
            return std::mt19937(device());
        }();

        Task task;
        std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);

        while (true)
        {
            bool found_task = false;

            // 1. Try to get task from own queue (fast path)
            if (worker_queues_[worker_id]->pop(task))
            {
                found_task = true;
            }
            // 2. Try to steal from other workers (limit attempts to reduce contention)
            else
            {
                size_t const max_steal_attempts = (std::min)(num_threads_, size_t(4));
                for (size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
                {
                    size_t const victim_id = dist(gen);
                    if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
                    {
                        found_task = true;
                        stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
                        break;
                    }
                }
            }

            // 3. Try overflow queue
            if (!found_task)
            {
                std::lock_guard<std::mutex> lock(overflow_mutex_);
                if (!overflow_tasks_.empty())
                {
                    task = std::move(overflow_tasks_.front());
                    overflow_tasks_.pop();
                    found_task = true;
                }
            }

            if (found_task)
            {
                // Execute task with timing
                active_tasks_.fetch_add(1, std::memory_order_relaxed);

                auto const start_time = std::chrono::steady_clock::now();
                try
                {
                    task();
                }
                catch (...)
                {
                    // Log exception or handle as needed
                }
                auto const end_time = std::chrono::steady_clock::now();

                auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
                total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);

                active_tasks_.fetch_sub(1, std::memory_order_relaxed);
                completed_tasks_.fetch_add(1, std::memory_order_relaxed);

                // Briefly acquire the completion mutex before notifying so a
                // thread in wait_for_tasks() cannot miss the wakeup between
                // checking its predicate and blocking on the condition variable.
                {
                    std::lock_guard<std::mutex> lock(completion_mutex_);
                }
                completion_condition_.notify_all();
            }
            else
            {
                // No work found, check if we should stop
                if (stop_.load(std::memory_order_acquire))
                {
                    break;
                }

                // Wait for work with adaptive timeout
                std::unique_lock<std::mutex> lock(wakeup_mutex_);
                wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
            }
        }
    }
};
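
// Usage sketch (illustrative only, not shipped with the library): submit work,
// wait for completion, and read back the counters. The thread count and lambda
// are arbitrary.
//
//   HighPerformancePool pool(4);
//   auto future = pool.submit([](int x) { return x * 2; }, 21);
//   int const answer = future.get();  // 42
//
//   pool.wait_for_tasks();            // block until all queued work is done
//   auto const stats = pool.get_statistics();
//   // stats.completed_tasks, stats.stolen_tasks, stats.tasks_per_second, ...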

/// High-performance thread pool optimized for high-frequency task submission.
class FastThreadPool
{
  public:
    using Task = std::function<void()>;

    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
        double tasks_per_second;
        std::chrono::microseconds avg_task_time;
    };

    explicit FastThreadPool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), start_time_(std::chrono::steady_clock::now())
    {
        workers_.reserve(num_threads_);

        // Create worker threads
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&FastThreadPool::worker_function, this, i);
        }
    }

    FastThreadPool(FastThreadPool const&) = delete;
    auto operator=(FastThreadPool const&) -> FastThreadPool& = delete;

    ~FastThreadPool()
    {
        shutdown();
    }

    /// Optimized task submission with minimal locking.
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
            {
                throw std::runtime_error("FastThreadPool is shutting down");
            }
            tasks_.emplace([task]() { (*task)(); });
        }

        condition_.notify_one();
        return result;
    }

    /// Efficient batch processing.
    template <typename Iterator>
    auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        size_t const batch_size = std::distance(begin, end);
        futures.reserve(batch_size);

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
            {
                throw std::runtime_error("FastThreadPool is shutting down");
            }

            for (auto it = begin; it != end; ++it)
            {
                auto task = std::make_shared<std::packaged_task<void()>>(*it);
                futures.push_back(task->get_future());
                tasks_.emplace([task]() { (*task)(); });
            }
        }

        // Wake up all workers for batch processing
        condition_.notify_all();
        return futures;
    }

    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
                return;
            stop_ = true;
        }

        condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }

        workers_.clear();
    }

    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> bool
    {
        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name))
            {
                success = false;
            }

            if (!workers_[i].set_scheduling_policy(policy, priority))
            {
                success = false;
            }
        }

        return success;
    }

    auto distribute_across_cpus() -> bool
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
            return false;

        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity))
            {
                success = false;
            }
        }

        return success;
    }

    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return tasks_.size();
    }

    auto get_statistics() const -> Statistics
    {
        auto const now = std::chrono::steady_clock::now();
        auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);

        std::lock_guard<std::mutex> lock(queue_mutex_);
        Statistics stats;
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_.load(std::memory_order_acquire);
        stats.pending_tasks = tasks_.size();
        stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);

        if (elapsed.count() > 0)
        {
            stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
        }
        else
        {
            stats.tasks_per_second = 0.0;
        }

        auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
        if (stats.completed_tasks > 0)
        {
            stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
        }
        else
        {
            stats.avg_task_time = std::chrono::microseconds(0);
        }

        return stats;
    }

  private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::queue<Task> tasks_;

    mutable std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::atomic<bool> stop_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};
    std::atomic<uint64_t> total_task_time_{0}; // microseconds

    std::chrono::steady_clock::time_point start_time_;

    void worker_function(size_t /* worker_id */)
    {
        while (true)
        {
            Task task;
            bool found_task = false;

            {
                std::unique_lock<std::mutex> lock(queue_mutex_);

                // Use timeout to avoid indefinite blocking
                if (condition_.wait_for(lock, std::chrono::milliseconds(10),
                                        [this] { return stop_ || !tasks_.empty(); }))
                {
                    if (stop_ && tasks_.empty())
                    {
                        return;
                    }

                    if (!tasks_.empty())
                    {
                        task = std::move(tasks_.front());
                        tasks_.pop();
                        found_task = true;
                    }
                }
                else if (stop_)
                {
                    return;
                }
            }

            if (found_task)
            {
                active_tasks_.fetch_add(1, std::memory_order_relaxed);

                auto const start_time = std::chrono::steady_clock::now();
                try
                {
                    task();
                }
                catch (...)
                {
                    // Log exception or handle as needed
                }
                auto const end_time = std::chrono::steady_clock::now();

                auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
                total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);

                active_tasks_.fetch_sub(1, std::memory_order_relaxed);
                completed_tasks_.fetch_add(1, std::memory_order_relaxed);
            }
        }
    }
};
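
// Usage sketch (illustrative only, not shipped with the library): submit_batch
// accepts any iterator range of callables convertible to
// std::packaged_task<void()>, e.g. a vector of std::function<void()>.
//
//   FastThreadPool pool;
//   std::vector<std::function<void()>> jobs(8, [] { /* work */ });
//   auto futures = pool.submit_batch(jobs.begin(), jobs.end());
//   for (auto& f : futures)
//       f.wait();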

/// Simple thread pool for general-purpose use.
class ThreadPool
{
  public:
    using Task = std::function<void()>;

    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
    };

    explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false)
    {
        workers_.reserve(num_threads_);

        // Create worker threads
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&ThreadPool::worker_function, this);
        }
    }

    ThreadPool(ThreadPool const&) = delete;
    auto operator=(ThreadPool const&) -> ThreadPool& = delete;

    ~ThreadPool()
    {
        shutdown();
    }

    /// Submit a task to the thread pool.
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);

            if (stop_)
            {
                throw std::runtime_error("ThreadPool is shutting down");
            }

            tasks_.emplace([task]() { (*task)(); });
        }

        condition_.notify_one();
        return result;
    }

    /// Submit multiple tasks.
    template <typename Iterator>
    auto submit_range(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        futures.reserve(std::distance(begin, end));

        for (auto it = begin; it != end; ++it)
        {
            futures.push_back(submit(*it));
        }

        return futures;
    }

    /// Apply a function to a range of values in parallel.
    template <typename Iterator, typename F>
    void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        std::vector<std::future<void>> futures;
        futures.reserve(std::distance(begin, end));

        for (auto it = begin; it != end; ++it)
        {
            futures.push_back(submit([func, it]() { func(*it); }));
        }

        // Wait for all tasks to complete
        for (auto& future : futures)
        {
            future.wait();
        }
    }

    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return tasks_.size();
    }

    /// Configure thread properties.
    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> bool
    {
        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name))
            {
                success = false;
            }

            if (!workers_[i].set_scheduling_policy(policy, priority))
            {
                success = false;
            }
        }

        return success;
    }

    auto set_affinity(ThreadAffinity const& affinity) -> bool
    {
        bool success = true;

        for (auto& worker : workers_)
        {
            if (!worker.set_affinity(affinity))
            {
                success = false;
            }
        }

        return success;
    }

    auto distribute_across_cpus() -> bool
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
            return false;

        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity))
            {
                success = false;
            }
        }

        return success;
    }

    void wait_for_tasks()
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        task_finished_condition_.wait(lock, [this] { return tasks_.empty() && active_tasks_ == 0; });
    }

    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
                return;
            stop_ = true;
        }

        condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }

        workers_.clear();
    }

    auto get_statistics() const -> Statistics
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        Statistics stats;
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_;
        stats.pending_tasks = tasks_.size();
        stats.completed_tasks = completed_tasks_;
        return stats;
    }

  private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::queue<Task> tasks_;

    mutable std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable task_finished_condition_;
    std::atomic<bool> stop_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};

    void worker_function()
    {
        while (true)
        {
            Task task;

            {
                std::unique_lock<std::mutex> lock(queue_mutex_);

                condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                if (stop_ && tasks_.empty())
                {
                    return;
                }

                task = std::move(tasks_.front());
                tasks_.pop();
                ++active_tasks_;
            }

            try
            {
                task();
            }
            catch (...)
            {
                // Log exception or handle as needed
            }

            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                --active_tasks_;
                ++completed_tasks_;
            }

            task_finished_condition_.notify_all();
        }
    }
};
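
// Usage sketch (illustrative only, not shipped with the library): a
// general-purpose pool with named, pinned workers. The prefix "worker" is
// arbitrary.
//
//   ThreadPool pool(2);
//   pool.configure_threads("worker");  // threads become worker_0, worker_1
//   pool.distribute_across_cpus();     // pin one worker per CPU, round-robin
//
//   auto future = pool.submit([] { return 7; });
//   pool.wait_for_tasks();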

class GlobalThreadPool
{
  public:
    static auto instance() -> ThreadPool&
    {
        static ThreadPool pool(std::thread::hardware_concurrency());
        return pool;
    }

    template <typename F, typename... Args>
    static auto submit(F&& f, Args&&... args)
    {
        return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename Iterator>
    static auto submit_range(Iterator begin, Iterator end)
    {
        return instance().submit_range(begin, end);
    }

    template <typename Iterator, typename F>
    static void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        instance().parallel_for_each(begin, end, std::forward<F>(func));
    }

  private:
    GlobalThreadPool() = default;
};
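
// Usage sketch (illustrative only, not shipped with the library): the
// singleton avoids constructing a pool at every call site.
//
//   auto future = GlobalThreadPool::submit([] { return std::string("done"); });
//   std::string const result = future.get();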

class GlobalHighPerformancePool
{
  public:
    static auto instance() -> HighPerformancePool&
    {
        static HighPerformancePool pool(std::thread::hardware_concurrency());
        return pool;
    }

    template <typename F, typename... Args>
    static auto submit(F&& f, Args&&... args)
    {
        return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename Iterator>
    static auto submit_batch(Iterator begin, Iterator end)
    {
        return instance().submit_batch(begin, end);
    }

    template <typename Iterator, typename F>
    static void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        instance().parallel_for_each(begin, end, std::forward<F>(func));
    }

  private:
    GlobalHighPerformancePool() = default;
};
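
// Usage sketch (illustrative only, not shipped with the library): the same
// static interface as GlobalThreadPool, but backed by the work-stealing
// HighPerformancePool.
//
//   std::vector<std::function<void()>> jobs(16, [] { /* work */ });
//   auto futures = GlobalHighPerformancePool::submit_batch(jobs.begin(), jobs.end());
//   for (auto& f : futures)
//       f.wait();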

template <typename Container, typename F>
void parallel_for_each(Container& container, F&& func)
{
    GlobalThreadPool::parallel_for_each(container.begin(), container.end(), std::forward<F>(func));
}
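
// Usage sketch (illustrative only, not shipped with the library): the
// container overload forwards to GlobalThreadPool::parallel_for_each.
//
//   std::vector<int> data{1, 2, 3, 4};
//   threadschedule::parallel_for_each(data, [](int& x) { x *= 2; });
//   // data is now {2, 4, 6, 8}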

} // namespace threadschedule