template <typename T>
class WorkStealingDeque
{
private:
    static constexpr size_t CACHE_LINE_SIZE = 64;
    static constexpr size_t DEFAULT_CAPACITY = 1024;

    // Each slot is padded to a cache line to avoid false sharing between items.
    struct alignas(CACHE_LINE_SIZE) AlignedItem
    {
        T item;

        AlignedItem() = default;
        AlignedItem(T&& t) : item(std::move(t)) {}
        AlignedItem(T const& t) : item(t) {}
    };

    std::unique_ptr<AlignedItem[]> buffer_;
    size_t capacity_;

    // Keep the indices and the lock on separate cache lines.
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0};
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0};
    alignas(CACHE_LINE_SIZE) mutable std::mutex mutex_;

public:
    explicit WorkStealingDeque(size_t capacity = DEFAULT_CAPACITY)
        : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
    {
    }
    auto push(T&& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t - b >= capacity_)
        {
            return false;  // deque is full
        }

        buffer_[t % capacity_] = AlignedItem(std::move(item));
        top_.store(t + 1, std::memory_order_release);
        return true;
    }
    auto push(T const& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t - b >= capacity_)
        {
            return false;  // deque is full
        }

        buffer_[t % capacity_] = AlignedItem(item);
        top_.store(t + 1, std::memory_order_release);
        return true;
    }
    auto pop(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);

        if (t == b)
        {
            return false;  // deque is empty
        }

        size_t const new_top = t - 1;
        item = std::move(buffer_[new_top % capacity_].item);
        top_.store(new_top, std::memory_order_relaxed);
        return true;
    }
    auto steal(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        size_t const t = top_.load(std::memory_order_relaxed);

        if (b >= t)
        {
            return false;  // nothing to steal
        }

        item = std::move(buffer_[b % capacity_].item);
        bottom_.store(b + 1, std::memory_order_relaxed);
        return true;
    }
    auto size() const -> size_t
    {
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        return t > b ? t - b : 0;
    }

    auto empty() const -> bool
    {
        return size() == 0;
    }
};
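// Usage sketch (illustrative only, not part of the original header): the owning
// thread pushes and pops at the top while other threads steal from the bottom.
//
//     WorkStealingDeque<int> deque;
//     deque.push(1);
//     deque.push(2);
//
//     int mine = 0;
//     deque.pop(mine);     // owner takes the most recently pushed item (LIFO) -> 2
//
//     int stolen = 0;
//     deque.steal(stolen); // a thief takes the oldest item (FIFO) -> 1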
class HighPerformancePool
{
public:
    using Task = std::function<void()>;

    // Snapshot of pool activity returned by get_statistics().
    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
        size_t stolen_tasks;
        double tasks_per_second;
        std::chrono::microseconds avg_task_time;
    };
    explicit HighPerformancePool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), next_victim_(0),
          start_time_(std::chrono::steady_clock::now())
    {
        worker_queues_.resize(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i)
        {
            worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>();
        }

        workers_.reserve(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&HighPerformancePool::worker_function, this, i);
        }
    }
    HighPerformancePool(HighPerformancePool const&) = delete;
    auto operator=(HighPerformancePool const&) -> HighPerformancePool& = delete;

    ~HighPerformancePool()
    {
        shutdown();
    }
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        if (stop_.load(std::memory_order_acquire))
        {
            throw std::runtime_error("ThreadPool is shutting down");
        }

        // Round-robin the preferred queue to spread work across workers.
        size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;

        if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
        {
            wakeup_condition_.notify_one();
            return result;
        }

        // Preferred queue is full: try a few neighbouring queues.
        for (size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
        {
            size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
            if (worker_queues_[idx]->push([task]() { (*task)(); }))
            {
                wakeup_condition_.notify_one();
                return result;
            }
        }

        // All per-worker queues are full: fall back to the overflow queue.
        {
            std::lock_guard<std::mutex> lock(overflow_mutex_);
            if (stop_.load(std::memory_order_relaxed))
            {
                throw std::runtime_error("ThreadPool is shutting down");
            }
            overflow_tasks_.emplace([task]() { (*task)(); });
        }

        wakeup_condition_.notify_all();
        return result;
    }
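    // Usage sketch (illustrative only): submit() returns a std::future for the
    // callable's result, so return values and exceptions propagate to the caller.
    //
    //     HighPerformancePool pool(4);
    //     auto answer = pool.submit([](int x, int y) { return x + y; }, 20, 22);
    //     int value = answer.get();  // 42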
    template <typename Iterator>
    auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        size_t const batch_size = std::distance(begin, end);
        futures.reserve(batch_size);

        if (stop_.load(std::memory_order_acquire))
        {
            throw std::runtime_error("ThreadPool is shutting down");
        }

        // Start from a round-robin offset so the batch spreads across queues.
        size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;

        for (auto it = begin; it != end; ++it)
        {
            auto task = std::make_shared<std::packaged_task<void()>>(*it);
            futures.push_back(task->get_future());

            bool pushed = false;
            for (size_t attempts = 0; attempts < num_threads_; ++attempts)
            {
                if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
                {
                    pushed = true;
                    break;
                }
                queue_idx = (queue_idx + 1) % num_threads_;
            }

            if (!pushed)
            {
                // Every per-worker queue is full: fall back to the overflow queue.
                std::lock_guard<std::mutex> lock(overflow_mutex_);
                overflow_tasks_.emplace([task]() { (*task)(); });
            }
        }

        wakeup_condition_.notify_all();
        return futures;
    }
    template <typename Iterator, typename F>
    void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        size_t const total_items = std::distance(begin, end);
        if (total_items == 0)
        {
            return;
        }

        // Split the range into chunks so each task amortises scheduling overhead.
        size_t const chunk_size = (std::max)(size_t(1), total_items / (num_threads_ * 4));
        std::vector<std::future<void>> futures;

        for (auto it = begin; it < end;)
        {
            auto chunk_end = (std::min)(it + chunk_size, end);

            futures.push_back(submit([func, it, chunk_end]() {
                for (auto chunk_it = it; chunk_it != chunk_end; ++chunk_it)
                {
                    func(*chunk_it);
                }
            }));

            it = chunk_end;
        }

        for (auto& future : futures)
        {
            future.wait();
        }
    }
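    // Usage sketch (illustrative only): parallel_for_each() chunks a random-access
    // range into roughly four tasks per worker and blocks until every chunk is done.
    //
    //     std::vector<int> values(10000, 1);
    //     pool.parallel_for_each(values.begin(), values.end(),
    //                            [](int& v) { v *= 2; });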
    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        size_t total = 0;

        for (auto const& queue : worker_queues_)
        {
            total += queue->size();
        }

        {
            std::lock_guard<std::mutex> lock(overflow_mutex_);
            total += overflow_tasks_.size();
        }

        return total;
    }
    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> expected<void, std::error_code>
    {
        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name).has_value())
            {
                return unexpected(std::make_error_code(std::errc::operation_not_permitted));
            }

            if (!workers_[i].set_scheduling_policy(policy, priority).has_value())
            {
                return unexpected(std::make_error_code(std::errc::operation_not_permitted));
            }
        }

        return {};
    }
    // Applies a single affinity mask to every worker (method name assumed; only
    // the loop body survives in the source).
    auto set_affinity(ThreadAffinity const& affinity) -> expected<void, std::error_code>
    {
        for (auto& worker : workers_)
        {
            if (!worker.set_affinity(affinity).has_value())
            {
                return unexpected(std::make_error_code(std::errc::operation_not_permitted));
            }
        }

        return {};
    }
    auto distribute_across_cpus() -> expected<void, std::error_code>
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
        {
            return unexpected(std::make_error_code(std::errc::invalid_argument));
        }

        // Pin worker i to CPU (i mod cpu_count).
        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity).has_value())
            {
                return unexpected(std::make_error_code(std::errc::operation_not_permitted));
            }
        }

        return {};
    }
    void wait_for_tasks()
    {
        std::unique_lock<std::mutex> lock(completion_mutex_);
        completion_condition_.wait(lock, [this] {
            return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0;
        });
    }
    // Stops accepting work and joins all workers (called from the destructor).
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(overflow_mutex_);
            if (stop_.exchange(true, std::memory_order_acq_rel))
            {
                return;  // already shut down
            }
        }

        wakeup_condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }
    auto get_statistics() const -> Statistics
    {
        auto const now = std::chrono::steady_clock::now();
        auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);

        Statistics stats{};
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_.load(std::memory_order_acquire);
        stats.pending_tasks = pending_tasks();
        stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
        stats.stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);

        if (elapsed.count() > 0)
        {
            stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
        }
        else
        {
            stats.tasks_per_second = 0.0;
        }

        auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
        if (stats.completed_tasks > 0)
        {
            stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
        }
        else
        {
            stats.avg_task_time = std::chrono::microseconds(0);
        }

        return stats;
    }
private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;

    // Fallback queue used when every per-worker deque is full.
    std::queue<Task> overflow_tasks_;
    mutable std::mutex overflow_mutex_;

    std::atomic<bool> stop_;
    std::condition_variable wakeup_condition_;
    std::mutex wakeup_mutex_;

    std::condition_variable completion_condition_;
    std::mutex completion_mutex_;

    std::atomic<size_t> next_victim_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};
    std::atomic<size_t> stolen_tasks_{0};
    std::atomic<uint64_t> total_task_time_{0};

    std::chrono::steady_clock::time_point start_time_;
    void worker_function(size_t worker_id)
    {
        // Per-thread RNG used to pick random victims when stealing.
        thread_local std::mt19937 gen = []() {
            std::random_device device;
            return std::mt19937(device());
        }();

        std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);

        while (true)
        {
            Task task;
            bool found_task = false;

            // 1. Drain the worker's own queue first.
            if (worker_queues_[worker_id]->pop(task))
            {
                found_task = true;
            }

            // 2. Otherwise try to steal from a few random victims.
            if (!found_task)
            {
                size_t const max_steal_attempts = (std::min)(num_threads_, size_t(4));
                for (size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
                {
                    size_t const victim_id = dist(gen);
                    if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
                    {
                        found_task = true;
                        stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
                        break;
                    }
                }
            }

            // 3. Fall back to the shared overflow queue.
            if (!found_task)
            {
                std::lock_guard<std::mutex> lock(overflow_mutex_);
                if (!overflow_tasks_.empty())
                {
                    task = std::move(overflow_tasks_.front());
                    overflow_tasks_.pop();
                    found_task = true;
                }
            }

            if (found_task)
            {
                active_tasks_.fetch_add(1, std::memory_order_relaxed);

                auto const start_time = std::chrono::steady_clock::now();
                try
                {
                    task();
                }
                catch (...)
                {
                    // Exceptions from packaged tasks are reported through their futures.
                }
                auto const end_time = std::chrono::steady_clock::now();

                auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
                total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);

                active_tasks_.fetch_sub(1, std::memory_order_relaxed);
                completed_tasks_.fetch_add(1, std::memory_order_relaxed);

                completion_condition_.notify_all();
            }
            else
            {
                if (stop_.load(std::memory_order_acquire))
                {
                    return;
                }

                // No work available: sleep briefly instead of busy-spinning.
                std::unique_lock<std::mutex> lock(wakeup_mutex_);
                wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
            }
        }
    }
};
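// Usage sketch (illustrative only): a typical lifecycle of the work-stealing pool.
//
//     HighPerformancePool pool;                  // one deque per hardware thread
//     for (int i = 0; i < 1000; ++i)
//     {
//         pool.submit([i] { /* do work for item i */ });
//     }
//     pool.wait_for_tasks();                     // block until the queues drain
//     auto stats = pool.get_statistics();        // completed_tasks, stolen_tasks, ...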
class FastThreadPool
{
public:
    using Task = std::function<void()>;

    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
        double tasks_per_second;
        std::chrono::microseconds avg_task_time;
    };
    explicit FastThreadPool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), start_time_(std::chrono::steady_clock::now())
    {
        workers_.reserve(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&FastThreadPool::worker_function, this, i);
        }
    }
    FastThreadPool(FastThreadPool const&) = delete;
    auto operator=(FastThreadPool const&) -> FastThreadPool& = delete;

    ~FastThreadPool()
    {
        shutdown();
    }
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
            {
                throw std::runtime_error("FastThreadPool is shutting down");
            }
            tasks_.emplace([task]() { (*task)(); });
        }

        condition_.notify_one();
        return result;
    }
    template <typename Iterator>
    auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        size_t const batch_size = std::distance(begin, end);
        futures.reserve(batch_size);

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
            {
                throw std::runtime_error("FastThreadPool is shutting down");
            }

            for (auto it = begin; it != end; ++it)
            {
                auto task = std::make_shared<std::packaged_task<void()>>(*it);
                futures.push_back(task->get_future());
                tasks_.emplace([task]() { (*task)(); });
            }
        }

        condition_.notify_all();
        return futures;
    }
    // Stops accepting work and joins all workers (called from the destructor).
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_.store(true, std::memory_order_release);
        }

        condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }
    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> bool
    {
        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name))
            {
                return false;
            }

            if (!workers_[i].set_scheduling_policy(policy, priority))
            {
                return false;
            }
        }

        return true;
    }
    auto distribute_across_cpus() -> bool
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
        {
            return false;
        }

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity))
            {
                return false;
            }
        }

        return true;
    }
    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return tasks_.size();
    }
    auto get_statistics() const -> Statistics
    {
        auto const now = std::chrono::steady_clock::now();
        auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);

        std::lock_guard<std::mutex> lock(queue_mutex_);

        Statistics stats{};
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_.load(std::memory_order_acquire);
        stats.pending_tasks = tasks_.size();
        stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);

        if (elapsed.count() > 0)
        {
            stats.tasks_per_second = static_cast<double>(stats.completed_tasks) / elapsed.count();
        }
        else
        {
            stats.tasks_per_second = 0.0;
        }

        auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
        if (stats.completed_tasks > 0)
        {
            stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
        }
        else
        {
            stats.avg_task_time = std::chrono::microseconds(0);
        }

        return stats;
    }
private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::queue<Task> tasks_;

    mutable std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::atomic<bool> stop_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};
    std::atomic<uint64_t> total_task_time_{0};

    std::chrono::steady_clock::time_point start_time_;
    void worker_function(size_t /*worker_id*/)
    {
        while (true)
        {
            Task task;
            bool found_task = false;

            {
                std::unique_lock<std::mutex> lock(queue_mutex_);

                // Wait briefly for work; the timeout lets the loop re-check stop_.
                if (condition_.wait_for(lock, std::chrono::milliseconds(10),
                                        [this] { return stop_ || !tasks_.empty(); }))
                {
                    if (stop_ && tasks_.empty())
                    {
                        return;
                    }

                    if (!tasks_.empty())
                    {
                        task = std::move(tasks_.front());
                        tasks_.pop();
                        found_task = true;
                    }
                }
            }

            if (found_task)
            {
                active_tasks_.fetch_add(1, std::memory_order_relaxed);

                auto const start_time = std::chrono::steady_clock::now();
                try
                {
                    task();
                }
                catch (...)
                {
                    // Exceptions from packaged tasks are reported through their futures.
                }
                auto const end_time = std::chrono::steady_clock::now();

                auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
                total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);

                active_tasks_.fetch_sub(1, std::memory_order_relaxed);
                completed_tasks_.fetch_add(1, std::memory_order_relaxed);
            }
        }
    }
};
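// Usage sketch (illustrative only): FastThreadPool trades the per-worker deques for
// a single mutex-protected queue, which is simpler but contends under heavy load.
//
//     FastThreadPool pool(2);
//     std::vector<std::function<void()>> work(100, [] { /* do something */ });
//     auto futures = pool.submit_batch(work.begin(), work.end());
//     for (auto& f : futures) { f.wait(); }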
class ThreadPool
{
public:
    using Task = std::function<void()>;

    struct Statistics
    {
        size_t total_threads;
        size_t active_threads;
        size_t pending_tasks;
        size_t completed_tasks;
    };
    explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
        : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false)
    {
        workers_.reserve(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i)
        {
            workers_.emplace_back(&ThreadPool::worker_function, this);
        }
    }
    ThreadPool(ThreadPool const&) = delete;
    auto operator=(ThreadPool const&) -> ThreadPool& = delete;

    ~ThreadPool()
    {
        shutdown();
    }
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> result = task->get_future();

        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
            {
                throw std::runtime_error("ThreadPool is shutting down");
            }
            tasks_.emplace([task]() { (*task)(); });
        }

        condition_.notify_one();
        return result;
    }
    template <typename Iterator>
    auto submit_range(Iterator begin, Iterator end) -> std::vector<std::future<void>>
    {
        std::vector<std::future<void>> futures;
        futures.reserve(std::distance(begin, end));

        for (auto it = begin; it != end; ++it)
        {
            futures.push_back(submit(*it));
        }

        return futures;
    }
    template <typename Iterator, typename F>
    void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        std::vector<std::future<void>> futures;
        futures.reserve(std::distance(begin, end));

        for (auto it = begin; it != end; ++it)
        {
            futures.push_back(submit([func, it]() { func(*it); }));
        }

        for (auto& future : futures)
        {
            future.wait();
        }
    }
    auto size() const noexcept -> size_t
    {
        return num_threads_;
    }

    auto pending_tasks() const -> size_t
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return tasks_.size();
    }
    auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
                           ThreadPriority priority = ThreadPriority::normal()) -> bool
    {
        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            std::string const thread_name = name_prefix + "_" + std::to_string(i);

            if (!workers_[i].set_name(thread_name))
            {
                success = false;
            }

            if (!workers_[i].set_scheduling_policy(policy, priority))
            {
                success = false;
            }
        }

        return success;
    }
    // Applies a single affinity mask to every worker (method name assumed; only
    // the loop body survives in the source).
    auto set_affinity(ThreadAffinity const& affinity) -> bool
    {
        bool success = true;

        for (auto& worker : workers_)
        {
            if (!worker.set_affinity(affinity))
            {
                success = false;
            }
        }

        return success;
    }
    auto distribute_across_cpus() -> bool
    {
        auto const cpu_count = std::thread::hardware_concurrency();
        if (cpu_count == 0)
        {
            return false;
        }

        bool success = true;

        for (size_t i = 0; i < workers_.size(); ++i)
        {
            ThreadAffinity affinity({static_cast<int>(i % cpu_count)});
            if (!workers_[i].set_affinity(affinity))
            {
                success = false;
            }
        }

        return success;
    }
    void wait_for_tasks()
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        task_finished_condition_.wait(lock, [this] {
            return tasks_.empty() && active_tasks_ == 0;
        });
    }
    // Stops accepting work and joins all workers (called from the destructor).
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }

        condition_.notify_all();

        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }
    auto get_statistics() const -> Statistics
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);

        Statistics stats{};
        stats.total_threads = num_threads_;
        stats.active_threads = active_tasks_;
        stats.pending_tasks = tasks_.size();
        stats.completed_tasks = completed_tasks_;

        return stats;
    }
private:
    size_t num_threads_;
    std::vector<ThreadWrapper> workers_;
    std::queue<Task> tasks_;

    mutable std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable task_finished_condition_;
    std::atomic<bool> stop_;
    std::atomic<size_t> active_tasks_{0};
    std::atomic<size_t> completed_tasks_{0};
    void worker_function()
    {
        while (true)
        {
            Task task;

            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                if (stop_ && tasks_.empty())
                {
                    return;
                }

                task = std::move(tasks_.front());
                tasks_.pop();
                active_tasks_.fetch_add(1, std::memory_order_relaxed);
            }

            try
            {
                task();
            }
            catch (...)
            {
                // Exceptions from packaged tasks are reported through their futures.
            }

            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                active_tasks_.fetch_sub(1, std::memory_order_relaxed);
                completed_tasks_.fetch_add(1, std::memory_order_relaxed);
            }

            task_finished_condition_.notify_all();
        }
    }
};
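// Usage sketch (illustrative only): the basic ThreadPool variant with a single
// condition-variable-driven queue and a blocking wait_for_tasks().
//
//     ThreadPool pool(4);
//     auto f1 = pool.submit([] { return std::string("hello"); });
//     auto f2 = pool.submit([](int n) { return n * n; }, 7);
//     pool.wait_for_tasks();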
class GlobalHighPerformancePool
{
public:
    // Lazily constructed, process-wide pool (reconstructed as a Meyers singleton).
    static auto instance() -> HighPerformancePool&
    {
        static HighPerformancePool pool;
        return pool;
    }

    template <typename F, typename... Args>
    static auto submit(F&& f, Args&&... args)
    {
        return instance().submit(std::forward<F>(f), std::forward<Args>(args)...);
    }

    template <typename Iterator>
    static auto submit_batch(Iterator begin, Iterator end)
    {
        return instance().submit_batch(begin, end);
    }

    template <typename Iterator, typename F>
    static void parallel_for_each(Iterator begin, Iterator end, F&& func)
    {
        instance().parallel_for_each(begin, end, std::forward<F>(func));
    }

private:
    GlobalHighPerformancePool() = default;
};
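// Usage sketch (illustrative only): the global wrapper forwards to a lazily created
// process-wide HighPerformancePool, so call sites need no pool object of their own.
//
//     auto f = GlobalHighPerformancePool::submit([] { return 42; });
//     int v = f.get();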