ThreadSchedule 1.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
thread_pool_with_errors.hpp
1#pragma once
2
3#include "error_handler.hpp"
4#include "thread_pool.hpp"
5#include <memory>
6
7namespace threadschedule
8{
9
24class HighPerformancePoolWithErrors
25{
26 public:
27 explicit HighPerformancePoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
28 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
29 {
30 }
31
35 template <typename F, typename... Args>
36 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
37 {
38 auto handler = error_handler_;
39 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
40 try
41 {
42 return std::apply(f, args);
43 }
44 catch (...)
45 {
46 TaskError error;
47 error.exception = std::current_exception();
48 error.thread_id = std::this_thread::get_id();
49 error.timestamp = std::chrono::steady_clock::now();
50 handler->handle_error(error);
51 throw;
52 }
53 };
54
55 auto future = pool_.submit(std::move(wrapped_task));
56 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
57 }
58
62 template <typename F, typename... Args>
63 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
64 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
65 {
66 auto handler = error_handler_;
67 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
68 description]() {
69 try
70 {
71 return std::apply(f, args);
72 }
73 catch (...)
74 {
75 TaskError error;
76 error.exception = std::current_exception();
77 error.task_description = description;
78 error.thread_id = std::this_thread::get_id();
79 error.timestamp = std::chrono::steady_clock::now();
80 handler->handle_error(error);
81 throw;
82 }
83 };
84
85 auto future = pool_.submit(std::move(wrapped_task));
86 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
87 }
88
92 auto add_error_callback(ErrorCallback callback) -> size_t
93 {
94 return error_handler_->add_callback(std::move(callback));
95 }
96
101 {
102 error_handler_->clear_callbacks();
103 }
104
108 [[nodiscard]] auto error_count() const -> size_t
109 {
110 return error_handler_->error_count();
111 }
112
117 {
118 error_handler_->reset_error_count();
119 }
120
124 [[nodiscard]] auto pool() -> HighPerformancePool&
125 {
126 return pool_;
127 }
128
132 [[nodiscard]] auto get_statistics() const -> HighPerformancePool::Statistics
133 {
134 return pool_.get_statistics();
135 }
136
140 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
141 ThreadPriority priority = ThreadPriority::normal()) -> expected<void, std::error_code>
142 {
143 return pool_.configure_threads(name_prefix, policy, priority);
144 }
145
146 auto set_affinity(ThreadAffinity const& affinity) -> expected<void, std::error_code>
147 {
148 return pool_.set_affinity(affinity);
149 }
150
151 auto distribute_across_cpus() -> expected<void, std::error_code>
152 {
153 return pool_.distribute_across_cpus();
154 }
155
156 void shutdown()
157 {
158 pool_.shutdown();
159 }
160
161 void wait_for_tasks()
162 {
163 pool_.wait_for_tasks();
164 }
165
166 [[nodiscard]] auto size() const noexcept -> size_t
167 {
168 return pool_.size();
169 }
170
171 [[nodiscard]] auto pending_tasks() const -> size_t
172 {
173 return pool_.pending_tasks();
174 }
175
176 private:
177 HighPerformancePool pool_;
178 std::shared_ptr<ErrorHandler> error_handler_;
179};
180
191class FastThreadPoolWithErrors
192{
193 public:
194 explicit FastThreadPoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
195 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
196 {
197 }
198
199 template <typename F, typename... Args>
200 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
201 {
202 auto handler = error_handler_;
203 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
204 try
205 {
206 return std::apply(f, args);
207 }
208 catch (...)
209 {
210 TaskError error;
211 error.exception = std::current_exception();
212 error.thread_id = std::this_thread::get_id();
213 error.timestamp = std::chrono::steady_clock::now();
214 handler->handle_error(error);
215 throw;
216 }
217 };
218
219 auto future = pool_.submit(std::move(wrapped_task));
220 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
221 }
222
223 template <typename F, typename... Args>
224 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
225 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
226 {
227 auto handler = error_handler_;
228 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
229 description]() {
230 try
231 {
232 return std::apply(f, args);
233 }
234 catch (...)
235 {
236 TaskError error;
237 error.exception = std::current_exception();
238 error.task_description = description;
239 error.thread_id = std::this_thread::get_id();
240 error.timestamp = std::chrono::steady_clock::now();
241 handler->handle_error(error);
242 throw;
243 }
244 };
245
246 auto future = pool_.submit(std::move(wrapped_task));
247 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
248 }
249
250 auto add_error_callback(ErrorCallback callback) -> size_t
251 {
252 return error_handler_->add_callback(std::move(callback));
253 }
254
255 void clear_error_callbacks()
256 {
257 error_handler_->clear_callbacks();
258 }
259
260 [[nodiscard]] auto error_count() const -> size_t
261 {
262 return error_handler_->error_count();
263 }
264
265 void reset_error_count()
266 {
267 error_handler_->reset_error_count();
268 }
269
270 [[nodiscard]] auto pool() -> FastThreadPool&
271 {
272 return pool_;
273 }
274
275 [[nodiscard]] auto get_statistics() const -> FastThreadPool::Statistics
276 {
277 return pool_.get_statistics();
278 }
279
280 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
281 ThreadPriority priority = ThreadPriority::normal()) -> bool
282 {
283 return pool_.configure_threads(name_prefix, policy, priority);
284 }
285
286 auto set_affinity(ThreadAffinity const& affinity) -> bool
287 {
288 return pool_.set_affinity(affinity);
289 }
290
291 auto distribute_across_cpus() -> bool
292 {
293 return pool_.distribute_across_cpus();
294 }
295
296 void wait_for_tasks()
297 {
298 pool_.wait_for_tasks();
299 }
300
301 void shutdown()
302 {
303 pool_.shutdown();
304 }
305
306 [[nodiscard]] auto size() const noexcept -> size_t
307 {
308 return pool_.size();
309 }
310
311 [[nodiscard]] auto pending_tasks() const -> size_t
312 {
313 return pool_.pending_tasks();
314 }
315
316 private:
317 FastThreadPool pool_;
318 std::shared_ptr<ErrorHandler> error_handler_;
319};
320
331class ThreadPoolWithErrors
332{
333 public:
334 explicit ThreadPoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
335 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
336 {
337 }
338
339 template <typename F, typename... Args>
340 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
341 {
342 auto handler = error_handler_;
343 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
344 try
345 {
346 return std::apply(f, args);
347 }
348 catch (...)
349 {
350 TaskError error;
351 error.exception = std::current_exception();
352 error.thread_id = std::this_thread::get_id();
353 error.timestamp = std::chrono::steady_clock::now();
354 handler->handle_error(error);
355 throw;
356 }
357 };
358
359 auto future = pool_.submit(std::move(wrapped_task));
360 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
361 }
362
363 template <typename F, typename... Args>
364 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
365 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
366 {
367 auto handler = error_handler_;
368 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
369 description]() {
370 try
371 {
372 return std::apply(f, args);
373 }
374 catch (...)
375 {
376 TaskError error;
377 error.exception = std::current_exception();
378 error.task_description = description;
379 error.thread_id = std::this_thread::get_id();
380 error.timestamp = std::chrono::steady_clock::now();
381 handler->handle_error(error);
382 throw;
383 }
384 };
385
386 auto future = pool_.submit(std::move(wrapped_task));
387 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
388 }
389
390 auto add_error_callback(ErrorCallback callback) -> size_t
391 {
392 return error_handler_->add_callback(std::move(callback));
393 }
394
395 void clear_error_callbacks()
396 {
397 error_handler_->clear_callbacks();
398 }
399
400 [[nodiscard]] auto error_count() const -> size_t
401 {
402 return error_handler_->error_count();
403 }
404
405 void reset_error_count()
406 {
407 error_handler_->reset_error_count();
408 }
409
410 [[nodiscard]] auto pool() -> ThreadPool&
411 {
412 return pool_;
413 }
414
415 [[nodiscard]] auto get_statistics() const -> ThreadPool::Statistics
416 {
417 return pool_.get_statistics();
418 }
419
420 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
421 ThreadPriority priority = ThreadPriority::normal()) -> bool
422 {
423 return pool_.configure_threads(name_prefix, policy, priority);
424 }
425
426 auto set_affinity(ThreadAffinity const& affinity) -> bool
427 {
428 return pool_.set_affinity(affinity);
429 }
430
431 auto distribute_across_cpus() -> bool
432 {
433 return pool_.distribute_across_cpus();
434 }
435
436 void wait_for_tasks()
437 {
438 pool_.wait_for_tasks();
439 }
440
441 void shutdown()
442 {
443 pool_.shutdown();
444 }
445
446 [[nodiscard]] auto size() const noexcept -> size_t
447 {
448 return pool_.size();
449 }
450
451 [[nodiscard]] auto pending_tasks() const -> size_t
452 {
453 return pool_.pending_tasks();
454 }
455
456 private:
457 ThreadPool pool_;
458 std::shared_ptr<ErrorHandler> error_handler_;
459};
460
461} // namespace threadschedule
Single-queue thread pool with optimized locking for medium workloads.
A move-only future wrapper that supports an error callback.
auto submit_with_description(std::string const &description, F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with a description for better error messages.
auto add_error_callback(ErrorCallback callback) -> size_t
Add a global error callback for all tasks.
auto error_count() const -> size_t
Get total error count.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Configure threads.
auto get_statistics() const -> HighPerformancePool::Statistics
Get statistics from underlying pool.
auto pool() -> HighPerformancePool &
Get the underlying pool.
auto submit(F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with automatic error handling.
High-performance thread pool optimized for high-frequency task submission.
auto get_statistics() const -> Statistics
Get detailed performance statistics.
Manages a set of CPU indices to which a thread may be bound.
Simple, general-purpose thread pool.
Value-semantic wrapper for a thread scheduling priority.
A result type that holds either a value of type T or an error of type E.
Definition expected.hpp:215
Holds diagnostic information captured from a failed task.
std::string task_description
Optional human-readable label supplied when the task was submitted.
std::exception_ptr exception
The captured exception. Never null when produced by the library.
std::thread::id thread_id
Id of the thread on which the exception was thrown.
std::chrono::steady_clock::time_point timestamp
Monotonic timestamp recorded immediately after the exception was caught.