31template <
typename PoolType>
35 explicit PoolWithErrors(
size_t num_threads = std::thread::hardware_concurrency())
36 : pool_(num_threads), error_handler_(std::make_shared<
ErrorHandler>())
43 template <
typename F,
typename... Args>
46 return submit_impl({}, std::forward<F>(f), std::forward<Args>(args)...);
52 template <
typename F,
typename... Args>
56 return submit_impl(description, std::forward<F>(f), std::forward<Args>(args)...);
62 template <
typename F,
typename... Args>
66 return try_submit_impl({}, std::forward<F>(f), std::forward<Args>(args)...);
71 return error_handler_->add_callback(std::move(callback));
76 return error_handler_->remove_callback(
id);
81 error_handler_->clear_callbacks();
86 return error_handler_->error_count();
91 error_handler_->reset_error_count();
94 [[nodiscard]]
auto pool() -> PoolType&
101 return pool_.get_statistics();
107 return pool_.configure_threads(name_prefix, policy, priority);
112 return pool_.set_affinity(affinity);
117 return pool_.distribute_across_cpus();
122 pool_.wait_for_tasks();
130 [[nodiscard]]
auto size() const noexcept ->
size_t
137 return pool_.pending_tasks();
141 template <
typename F,
typename... Args>
142 auto submit_impl(std::string description, F&& f, Args&&... args)
145 auto handler = error_handler_;
146 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
147 desc = std::move(description)]() {
150 return std::apply(f, args);
158 auto future = pool_.submit(std::move(wrapped_task));
159 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
162 template <
typename F,
typename... Args>
163 auto try_submit_impl(std::string description, F&& f, Args&&... args)
164 -> expected<FutureWithErrorHandler<std::invoke_result_t<F, Args...>>, std::error_code>
166 auto handler = error_handler_;
167 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
168 desc = std::move(description)]() {
171 return std::apply(f, args);
179 auto result = pool_.try_submit(std::move(wrapped_task));
180 if (!result.has_value())
181 return unexpected(result.error());
182 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(result.value()));
186 std::shared_ptr<ErrorHandler> error_handler_;
Central registry and dispatcher for task-error callbacks.
A move-only future wrapper that supports an error callback.
Thread pool wrapper that combines any pool type with an ErrorHandler.
auto try_submit(F &&f, Args &&... args) -> expected< FutureWithErrorHandler< std::invoke_result_t< F, Args... > >, std::error_code >
Submit a task, returning an error instead of throwing on shutdown.
auto submit_with_description(std::string const &description, F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with a description for better error messages.
auto distribute_across_cpus() -> decltype(auto)
auto add_error_callback(ErrorCallback callback) -> size_t
auto get_statistics() const -> decltype(auto)
auto pending_tasks() const -> size_t
auto error_count() const -> size_t
auto set_affinity(ThreadAffinity const &affinity) -> decltype(auto)
PoolWithErrors(size_t num_threads=std::thread::hardware_concurrency())
auto remove_error_callback(size_t id) -> bool
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> decltype(auto)
auto submit(F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with automatic error handling.
auto pool() -> PoolType &
auto size() const noexcept -> size_t
void clear_error_callbacks()
Manages a set of CPU indices to which a thread may be bound.
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
A result type that holds either a value of type T or an error of type E.
Error handling primitives: TaskError, ErrorHandler, and ErrorHandledTask.
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
PoolWithErrors< FastThreadPool > FastThreadPoolWithErrors
FastThreadPool with integrated error handling.
PoolWithErrors< ThreadPool > ThreadPoolWithErrors
ThreadPool with integrated error handling.
PoolWithErrors< HighPerformancePool > HighPerformancePoolWithErrors
HighPerformancePool with integrated error handling.
std::function< void(TaskError const &)> ErrorCallback
Signature for error-handling callbacks registered with ErrorHandler.
static auto capture(std::string description={}) -> TaskError
Capture the current in-flight exception into a TaskError.
Thread pools: HighPerformancePool, ThreadPoolBase, LightweightPoolT, and GlobalPool.