ThreadSchedule 1.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
thread_pool_with_errors.hpp
1#pragma once
2
3#include "error_handler.hpp"
4#include "thread_pool.hpp"
5#include <memory>
6
7namespace threadschedule
8{
9
15class HighPerformancePoolWithErrors
16{
17 public:
18 explicit HighPerformancePoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
19 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
20 {
21 }
22
26 template <typename F, typename... Args>
27 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
28 {
29 auto handler = error_handler_;
30 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
31 try
32 {
33 return std::apply(f, args);
34 }
35 catch (...)
36 {
37 TaskError error;
38 error.exception = std::current_exception();
39 error.thread_id = std::this_thread::get_id();
40 error.timestamp = std::chrono::steady_clock::now();
41 handler->handle_error(error);
42 throw;
43 }
44 };
45
46 auto future = pool_.submit(std::move(wrapped_task));
47 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
48 }
49
53 template <typename F, typename... Args>
54 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
55 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
56 {
57 auto handler = error_handler_;
58 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
59 description]() {
60 try
61 {
62 return std::apply(f, args);
63 }
64 catch (...)
65 {
66 TaskError error;
67 error.exception = std::current_exception();
68 error.task_description = description;
69 error.thread_id = std::this_thread::get_id();
70 error.timestamp = std::chrono::steady_clock::now();
71 handler->handle_error(error);
72 throw;
73 }
74 };
75
76 auto future = pool_.submit(std::move(wrapped_task));
77 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
78 }
79
83 auto add_error_callback(ErrorCallback callback) -> size_t
84 {
85 return error_handler_->add_callback(std::move(callback));
86 }
87
92 {
93 error_handler_->clear_callbacks();
94 }
95
99 [[nodiscard]] auto error_count() const -> size_t
100 {
101 return error_handler_->error_count();
102 }
103
108 {
109 error_handler_->reset_error_count();
110 }
111
115 [[nodiscard]] auto pool() -> HighPerformancePool&
116 {
117 return pool_;
118 }
119
123 [[nodiscard]] auto get_statistics() const -> HighPerformancePool::Statistics
124 {
125 return pool_.get_statistics();
126 }
127
131 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
132 ThreadPriority priority = ThreadPriority::normal()) -> expected<void, std::error_code>
133 {
134 return pool_.configure_threads(name_prefix, policy, priority);
135 }
136
141 {
142 return pool_.distribute_across_cpus();
143 }
144
148 void shutdown()
149 {
150 pool_.shutdown();
151 }
152
157 {
158 pool_.wait_for_tasks();
159 }
160
164 [[nodiscard]] auto size() const noexcept -> size_t
165 {
166 return pool_.size();
167 }
168
172 [[nodiscard]] auto pending_tasks() const -> size_t
173 {
174 return pool_.pending_tasks();
175 }
176
177 private:
179 std::shared_ptr<ErrorHandler> error_handler_;
180};
181
185class FastThreadPoolWithErrors
186{
187 public:
188 explicit FastThreadPoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
189 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
190 {
191 }
192
193 template <typename F, typename... Args>
194 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
195 {
196 auto handler = error_handler_;
197 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
198 try
199 {
200 return std::apply(f, args);
201 }
202 catch (...)
203 {
204 TaskError error;
205 error.exception = std::current_exception();
206 error.thread_id = std::this_thread::get_id();
207 error.timestamp = std::chrono::steady_clock::now();
208 handler->handle_error(error);
209 throw;
210 }
211 };
212
213 auto future = pool_.submit(std::move(wrapped_task));
214 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
215 }
216
217 template <typename F, typename... Args>
218 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
219 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
220 {
221 auto handler = error_handler_;
222 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
223 description]() {
224 try
225 {
226 return std::apply(f, args);
227 }
228 catch (...)
229 {
230 TaskError error;
231 error.exception = std::current_exception();
232 error.task_description = description;
233 error.thread_id = std::this_thread::get_id();
234 error.timestamp = std::chrono::steady_clock::now();
235 handler->handle_error(error);
236 throw;
237 }
238 };
239
240 auto future = pool_.submit(std::move(wrapped_task));
241 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
242 }
243
244 auto add_error_callback(ErrorCallback callback) -> size_t
245 {
246 return error_handler_->add_callback(std::move(callback));
247 }
248
249 void clear_error_callbacks()
250 {
251 error_handler_->clear_callbacks();
252 }
253
254 [[nodiscard]] auto error_count() const -> size_t
255 {
256 return error_handler_->error_count();
257 }
258
259 void reset_error_count()
260 {
261 error_handler_->reset_error_count();
262 }
263
264 [[nodiscard]] auto pool() -> FastThreadPool&
265 {
266 return pool_;
267 }
268
269 [[nodiscard]] auto get_statistics() const -> FastThreadPool::Statistics
270 {
271 return pool_.get_statistics();
272 }
273
274 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
275 ThreadPriority priority = ThreadPriority::normal()) -> bool
276 {
277 return pool_.configure_threads(name_prefix, policy, priority);
278 }
279
280 auto distribute_across_cpus() -> bool
281 {
282 return pool_.distribute_across_cpus();
283 }
284
285 void shutdown()
286 {
287 pool_.shutdown();
288 }
289
290 [[nodiscard]] auto size() const noexcept -> size_t
291 {
292 return pool_.size();
293 }
294
295 [[nodiscard]] auto pending_tasks() const -> size_t
296 {
297 return pool_.pending_tasks();
298 }
299
300 private:
301 FastThreadPool pool_;
302 std::shared_ptr<ErrorHandler> error_handler_;
303};
304
308class ThreadPoolWithErrors
309{
310 public:
311 explicit ThreadPoolWithErrors(size_t num_threads = std::thread::hardware_concurrency())
312 : pool_(num_threads), error_handler_(std::make_shared<ErrorHandler>())
313 {
314 }
315
316 template <typename F, typename... Args>
317 auto submit(F&& f, Args&&... args) -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
318 {
319 auto handler = error_handler_;
320 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler]() {
321 try
322 {
323 return std::apply(f, args);
324 }
325 catch (...)
326 {
327 TaskError error;
328 error.exception = std::current_exception();
329 error.thread_id = std::this_thread::get_id();
330 error.timestamp = std::chrono::steady_clock::now();
331 handler->handle_error(error);
332 throw;
333 }
334 };
335
336 auto future = pool_.submit(std::move(wrapped_task));
337 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
338 }
339
340 template <typename F, typename... Args>
341 auto submit_with_description(std::string const& description, F&& f, Args&&... args)
342 -> FutureWithErrorHandler<std::invoke_result_t<F, Args...>>
343 {
344 auto handler = error_handler_;
345 auto wrapped_task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...), handler,
346 description]() {
347 try
348 {
349 return std::apply(f, args);
350 }
351 catch (...)
352 {
353 TaskError error;
354 error.exception = std::current_exception();
355 error.task_description = description;
356 error.thread_id = std::this_thread::get_id();
357 error.timestamp = std::chrono::steady_clock::now();
358 handler->handle_error(error);
359 throw;
360 }
361 };
362
363 auto future = pool_.submit(std::move(wrapped_task));
364 return FutureWithErrorHandler<std::invoke_result_t<F, Args...>>(std::move(future));
365 }
366
367 auto add_error_callback(ErrorCallback callback) -> size_t
368 {
369 return error_handler_->add_callback(std::move(callback));
370 }
371
372 void clear_error_callbacks()
373 {
374 error_handler_->clear_callbacks();
375 }
376
377 [[nodiscard]] auto error_count() const -> size_t
378 {
379 return error_handler_->error_count();
380 }
381
382 void reset_error_count()
383 {
384 error_handler_->reset_error_count();
385 }
386
387 [[nodiscard]] auto pool() -> ThreadPool&
388 {
389 return pool_;
390 }
391
392 [[nodiscard]] auto get_statistics() const -> ThreadPool::Statistics
393 {
394 return pool_.get_statistics();
395 }
396
397 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
398 ThreadPriority priority = ThreadPriority::normal()) -> bool
399 {
400 return pool_.configure_threads(name_prefix, policy, priority);
401 }
402
403 auto set_affinity(ThreadAffinity const& affinity) -> bool
404 {
405 return pool_.set_affinity(affinity);
406 }
407
408 auto distribute_across_cpus() -> bool
409 {
410 return pool_.distribute_across_cpus();
411 }
412
413 void wait_for_tasks()
414 {
415 pool_.wait_for_tasks();
416 }
417
418 void shutdown()
419 {
420 pool_.shutdown();
421 }
422
423 [[nodiscard]] auto size() const noexcept -> size_t
424 {
425 return pool_.size();
426 }
427
428 [[nodiscard]] auto pending_tasks() const -> size_t
429 {
430 return pool_.pending_tasks();
431 }
432
433 private:
434 ThreadPool pool_;
435 std::shared_ptr<ErrorHandler> error_handler_;
436};
437
438} // namespace threadschedule
Simple high-performance thread pool using single queue with optimized locking.
Future wrapper that provides error callback support.
auto pending_tasks() const -> size_t
Get pending task count.
auto submit_with_description(std::string const &description, F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with a description for better error messages.
void wait_for_tasks()
Wait for all tasks to complete.
auto add_error_callback(ErrorCallback callback) -> size_t
Add a global error callback for all tasks.
auto error_count() const -> size_t
Get total error count.
auto size() const noexcept -> size_t
Get pool size.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal()) -> expected< void, std::error_code >
Configure threads.
auto get_statistics() const -> HighPerformancePool::Statistics
Get statistics from underlying pool.
auto distribute_across_cpus() -> expected< void, std::error_code >
Distribute threads across CPUs.
auto pool() -> HighPerformancePool &
Get the underlying pool.
auto submit(F &&f, Args &&... args) -> FutureWithErrorHandler< std::invoke_result_t< F, Args... > >
Submit a task with automatic error handling.
High-performance thread pool optimized for high-frequency task submission.
auto get_statistics() const -> Statistics
Get detailed performance statistics.
Simple thread pool for general-purpose use.
Thread priority wrapper with validation.
Information about a task exception.