ThreadSchedule 2.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
thread_pool_with_errors.hpp
Go to the documentation of this file.
1#pragma once
2
7
8#include "error_handler.hpp"
9#include "thread_pool.hpp"
10#include <memory>
11
12namespace threadschedule
13{
14
31template <typename PoolType>
33{
34 public:
35 explicit PoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
36 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
37 {
38 }
39
43 template <typename F, typename... Args>
44 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
45 {
46 return submit_impl({}, std::forward<F>(f), std::forward<Args>(args)...);
47 }
48
52 template <typename F, typename... Args>
53 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
54 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
55 {
56 return submit_impl(description, std::forward<F>(f), std::forward<Args>(args)...);
57 }
58
62 template <typename F, typename... Args>
63 auto try_submit(F&& f, Args&&... args)
64 -> expected<FutureWithErrorHandler<std::invoke_result_t<F, Args...>>, std::error_code>
65 {
66 return try_submit_impl({}, std::forward<F>(f), std::forward<Args>(args)...);
67 }
68
69 auto add_error_callback(ErrorCallback callback) -> size_t
70 {
71 return error_handler_->add_callback(std::move(callback));
72 }
73
74 auto remove_error_callback(size_t id) -> bool
75 {
76 return error_handler_->remove_callback(id);
77 }
78
80 {
81 error_handler_->clear_callbacks();
82 }
83
84 [[nodiscard]] auto error_count() const -> size_t
85 {
86 return error_handler_->error_count();
87 }
88
90 {
91 error_handler_->reset_error_count();
92 }
93
94 [[nodiscard]] auto pool() -> PoolType&
95 {
96 return pool_;
97 }
98
99 [[nodiscard]] auto get_statistics() const -> decltype(auto)
100 {
101 return pool_.get_statistics();
102 }
103
104 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
105 ThreadPriority priority = ThreadPriority::normal()) -> decltype(auto)
106 {
107 return pool_.configure_threads(name_prefix, policy, priority);
108 }
109
110 auto set_affinity(ThreadAffinity const& affinity) -> decltype(auto)
111 {
112 return pool_.set_affinity(affinity);
113 }
114
115 auto distribute_across_cpus() -> decltype(auto)
116 {
117 return pool_.distribute_across_cpus();
118 }
119
121 {
122 pool_.wait_for_tasks();
123 }
124
125 void shutdown()
126 {
127 pool_.shutdown();
128 }
129
130 [[nodiscard]] auto size() const noexcept -> size_t
131 {
132 return pool_.size();
133 }
134
135 [[nodiscard]] auto pending_tasks() const -> size_t
136 {
137 return pool_.pending_tasks();
138 }
139
140 private:
141 template <typename F, typename... Args>
142 auto submit_impl(std::string description, F&& f, Args&&... args)
143 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
144 {
145 auto handler = error_handler_;
146 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
147 desc = std::move(description)]() {
148 try
149 {
150 return std::apply(f, args);
151 }
152 catch (...)
153 {
154 handler->handle_error(TaskError::capture(desc));
155 throw;
156 }
157 };
158 auto future = pool_.submit(std::move(wrapped_task));
159 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
160 }
161
162 template <typename F, typename... Args>
163 auto try_submit_impl(std::string description, F&& f, Args&&... args)
164 -> expected<FutureWithErrorHandler<std::invoke_result_t<F, Args...>>, std::error_code>
165 {
166 auto handler = error_handler_;
167 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
168 desc = std::move(description)]() {
169 try
170 {
171 return std::apply(f, args);
172 }
173 catch (...)
174 {
175 handler->handle_error(TaskError::capture(desc));
176 throw;
177 }
178 };
179 auto result = pool_.try_submit(std::move(wrapped_task));
180 if (!result.has_value())
181 return unexpected(result.error());
182 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(result.value()));
183 }
184
185 PoolType pool_;
186 std::shared_ptr<ErrorHandler> error_handler_;
187};
188
191
194
197
198} // namespace threadschedule
Central registry and dispatcher for task-error callbacks.
A move-only future wrapper that supports an error callback.
Thread pool wrapper that combines any pool type with an ErrorHandler.
auto try_submit(F &&f, Args &&... args) -> expected< FutureWithErrorHandler< std::invoke_result_t< F, Args... > >, std::error_code >
Submit a task, returning an error instead of throwing on shutdown.
auto submit_with_description(std::string const &description, F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with a description for better error messages.
auto distribute_across_cpus() -> decltype(auto)
auto add_error_callback(ErrorCallback callback) -> size_t
auto get_statistics() const -> decltype(auto)
auto set_affinity(ThreadAffinity const &affinity) -> decltype(auto)
PoolWithErrors(size_t num_threads=std::thread::hardware_concurrency())
auto remove_error_callback(size_t id) -> bool
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> decltype(auto)
auto submit(F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with automatic error handling.
auto size() const noexcept -> size_t
Manages a set of CPU indices to which a thread may be bound.
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
A result type that holds either a value of type T or an error of type E.
Definition expected.hpp:215
Error handling primitives: TaskError, ErrorHandler, and ErrorHandledTask.
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
PoolWithErrors< FastThreadPool > FastThreadPoolWithErrors
FastThreadPool with integrated error handling.
PoolWithErrors< ThreadPool > ThreadPoolWithErrors
ThreadPool with integrated error handling.
PoolWithErrors< HighPerformancePool > HighPerformancePoolWithErrors
HighPerformancePool with integrated error handling.
std::function< void(TaskError const &)> ErrorCallback
Signature for error-handling callbacks registered with ErrorHandler.
static auto capture(std::string description={}) -> TaskError
Capture the current in-flight exception into a TaskError.
Thread pools: HighPerformancePool, ThreadPoolBase, LightweightPoolT, and GlobalPool.