ThreadSchedule 2.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
task.hpp
Go to the documentation of this file.
1#pragma once
2
13
14#if defined(__cpp_impl_coroutine) && __cpp_impl_coroutine >= 201902L
15
16#include <atomic>
17#include <coroutine>
18#include <exception>
19#include <functional>
20#include <future>
21#include <optional>
22#include <type_traits>
23#include <utility>
24
25namespace threadschedule
26{
27
28template <typename T = void>
29class task;
30
37struct executor_base
38{
39 virtual void execute(std::coroutine_handle<>) = 0;
40 virtual ~executor_base() = default;
41};
42
48template <typename Pool>
49struct pool_executor : executor_base
50{
51 Pool& pool;
52 explicit pool_executor(Pool& p) : pool(p) {}
53
54 void execute(std::coroutine_handle<> h) override
55 {
56 pool.submit([h]() mutable { h.resume(); });
57 }
58};
59
60namespace detail
61{
62
73struct final_awaiter
74{
75 [[nodiscard]] auto await_ready() const noexcept -> bool
76 {
77 return false;
78 }
79
80 template <typename Promise>
81 auto await_suspend(std::coroutine_handle<Promise> h) const noexcept -> std::coroutine_handle<>
82 {
83 auto cont = h.promise().continuation_;
84 if (h.promise().executor_ && cont)
85 {
86 h.promise().executor_->execute(cont);
87 return std::noop_coroutine();
88 }
89 if (cont)
90 return cont;
91 return std::noop_coroutine();
92 }
93
94 void await_resume() const noexcept
95 {
96 }
97};
98
120template <typename T>
121class task_promise_base
122{
123 public:
124 task_promise_base() = default;
125
126 auto initial_suspend() noexcept -> std::suspend_always
127 {
128 return {};
129 }
130
131 auto final_suspend() noexcept -> final_awaiter
132 {
133 return {};
134 }
135
136 void unhandled_exception() noexcept
137 {
138 exception_ = std::current_exception();
139 }
140
141 void rethrow_if_exception()
142 {
143 if (exception_)
144 std::rethrow_exception(exception_);
145 }
146
147 std::coroutine_handle<> continuation_{};
148 executor_base* executor_{nullptr};
149
150 protected:
151 std::exception_ptr exception_{};
152};
153
154} // namespace detail
155
156// --- task<T> (non-void) ---
157
189template <typename T>
190class task
191{
192 public:
193 struct promise_type : detail::task_promise_base<T>
194 {
195 auto get_return_object() noexcept -> task
196 {
197 return task{std::coroutine_handle<promise_type>::from_promise(*this)};
198 }
199
200 void return_value(T value) noexcept(std::is_nothrow_move_constructible_v<T>)
201 {
202 result_.emplace(std::move(value));
203 }
204
205 auto result() -> T
206 {
207 this->rethrow_if_exception();
208 return std::move(*result_);
209 }
210
211 private:
212 std::optional<T> result_{};
213 };
214
215 task() noexcept = default;
216
217 task(task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr))
218 {
219 }
220
221 auto operator=(task&& other) noexcept -> task&
222 {
223 if (this != &other)
224 {
225 destroy();
226 handle_ = std::exchange(other.handle_, nullptr);
227 }
228 return *this;
229 }
230
231 ~task()
232 {
233 destroy();
234 }
235
236 task(task const&) = delete;
237 auto operator=(task const&) -> task& = delete;
238
239 [[nodiscard]] auto handle() const noexcept -> std::coroutine_handle<promise_type>
240 {
241 return handle_;
242 }
243
244 struct awaiter
245 {
246 std::coroutine_handle<promise_type> handle_;
247
248 [[nodiscard]] auto await_ready() const noexcept -> bool
249 {
250 return false;
251 }
252
253 auto await_suspend(std::coroutine_handle<> continuation) noexcept -> std::coroutine_handle<>
254 {
255 handle_.promise().continuation_ = continuation;
256 return handle_;
257 }
258
259 auto await_resume() -> T
260 {
261 return handle_.promise().result();
262 }
263 };
264
265 auto operator co_await() const& noexcept -> awaiter
266 {
267 return awaiter{handle_};
268 }
269
270 auto operator co_await() const&& noexcept -> awaiter
271 {
272 return awaiter{handle_};
273 }
274
275 private:
276 explicit task(std::coroutine_handle<promise_type> h) noexcept : handle_(h)
277 {
278 }
279
280 void destroy()
281 {
282 if (handle_)
283 {
284 handle_.destroy();
285 handle_ = nullptr;
286 }
287 }
288
289 std::coroutine_handle<promise_type> handle_{};
290};
291
292// --- task<void> ---
293
321template <>
322class task<void>
323{
324 public:
325 struct promise_type : detail::task_promise_base<void>
326 {
327 auto get_return_object() noexcept -> task<void>
328 {
329 return task<void>{std::coroutine_handle<promise_type>::from_promise(*this)};
330 }
331
332 void return_void() noexcept
333 {
334 }
335
336 void result()
337 {
338 this->rethrow_if_exception();
339 }
340 };
341
342 task() noexcept = default;
343
344 task(task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr))
345 {
346 }
347
348 auto operator=(task&& other) noexcept -> task<void>&
349 {
350 if (this != &other)
351 {
352 destroy();
353 handle_ = std::exchange(other.handle_, nullptr);
354 }
355 return *this;
356 }
357
358 ~task()
359 {
360 destroy();
361 }
362
363 task(task const&) = delete;
364 auto operator=(task const&) -> task<void>& = delete;
365
366 [[nodiscard]] auto handle() const noexcept -> std::coroutine_handle<promise_type>
367 {
368 return handle_;
369 }
370
371 struct awaiter
372 {
373 std::coroutine_handle<promise_type> handle_;
374
375 [[nodiscard]] auto await_ready() const noexcept -> bool
376 {
377 return false;
378 }
379
380 auto await_suspend(std::coroutine_handle<> continuation) noexcept -> std::coroutine_handle<>
381 {
382 handle_.promise().continuation_ = continuation;
383 return handle_;
384 }
385
386 void await_resume()
387 {
388 handle_.promise().result();
389 }
390 };
391
392 auto operator co_await() const& noexcept -> awaiter
393 {
394 return awaiter{handle_};
395 }
396
397 auto operator co_await() const&& noexcept -> awaiter
398 {
399 return awaiter{handle_};
400 }
401
402 private:
403 explicit task(std::coroutine_handle<promise_type> h) noexcept : handle_(h)
404 {
405 }
406
407 void destroy()
408 {
409 if (handle_)
410 {
411 handle_.destroy();
412 handle_ = nullptr;
413 }
414 }
415
416 std::coroutine_handle<promise_type> handle_{};
417};
418
419// --- sync_wait ---
420
421namespace detail
422{
423
444class sync_wait_task
445{
446 public:
447 struct promise_type
448 {
449 auto initial_suspend() noexcept -> std::suspend_always
450 {
451 return {};
452 }
453
454 auto final_suspend() noexcept
455 {
456 struct notifier
457 {
458 [[nodiscard]] auto await_ready() const noexcept -> bool
459 {
460 return false;
461 }
462
463 void await_suspend(std::coroutine_handle<promise_type> h) const noexcept
464 {
465 h.promise().finished_.store(true, std::memory_order_release);
466 h.promise().finished_.notify_one();
467 }
468
469 void await_resume() const noexcept
470 {
471 }
472 };
473 return notifier{};
474 }
475
476 auto get_return_object() -> sync_wait_task
477 {
478 return sync_wait_task{std::coroutine_handle<promise_type>::from_promise(*this)};
479 }
480
481 void return_void() noexcept
482 {
483 }
484
485 void unhandled_exception() noexcept
486 {
487 exception_ = std::current_exception();
488 }
489
490 std::atomic<bool> finished_{false};
491 std::exception_ptr exception_{};
492 };
493
494 explicit sync_wait_task(std::coroutine_handle<promise_type> h) : handle_(h)
495 {
496 }
497
498 sync_wait_task(sync_wait_task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr))
499 {
500 }
501
502 ~sync_wait_task()
503 {
504 if (handle_)
505 handle_.destroy();
506 }
507
508 sync_wait_task(sync_wait_task const&) = delete;
509 auto operator=(sync_wait_task const&) -> sync_wait_task& = delete;
510 auto operator=(sync_wait_task&&) -> sync_wait_task& = delete;
511
512 void start()
513 {
514 handle_.resume();
515 }
516
517 void wait()
518 {
519 handle_.promise().finished_.wait(false, std::memory_order_acquire);
520 }
521
522 void rethrow()
523 {
524 if (handle_.promise().exception_)
525 std::rethrow_exception(handle_.promise().exception_);
526 }
527
528 private:
529 std::coroutine_handle<promise_type> handle_;
530};
531
532} // namespace detail
533
556template <typename T>
557auto sync_wait(task<T> t) -> T
558{
559 std::optional<T> result;
560 std::exception_ptr ex;
561
562 auto wrapper = [&result, &ex](task<T> inner) -> detail::sync_wait_task {
563 try
564 {
565 result.emplace(co_await inner);
566 }
567 catch (...)
568 {
569 ex = std::current_exception();
570 }
571 };
572
573 auto sw = wrapper(std::move(t));
574 sw.start();
575 sw.wait();
576 sw.rethrow();
577
578 if (ex)
579 std::rethrow_exception(ex);
580
581 return std::move(*result);
582}
583
597inline void sync_wait(task<void> t)
598{
599 std::exception_ptr ex;
600
601 auto wrapper = [&ex](task<void> inner) -> detail::sync_wait_task {
602 try
603 {
604 co_await inner;
605 }
606 catch (...)
607 {
608 ex = std::current_exception();
609 }
610 };
611
612 auto sw = wrapper(std::move(t));
613 sw.start();
614 sw.wait();
615 sw.rethrow();
616
617 if (ex)
618 std::rethrow_exception(ex);
619}
620
621// ---------------------------------------------------------------------------
622// schedule_on awaitable
623// ---------------------------------------------------------------------------
624
667template <typename Pool>
668struct schedule_on
669{
670 Pool& pool;
671
672 [[nodiscard]] auto await_ready() const noexcept -> bool { return false; }
673
674 void await_suspend(std::coroutine_handle<> h) const
675 {
676 pool.submit([h]() mutable { h.resume(); });
677 }
678
679 void await_resume() const noexcept {}
680};
681
682// ---------------------------------------------------------------------------
683// run_on convenience
684// ---------------------------------------------------------------------------
685
706template <typename Pool, typename F>
707auto run_on(Pool& pool, F&& coro_fn)
708 -> std::future<decltype(sync_wait(std::declval<std::invoke_result_t<F>>()))>
709{
710 return pool.submit([fn = std::forward<F>(coro_fn)]() mutable {
711 return sync_wait(fn());
712 });
713}
714
715} // namespace threadschedule
716
717#endif // __cpp_impl_coroutine