/// Fixed-capacity work-stealing deque protected by a mutex.
///
/// The owning worker pushes and pops at the top (LIFO, for cache locality);
/// thieves steal from the bottom (FIFO).  `top_` and `bottom_` only ever
/// grow; indices are taken modulo `capacity_`, so `top_ - bottom_` is the
/// current element count.  All mutating operations serialize on `mutex_`,
/// which is why the relaxed atomic loads inside them are sufficient;
/// `size()`/`empty()` read the counters lock-free and therefore return a
/// best-effort snapshot only.
///
/// NOTE(review): reconstructed from a garbled listing (original line numbers
/// were fused into the text and several lines were missing); the emptiness
/// checks in pop()/steal() and the body of empty() are restored here.
template <typename T>
class WorkStealingDeque
{
public:
    static constexpr size_t CACHE_LINE_SIZE = 64;
    static constexpr size_t DEFAULT_CAPACITY = 1024;

private:
    /// One slot, padded to a cache line so adjacent slots touched by the
    /// owner and by thieves do not share a line (false-sharing avoidance).
    struct alignas(CACHE_LINE_SIZE) AlignedItem
    {
        T item;

        AlignedItem() = default;
        AlignedItem(T&& t) : item(std::move(t)) {}
        AlignedItem(T const& t) : item(t) {}
    };

    std::unique_ptr<AlignedItem[]> buffer_;  // ring buffer of capacity_ slots
    size_t capacity_;

    // Counters and lock each on their own cache line so owner/thief updates
    // do not thrash one another.
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> top_{0};
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> bottom_{0};
    alignas(CACHE_LINE_SIZE) mutable std::mutex mutex_;

public:
    /// @param capacity maximum number of queued items (must be > 0).
    explicit WorkStealingDeque(size_t capacity = DEFAULT_CAPACITY)
        : buffer_(std::make_unique<AlignedItem[]>(capacity)), capacity_(capacity)
    {
    }

    /// Push by move onto the top.
    /// @return false when the deque is full (item is left unconsumed).
    [[nodiscard]] auto push(T&& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        if (t - b >= capacity_)
        {
            return false;  // full
        }
        buffer_[t % capacity_] = AlignedItem(std::move(item));
        top_.store(t + 1, std::memory_order_release);
        return true;
    }

    /// Push by copy onto the top.
    /// @return false when the deque is full.
    [[nodiscard]] auto push(T const& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        if (t - b >= capacity_)
        {
            return false;  // full
        }
        buffer_[t % capacity_] = AlignedItem(item);
        top_.store(t + 1, std::memory_order_release);
        return true;
    }

    /// Owner-side pop from the top (most recently pushed item).
    /// @return false when the deque is empty; item is untouched then.
    [[nodiscard]] auto pop(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        if (t == b)
        {
            return false;  // empty
        }
        size_t const new_top = t - 1;
        item = std::move(buffer_[new_top % capacity_].item);
        top_.store(new_top, std::memory_order_relaxed);
        return true;
    }

    /// Thief-side steal from the bottom (oldest item).
    /// @return false when the deque is empty; item is untouched then.
    [[nodiscard]] auto steal(T& item) -> bool
    {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        size_t const t = top_.load(std::memory_order_relaxed);
        if (b == t)
        {
            return false;  // empty
        }
        item = std::move(buffer_[b % capacity_].item);
        bottom_.store(b + 1, std::memory_order_relaxed);
        return true;
    }

    /// Lock-free size snapshot.  Clamped to 0 because the two loads are not
    /// performed atomically together and a concurrent pop/steal can make the
    /// pair appear momentarily inconsistent (bottom_ observed ahead of top_).
    [[nodiscard]] auto size() const -> size_t
    {
        size_t const t = top_.load(std::memory_order_relaxed);
        size_t const b = bottom_.load(std::memory_order_relaxed);
        return t > b ? t - b : 0;
    }

    /// @return true when no items are queued (best-effort snapshot).
    [[nodiscard]] auto empty() const -> bool
    {
        return size() == 0;
    }
};
// ---------------------------------------------------------------------------
// HighPerformancePool — a work-stealing thread pool.  Each worker owns a
// WorkStealingDeque<Task>; submit() distributes tasks round-robin across the
// per-worker deques and falls back to a mutex-protected unbounded overflow
// queue when every deque is full.  Workers pop locally, then try to steal
// from random victims, then drain the overflow queue.
//
// NOTE(review): this listing is an extraction artifact — the integers fused
// to the left of many lines (e.g. "242") are the original file's line
// numbers, and numerous original lines (braces, returns, some declarations)
// are missing.  The block is annotated in place rather than rewritten,
// because a faithful reconstruction cannot be verified from what is visible.
// ---------------------------------------------------------------------------
242class HighPerformancePool
 245 using Task = std::function<void()>;
// Metrics snapshot returned by get_statistics() (struct header not visible).
 249 size_t total_threads;
 250 size_t active_threads;
 251 size_t pending_tasks;
 252 size_t completed_tasks;
// NOTE(review): get_statistics() below assigns stats.stolen_tasks, so a
// `size_t stolen_tasks;` field presumably sits on the missing line 253.
 254 double tasks_per_second;
 255 std::chrono::microseconds avg_task_time;
// Constructor: clamps a zero hardware_concurrency() result to one thread,
// creates one deque per worker, then launches the workers.
 258 explicit HighPerformancePool(
size_t num_threads = std::thread::hardware_concurrency())
 259 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), next_victim_(0),
 260 start_time_(std::chrono::steady_clock::now())
 263 worker_queues_.resize(num_threads_);
 264 for (
size_t i = 0; i < num_threads_; ++i)
 266 worker_queues_[i] = std::make_unique<WorkStealingDeque<Task>>();
// Queues are fully constructed before any worker starts, since workers
// index worker_queues_ immediately.
 269 workers_.reserve(num_threads_);
 272 for (
size_t i = 0; i < num_threads_; ++i)
 274 workers_.emplace_back(&HighPerformancePool::worker_function,
this, i);
// Non-copyable: worker threads capture `this`.
 278 HighPerformancePool(HighPerformancePool
const&) =
delete;
 279 auto operator=(HighPerformancePool
const&) -> HighPerformancePool& =
delete;
// Destructor body not visible — presumably stops and joins the workers.
 281 ~HighPerformancePool()
// submit(): wraps the callable in a packaged_task so the caller gets a
// std::future; placement order is (1) preferred deque, (2) up to three
// neighbouring deques, (3) the overflow queue.
 289 template <
typename F,
typename... Args>
 290 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
 292 using return_type = std::invoke_result_t<F, Args...>;
// shared_ptr because the type-erased Task lambda must be copyable while
// packaged_task itself is move-only.
 294 auto task = std::make_shared<std::packaged_task<return_type()>>(
 295 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
 297 std::future<return_type> result = task->get_future();
 299 if (stop_.load(std::memory_order_acquire))
 301 throw std::runtime_error(
"ThreadPool is shutting down");
// Round-robin starting queue; relaxed order is fine — only the distribution
// of the counter matters, not synchronization.
 305 size_t const preferred_queue = next_victim_.fetch_add(1, std::memory_order_relaxed) % num_threads_;
 308 if (worker_queues_[preferred_queue]->push([task]() { (*task)(); }))
 310 wakeup_condition_.notify_one();
// Preferred deque full: probe a few neighbours before giving up.
 315 for (
size_t attempts = 0; attempts < (std::min)(num_threads_, size_t(3)); ++attempts)
 317 size_t const idx = (preferred_queue + attempts + 1) % num_threads_;
 318 if (worker_queues_[idx]->push([task]() { (*task)(); }))
 320 wakeup_condition_.notify_one();
// All deques full: park the task in the unbounded overflow queue.  stop_ is
// re-checked under the lock so a task cannot be enqueued after shutdown
// begins (shutdown sets stop_ while holding overflow_mutex_ — see below).
 327 std::lock_guard<std::mutex> lock(overflow_mutex_);
 328 if (stop_.load(std::memory_order_relaxed))
 330 throw std::runtime_error(
"ThreadPool is shutting down");
 332 overflow_tasks_.emplace([task]() { (*task)(); });
 335 wakeup_condition_.notify_all();
// submit_batch(): enqueue a range of void() callables across consecutive
// deques; returns one future per task.
 342 template <
typename Iterator>
 343 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
 345 std::vector<std::future<void>> futures;
 346 size_t const batch_size = std::distance(begin, end);
 347 futures.reserve(batch_size);
 349 if (stop_.load(std::memory_order_acquire))
 351 throw std::runtime_error(
"ThreadPool is shutting down");
// Reserve a contiguous run of round-robin slots for the whole batch.
 355 size_t queue_idx = next_victim_.fetch_add(batch_size, std::memory_order_relaxed) % num_threads_;
 357 for (
auto it = begin; it != end; ++it)
 359 auto task = std::make_shared<std::packaged_task<void()>>(*it);
 360 futures.push_back(task->get_future());
// Try every deque once before spilling to the overflow queue.
 364 for (
size_t attempts = 0; attempts < num_threads_; ++attempts)
 366 if (worker_queues_[queue_idx]->push([task]() { (*task)(); }))
 371 queue_idx = (queue_idx + 1) % num_threads_;
 377 std::lock_guard<std::mutex> lock(overflow_mutex_);
 378 overflow_tasks_.emplace([task]() { (*task)(); });
 383 wakeup_condition_.notify_all();
// parallel_for_each (signature line not visible): splits [begin, end) into
// roughly 4 chunks per thread and waits for all of them.  NOTE(review): the
// `it < end` / `it + chunk_size` arithmetic requires random-access
// iterators; iterators are captured by value, so the caller must keep the
// underlying range alive until the call returns.
 390 template <
typename Iterator,
typename F>
 393 size_t const total_items = std::distance(begin, end);
 394 if (total_items == 0)
 398 size_t const chunk_size = (std::max)(
size_t(1), total_items / (num_threads_ * 4));
 399 std::vector<std::future<void>> futures;
 401 for (
auto it = begin; it < end;)
 403 auto chunk_end = (std::min)(it + chunk_size, end);
 405 futures.push_back(
submit([func, it, chunk_end]() {
 406 for (
auto chunk_it = it; chunk_it != chunk_end; ++chunk_it)
// Block until every chunk completed (the future wait is on missing lines).
 416 for (
auto& future : futures)
 422 [[nodiscard]]
auto size() const noexcept ->
size_t
// pending_tasks(): per-deque sizes plus the overflow queue.  The deque
// reads are unsynchronized snapshots, so the total is approximate.
 427 [[nodiscard]]
auto pending_tasks() const ->
size_t
 430 for (
auto const& queue : worker_queues_)
 432 total += queue->size();
 435 std::lock_guard<std::mutex> lock(overflow_mutex_);
 436 total += overflow_tasks_.size();
// configure_threads(): names each worker "<prefix>_<i>" and applies the
// scheduling policy/priority; returns an error_code on the first failure.
// (ThreadWrapper setters return an expected-like type here — .has_value().)
 443 auto configure_threads(std::string
const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
 448 for (
size_t i = 0; i < workers_.size(); ++i)
 450 std::string
const thread_name = name_prefix +
"_" + std::to_string(i);
 452 if (!workers_[i].set_name(thread_name).has_value())
 457 if (!workers_[i].set_scheduling_policy(policy, priority).has_value())
 464 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
// set_affinity (header not visible): one affinity mask for every worker.
 471 for (
auto& worker : workers_)
 473 if (!worker.set_affinity(affinity).has_value())
 480 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
// Pins worker i to CPU (i mod cpu_count).
 483 auto distribute_across_cpus() -> expected<void, std::error_code>
 485 auto const cpu_count = std::thread::hardware_concurrency();
 487 return unexpected(std::make_error_code(std::errc::invalid_argument));
 491 for (
size_t i = 0; i < workers_.size(); ++i)
 493 ThreadAffinity affinity({
static_cast<int>(i % cpu_count)});
 494 if (!workers_[i].set_affinity(affinity).has_value())
 501 return unexpected(std::make_error_code(std::errc::operation_not_permitted));
// Blocks until all queues drain and no task is executing; workers notify
// completion_condition_ after each finished task (see worker_function).
 504 void wait_for_tasks()
 506 std::unique_lock<std::mutex> lock(completion_mutex_);
 507 completion_condition_.wait(
 508 lock, [
this] {
return pending_tasks() == 0 && active_tasks_.load(std::memory_order_acquire) == 0; });
// shutdown (header not visible): sets stop_ under the overflow lock, wakes
// all workers, joins them.  exchange() makes repeated calls idempotent.
 514 std::lock_guard<std::mutex> lock(overflow_mutex_);
 515 if (stop_.exchange(
true, std::memory_order_acq_rel))
 521 wakeup_condition_.notify_all();
 523 for (
auto& worker : workers_)
 525 if (worker.joinable())
// get_statistics (header not visible): assembles a Statistics snapshot from
// the atomic counters; the individual reads are not mutually consistent.
 539 auto const now = std::chrono::steady_clock::now();
 540 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
 543 stats.total_threads = num_threads_;
 544 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
 545 stats.pending_tasks = pending_tasks();
 546 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
 547 stats.stolen_tasks = stolen_tasks_.load(std::memory_order_acquire);
 549 if (elapsed.count() > 0)
 551 stats.tasks_per_second =
static_cast<double>(stats.completed_tasks) / elapsed.count();
 555 stats.tasks_per_second = 0.0;
 558 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
 559 if (stats.completed_tasks > 0)
 561 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
 565 stats.avg_task_time = std::chrono::microseconds(0);
// --- data members -------------------------------------------------------
 573 std::vector<ThreadWrapper> workers_;
 574 std::vector<std::unique_ptr<WorkStealingDeque<Task>>> worker_queues_;
// Unbounded spill-over used when every bounded deque is full.
 577 std::queue<Task> overflow_tasks_;
 578 mutable std::mutex overflow_mutex_;
 581 std::atomic<bool> stop_;
 582 std::condition_variable wakeup_condition_;
 583 std::mutex wakeup_mutex_;
 585 std::condition_variable completion_condition_;
 586 std::mutex completion_mutex_;
// Round-robin cursor for submit()'s queue selection.
 589 std::atomic<size_t> next_victim_;
 590 std::atomic<size_t> active_tasks_{0};
 591 std::atomic<size_t> completed_tasks_{0};
 592 std::atomic<size_t> stolen_tasks_{0};
// Accumulated task run time in microseconds (feeds avg_task_time).
 593 std::atomic<uint64_t> total_task_time_{0};
 595 std::chrono::steady_clock::time_point start_time_;
// Worker loop: local pop -> random steal (up to 4 attempts) -> overflow
// queue; times each task, maintains the counters, then naps when idle.
 598 void worker_function(
size_t worker_id)
// Per-thread RNG so victim selection needs no shared mutable state.
 601 thread_local std::mt19937 gen = []() {
 602 std::random_device device;
 603 return std::mt19937(device());
 607 std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
 611 bool found_task =
false;
// 1) Fast path: our own deque (LIFO pop for cache locality).
 614 if (worker_queues_[worker_id]->pop(task))
// 2) Steal from randomly chosen victims.
 621 size_t const max_steal_attempts = (std::min)(num_threads_,
size_t(4));
 622 for (
size_t attempts = 0; attempts < max_steal_attempts; ++attempts)
 624 size_t const victim_id = dist(gen);
 625 if (victim_id != worker_id && worker_queues_[victim_id]->steal(task))
 628 stolen_tasks_.fetch_add(1, std::memory_order_relaxed);
// 3) Drain the overflow queue.
 637 std::lock_guard<std::mutex> lock(overflow_mutex_);
 638 if (!overflow_tasks_.empty())
 640 task = std::move(overflow_tasks_.front());
 641 overflow_tasks_.pop();
// Execute the task (the invocation itself is on missing lines), tracking
// the active count and per-task wall time.
 649 active_tasks_.fetch_add(1, std::memory_order_relaxed);
 651 auto const start_time = std::chrono::steady_clock::now();
 660 auto const end_time = std::chrono::steady_clock::now();
 662 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
 663 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
 665 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
 666 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
// Wake any wait_for_tasks() callers.
 668 completion_condition_.notify_all();
// Idle path: exit if stopping, otherwise nap briefly awaiting a wakeup.
 673 if (stop_.load(std::memory_order_acquire))
 679 std::unique_lock<std::mutex> lock(wakeup_mutex_);
 680 wakeup_condition_.wait_for(lock, std::chrono::microseconds(100));
// ---------------------------------------------------------------------------
// FastThreadPool (class header line not visible in this listing) — a simpler
// pool with a single shared std::queue<Task> guarded by queue_mutex_ and a
// condition variable.  Same extraction-artifact caveat as above: the fused
// integers are original line numbers and several lines are missing.
// ---------------------------------------------------------------------------
750 using Task = std::function<void()>;
// Statistics snapshot fields (struct header not visible).
754 size_t total_threads;
755 size_t active_threads;
756 size_t pending_tasks;
757 size_t completed_tasks;
758 double tasks_per_second;
759 std::chrono::microseconds avg_task_time;
// Constructor: clamps zero thread count to one and launches the workers.
762 explicit FastThreadPool(
size_t num_threads = std::thread::hardware_concurrency())
763 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false), start_time_(std::chrono::steady_clock::now())
765 workers_.reserve(num_threads_);
768 for (
size_t i = 0; i < num_threads_; ++i)
770 workers_.emplace_back(&FastThreadPool::worker_function,
this, i);
// Non-copyable: workers capture `this`.
774 FastThreadPool(FastThreadPool
const&) =
delete;
775 auto operator=(FastThreadPool
const&) -> FastThreadPool& =
delete;
// submit(): packaged_task + future; enqueues under queue_mutex_, then wakes
// one worker.  Throws if the pool is stopping (check is on missing lines
// 797-798, under the lock).
785 template <
typename F,
typename... Args>
786 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
788 using return_type = std::invoke_result_t<F, Args...>;
// shared_ptr: the copyable Task lambda must share the move-only packaged_task.
790 auto task = std::make_shared<std::packaged_task<return_type()>>(
791 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
793 std::future<return_type> result = task->get_future();
796 std::lock_guard<std::mutex> lock(queue_mutex_);
799 throw std::runtime_error(
"FastThreadPool is shutting down");
801 tasks_.emplace([task]() { (*task)(); });
804 condition_.notify_one();
// submit_batch(): enqueue a range of void() callables in one lock scope and
// wake every worker.
811 template <
typename Iterator>
812 auto submit_batch(Iterator begin, Iterator end) -> std::vector<std::future<void>>
814 std::vector<std::future<void>> futures;
815 size_t const batch_size = std::distance(begin, end);
816 futures.reserve(batch_size);
819 std::lock_guard<std::mutex> lock(queue_mutex_);
822 throw std::runtime_error(
"FastThreadPool is shutting down");
825 for (
auto it = begin; it != end; ++it)
827 auto task = std::make_shared<std::packaged_task<void()>>(*it);
828 futures.push_back(task->get_future());
829 tasks_.emplace([task]() { (*task)(); });
834 condition_.notify_all();
// Shutdown sequence (method header not visible): presumably sets stop_
// under the lock (missing lines 842-846), wakes everyone, joins workers.
841 std::lock_guard<std::mutex> lock(queue_mutex_);
847 condition_.notify_all();
849 for (
auto& worker : workers_)
851 if (worker.joinable())
// configure_threads(): boolean-returning variant (contrast with the
// expected<>-returning HighPerformancePool version above).
860 auto configure_threads(std::string
const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
861 ThreadPriority priority = ThreadPriority::normal()) ->
bool
865 for (
size_t i = 0; i < workers_.size(); ++i)
867 std::string
const thread_name = name_prefix +
"_" + std::to_string(i);
869 if (!workers_[i].set_name(thread_name))
874 if (!workers_[i].set_scheduling_policy(policy, priority))
// Applies one affinity mask to every worker; false on first failure.
883 auto set_affinity(ThreadAffinity
const& affinity) ->
bool
887 for (
auto& worker : workers_)
889 if (!worker.set_affinity(affinity))
// Pins worker i to CPU (i mod cpu_count).
898 auto distribute_across_cpus() ->
bool
900 auto const cpu_count = std::thread::hardware_concurrency();
906 for (
size_t i = 0; i < workers_.size(); ++i)
908 ThreadAffinity affinity({
static_cast<int>(i % cpu_count)});
909 if (!workers_[i].set_affinity(affinity))
918 [[nodiscard]]
auto size() const noexcept ->
size_t
// pending_tasks(): exact count — the single queue is read under its lock.
923 [[nodiscard]]
auto pending_tasks() const ->
size_t
925 std::lock_guard<std::mutex> lock(queue_mutex_);
926 return tasks_.size();
// Blocks until the queue is empty and no task is executing.
929 void wait_for_tasks()
931 std::unique_lock<std::mutex> lock(queue_mutex_);
932 task_finished_condition_.wait(
933 lock, [
this] {
return tasks_.empty() && active_tasks_.load(std::memory_order_acquire) == 0; });
// Snapshot of counters; queue size read under the lock.
936 [[nodiscard]]
auto get_statistics() const -> Statistics
938 auto const now = std::chrono::steady_clock::now();
939 auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time_);
941 std::lock_guard<std::mutex> lock(queue_mutex_);
943 stats.total_threads = num_threads_;
944 stats.active_threads = active_tasks_.load(std::memory_order_acquire);
945 stats.pending_tasks = tasks_.size();
946 stats.completed_tasks = completed_tasks_.load(std::memory_order_acquire);
948 if (elapsed.count() > 0)
950 stats.tasks_per_second =
static_cast<double>(stats.completed_tasks) / elapsed.count();
954 stats.tasks_per_second = 0.0;
957 auto const total_task_time = total_task_time_.load(std::memory_order_acquire);
958 if (stats.completed_tasks > 0)
960 stats.avg_task_time = std::chrono::microseconds(total_task_time / stats.completed_tasks);
964 stats.avg_task_time = std::chrono::microseconds(0);
// --- data members -------------------------------------------------------
972 std::vector<ThreadWrapper> workers_;
973 std::queue<Task> tasks_;
975 mutable std::mutex queue_mutex_;
976 std::condition_variable condition_;
977 std::condition_variable task_finished_condition_;
978 std::atomic<bool> stop_;
979 std::atomic<size_t> active_tasks_{0};
980 std::atomic<size_t> completed_tasks_{0};
// Accumulated task run time in microseconds (feeds avg_task_time).
981 std::atomic<uint64_t> total_task_time_{0};
983 std::chrono::steady_clock::time_point start_time_;
// Worker loop: timed wait (10 ms) on the shared queue; exits when stop_ is
// set and the queue has drained.  The unused parameter is a worker id kept
// for signature parity with the constructor's emplace_back.
985 void worker_function(
size_t )
990 bool found_task =
false;
993 std::unique_lock<std::mutex> lock(queue_mutex_);
// Timed wait bounds the shutdown latency even if a notify is missed.
995 if (condition_.wait_for(lock, std::chrono::milliseconds(10),
996 [
this] { return stop_ || !tasks_.empty(); }))
998 if (stop_ && tasks_.empty())
1003 if (!tasks_.empty())
1005 task = std::move(tasks_.front());
// NOTE(review): tasks_.pop() presumably sits on the missing lines 1006-1007.
// Execute the task (invocation on missing lines), tracking active count and
// wall time; then wake any wait_for_tasks() callers.
1008 active_tasks_.fetch_add(1, std::memory_order_relaxed);
1019 auto const start_time = std::chrono::steady_clock::now();
1027 auto const end_time = std::chrono::steady_clock::now();
1029 auto const task_duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
1030 total_task_time_.fetch_add(task_duration.count(), std::memory_order_relaxed);
1032 active_tasks_.fetch_sub(1, std::memory_order_relaxed);
1033 completed_tasks_.fetch_add(1, std::memory_order_relaxed);
1035 task_finished_condition_.notify_all();
// ---------------------------------------------------------------------------
// ThreadPool (class header line not visible) — the most basic pool variant:
// one shared std::queue<Task>, one mutex, untimed condition-variable waits.
// Same extraction-artifact caveat: fused integers are original line numbers
// and several lines are missing.
// ---------------------------------------------------------------------------
1108 using Task = std::function<void()>;
// Statistics fields (no timing metrics in this variant — compare the
// tasks_per_second/avg_task_time fields of the other pools' Statistics).
1112 size_t total_threads;
1113 size_t active_threads;
1114 size_t pending_tasks;
1115 size_t completed_tasks;
// Constructor: clamps zero thread count to one and launches the workers;
// worker_function takes no id here (all workers share one queue).
1118 explicit ThreadPool(
size_t num_threads = std::thread::hardware_concurrency())
1119 : num_threads_(num_threads == 0 ? 1 : num_threads), stop_(false)
1121 workers_.reserve(num_threads_);
1124 for (
size_t i = 0; i < num_threads_; ++i)
1126 workers_.emplace_back(&ThreadPool::worker_function,
this);
// Non-copyable: workers capture `this`.
1130 ThreadPool(ThreadPool
const&) =
delete;
1131 auto operator=(ThreadPool
const&) -> ThreadPool& =
delete;
// submit(): packaged_task + future; stop check (missing lines) is under the
// queue lock, then the task is enqueued and one worker woken.
1141 template <
typename F,
typename... Args>
1142 auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
1144 using return_type = std::invoke_result_t<F, Args...>;
1146 auto task = std::make_shared<std::packaged_task<return_type()>>(
1147 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
1149 std::future<return_type> result = task->get_future();
1152 std::lock_guard<std::mutex> lock(queue_mutex_);
1156 throw std::runtime_error(
"ThreadPool is shutting down");
1159 tasks_.emplace([task]() { (*task)(); });
1162 condition_.notify_one();
// submit_range(): submits each callable in [begin, end) individually via
// submit(); one lock acquisition per task (contrast submit_batch above).
1169 template <
typename Iterator>
1170 auto submit_range(Iterator begin, Iterator end) -> std::vector<std::future<void>>
1172 std::vector<std::future<void>> futures;
1173 futures.reserve(std::distance(begin, end));
1175 for (
auto it = begin; it != end; ++it)
1177 futures.push_back(
submit(*it));
// parallel_for_each (signature line not visible): one task per element —
// no chunking, unlike HighPerformancePool's version — then waits on all
// futures.  The iterator is captured by value into each task.
1186 template <
typename Iterator,
typename F>
1189 std::vector<std::future<void>> futures;
1190 futures.reserve(std::distance(begin, end));
1192 for (
auto it = begin; it != end; ++it)
1194 futures.push_back(
submit([func, it]() { func(*it); }));
1198 for (
auto& future : futures)
1204 [[nodiscard]]
auto size() const noexcept ->
size_t
1206 return num_threads_;
// Exact pending count — single queue read under its lock.
1209 [[nodiscard]]
auto pending_tasks() const ->
size_t
1211 std::lock_guard<std::mutex> lock(queue_mutex_);
1212 return tasks_.size();
// configure_threads(): best-effort — records failures in `success` and
// keeps going, unlike the fail-fast variants above.
1218 auto configure_threads(std::string
const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
1221 bool success =
true;
1223 for (
size_t i = 0; i < workers_.size(); ++i)
1225 std::string
const thread_name = name_prefix +
"_" + std::to_string(i);
1227 if (!workers_[i].set_name(thread_name))
1232 if (!workers_[i].set_scheduling_policy(policy, priority))
// set_affinity (header not visible): best-effort across all workers.
1243 bool success =
true;
1245 for (
auto& worker : workers_)
1247 if (!worker.set_affinity(affinity))
// Pins worker i to CPU (i mod cpu_count), best-effort.
1256 auto distribute_across_cpus() ->
bool
1258 auto const cpu_count = std::thread::hardware_concurrency();
1262 bool success =
true;
1264 for (
size_t i = 0; i < workers_.size(); ++i)
1266 ThreadAffinity affinity({
static_cast<int>(i % cpu_count)});
1267 if (!workers_[i].set_affinity(affinity))
// Blocks until the queue is empty and no task is executing.
1276 void wait_for_tasks()
1278 std::unique_lock<std::mutex> lock(queue_mutex_);
1279 task_finished_condition_.wait(lock, [
this] {
return tasks_.empty() && active_tasks_ == 0; });
// Shutdown sequence (header not visible): presumably sets stop_ under the
// lock (missing lines), wakes all workers, joins them.
1285 std::lock_guard<std::mutex> lock(queue_mutex_);
1291 condition_.notify_all();
1293 for (
auto& worker : workers_)
1295 if (worker.joinable())
// Snapshot of counters under the queue lock.
1304 [[nodiscard]]
auto get_statistics() const -> Statistics
1306 std::lock_guard<std::mutex> lock(queue_mutex_);
1308 stats.total_threads = num_threads_;
1309 stats.active_threads = active_tasks_;
1310 stats.pending_tasks = tasks_.size();
1311 stats.completed_tasks = completed_tasks_;
// --- data members -------------------------------------------------------
1316 size_t num_threads_;
1317 std::vector<ThreadWrapper> workers_;
1318 std::queue<Task> tasks_;
1320 mutable std::mutex queue_mutex_;
1321 std::condition_variable condition_;
1322 std::condition_variable task_finished_condition_;
1323 std::atomic<bool> stop_;
1324 std::atomic<size_t> active_tasks_{0};
1325 std::atomic<size_t> completed_tasks_{0};
// Worker loop: untimed wait until stopped or work arrives; exits when
// stop_ is set and the queue has drained; dequeues, runs the task
// (invocation and counter updates are on missing lines), then notifies
// wait_for_tasks() waiters.
1327 void worker_function()
1334 std::unique_lock<std::mutex> lock(queue_mutex_);
1336 condition_.wait(lock, [
this] {
return stop_ || !tasks_.empty(); });
1338 if (stop_ && tasks_.empty())
1343 task = std::move(tasks_.front());
// NOTE(review): this lock scope (post-task) guards counter/bookkeeping
// updates on the missing lines 1359-1362 — confirm against the full file.
1358 std::lock_guard<std::mutex> lock(queue_mutex_);
1363 task_finished_condition_.notify_all();
// ---------------------------------------------------------------------------
// GlobalHighPerformancePool — static façade that forwards to a process-wide
// pool via instance() (the instance() definition is on lines not visible in
// this listing; presumably a Meyers singleton over HighPerformancePool).
// The class tail is also outside this view.  Same extraction-artifact
// caveat: the fused integers are original line numbers.
// ---------------------------------------------------------------------------
1456class GlobalHighPerformancePool
// Forwards to the singleton's submit(); return type deduced from it.
1465 template <
typename F,
typename... Args>
1466 static auto submit(F&& f, Args&&... args)
1468 return instance().
submit(std::forward<F>(f), std::forward<Args>(args)...);
// Forwards to the singleton's submit_batch().
1471 template <
typename Iterator>
1472 static auto submit_batch(Iterator begin, Iterator end)
1474 return instance().submit_batch(begin, end);
// Forwards to the singleton's parallel_for_each().
1477 template <
typename Iterator,
typename F>
1478 static void parallel_for_each(Iterator begin, Iterator end, F&& func)
1480 instance().parallel_for_each(begin, end, std::forward<F>(func));
// Private default constructor: only instance() may create the pool.
1484 GlobalHighPerformancePool() =
default;