4#include "thread_pool.hpp"
13namespace threadschedule
// Handle for a single scheduled task. It stores a numeric id and a
// std::atomic<bool> cancellation flag owned through a std::shared_ptr.
// NOTE(review): this excerpt omits the braces, access specifiers and some
// member lines of the class; the comments below cover only the visible lines.
28class ScheduledTaskHandle
// Builds a handle for the given id and allocates a fresh flag initialised to
// false, i.e. "not cancelled".
31 explicit ScheduledTaskHandle(uint64_t
id) : id_(
id), cancelled_(std::make_shared<std::atomic<bool>>(
false))
// Sets the flag to true with release ordering. The signature of the enclosing
// member function is not visible in this excerpt.
37 cancelled_->store(
true, std::memory_order_release);
// Reports whether the flag has been set; the acquire load pairs with the
// release store above.
40 [[nodiscard]]
auto is_cancelled()
const noexcept ->
bool
42 return cancelled_->load(std::memory_order_acquire);
// Accessor returning a uint64_t (body not visible here; presumably the id
// passed to the constructor -- confirm).
45 [[nodiscard]]
auto id()
const noexcept -> uint64_t
// The cancellation flag. Holding it by shared_ptr lets get_cancel_flag() hand
// out another owner of the same atomic.
52 std::shared_ptr<std::atomic<bool>> cancelled_;
// Friend declaration for the scheduler pool, whose scheduling code calls
// get_cancel_flag() on a handle.
55 friend class ScheduledThreadPoolT;
// Returns a std::shared_ptr<std::atomic<bool>>; the pool stores the result in
// ScheduledTaskInfo::cancelled (body not visible here; presumably it returns
// cancelled_ -- confirm).
56 [[nodiscard]]
auto get_cancel_flag()
const -> std::shared_ptr<std::atomic<bool>>
// Class template parameterised on the worker pool type; PoolType defaults to
// ThreadPool.
// NOTE(review): the class-head line and the braces are not visible in this
// excerpt.
131template <
typename PoolType = ThreadPool>
// A task is any callable taking no arguments and returning void.
135 using Task = std::function<void()>;
// All scheduling times and intervals are expressed on std::chrono::steady_clock.
136 using TimePoint = std::chrono::steady_clock::time_point;
137 using Duration = std::chrono::steady_clock::duration;
// Field of ScheduledTaskInfo: the cancellation flag obtained from the task's
// handle via get_cancel_flag().
145 std::shared_ptr<std::atomic<bool>> cancelled;
// Constructor initialiser list: builds the worker pool with worker_threads,
// clears the stop flag and starts task ids at 1.
154 : pool_(worker_threads), stop_(false), next_task_id_(1)
// Starts the scheduler thread, which runs scheduler_loop() on this object.
156 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop,
this);
// Turns the relative delay into an absolute steady_clock time point.
// NOTE(review): the rest of this function is not visible in this excerpt;
// presumably run_time is forwarded to schedule_at -- confirm.
175 auto run_time = std::chrono::steady_clock::now() + delay;
// Body fragment that queues a one-shot task to run at time_point.
// Hold mutex_ for the rest of the scope while the id counter and the task
// queue are modified.
187 std::lock_guard<std::mutex> lock(mutex_);
// Take the current id and advance the counter (post-increment).
189 uint64_t
const task_id = next_task_id_++;
// NOTE(review): `handle` is declared on a line not visible in this excerpt;
// presumably it is a ScheduledTaskHandle built from task_id -- confirm.
192 ScheduledTaskInfo info;
194 info.next_run = time_point;
// One-shot task: zero interval and periodic == false.
195 info.interval = Duration::zero();
196 info.task = std::move(task);
// Store the handle's cancellation flag in the queued record so the scheduler
// can test it later.
197 info.cancelled = handle.get_cancel_flag();
198 info.periodic =
false;
// The queue is a std::multimap keyed by run time, so several tasks may share
// the same time point.
200 scheduled_tasks_.insert({time_point, std::move(info)});
// Notify the scheduler thread, which waits on condition_.
201 condition_.notify_one();
// Body fragment that queues a periodic task: first run after initial_delay,
// then rescheduled by `interval` (see the periodic branch of scheduler_loop).
// Hold mutex_ for the rest of the scope while the id counter and the task
// queue are modified.
229 std::lock_guard<std::mutex> lock(mutex_);
// Take the current id and advance the counter (post-increment).
231 uint64_t
const task_id = next_task_id_++;
// NOTE(review): `handle` is declared on a line not visible in this excerpt;
// presumably it is a ScheduledTaskHandle built from task_id -- confirm.
234 ScheduledTaskInfo info;
// The first run time is measured from the moment of this call.
236 info.next_run = std::chrono::steady_clock::now() + initial_delay;
237 info.interval = interval;
238 info.task = std::move(task);
// Store the handle's cancellation flag in the queued record so the scheduler
// can test it later.
239 info.cancelled = handle.get_cancel_flag();
240 info.periodic =
true;
// Keyed by the first run time; the key is a copy of info.next_run.
242 scheduled_tasks_.insert({info.next_run, std::move(info)});
// Notify the scheduler thread, which waits on condition_.
243 condition_.notify_one();
// Returns the number of entries currently in the task queue, read while
// holding mutex_ (mutex_ is declared mutable, so it can be locked from this
// const member function).
264 std::lock_guard<std::mutex> lock(mutex_);
265 return scheduled_tasks_.size();
// Body fragment of shutdown(): notifies the scheduler thread and joins it.
// NOTE(review): original lines 283-287 are not visible in this excerpt;
// presumably stop_ is set there and the lock is released before the join --
// confirm.
282 std::lock_guard<std::mutex> lock(mutex_);
// Notify the scheduler thread, whose wait predicates read stop_.
288 condition_.notify_one();
// Join only when the thread is joinable.
290 if (scheduler_thread_.joinable())
292 scheduler_thread_.join();
// Forwards name_prefix, policy and priority unchanged to
// pool_.configure_threads() and returns its result. The `priority` parameter
// is declared on a signature line not visible in this excerpt.
305 auto configure_threads(std::string
const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
308 return pool_.configure_threads(name_prefix, policy, priority);
// Thread that runs scheduler_loop(); started in the constructor.
313 std::thread scheduler_thread_;
// Locked by the scheduling functions, scheduled_count(), shutdown() and
// scheduler_loop(). Declared mutable so const members can lock it.
315 mutable std::mutex mutex_;
// Notified when a task is queued and on shutdown; waited on by scheduler_loop().
316 std::condition_variable condition_;
// Read by the scheduler loop's wait predicates; initialised to false.
317 std::atomic<bool> stop_;
// Pending tasks keyed by their next run time; begin() is the earliest one.
319 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
// Source of task ids; starts at 1 and is post-incremented per scheduled task.
320 std::atomic<uint64_t> next_task_id_;
// Entry point of scheduler_thread_. It takes the earliest queued task, waits
// until it is due, submits it to the worker pool and re-queues periodic tasks.
// NOTE(review): loop headers, braces and several statements are not visible
// in this excerpt; the comments below describe only the visible lines.
322 void scheduler_loop()
// unique_lock rather than lock_guard because the condition-variable waits
// below need to unlock and relock the mutex.
326 std::unique_lock<std::mutex> lock(mutex_);
// Nothing queued: wait until stop_ is set or a task is inserted.
329 if (scheduled_tasks_.empty())
331 condition_.wait(lock, [
this] {
return stop_ || !scheduled_tasks_.empty(); });
// The multimap is ordered by run time, so begin() is the earliest task.
338 auto const now = std::chrono::steady_clock::now();
339 auto it = scheduled_tasks_.begin();
341 if (it == scheduled_tasks_.end())
// Wait until the earliest task's run time, or until stop_ is set.
// NOTE(review): this predicate tests only stop_, so a notify caused by
// inserting an earlier task goes back to waiting for the old deadline; the
// new task would then run late -- confirm against the full source.
349 condition_.wait_until(lock, it->first, [
this] { return stop_.load(); });
// Remove the entry from the queue. `info` is defined on a line not visible
// here; presumably it was taken from it->second before this erase -- confirm.
359 scheduled_tasks_.erase(it);
// Check the cancellation flag before dispatching (the branch body is not
// visible in this excerpt).
362 if (info.cancelled->load(std::memory_order_acquire))
// Copy the callable and the flag so the lambda owns them independently of
// `info`, which may be moved back into the queue below.
371 auto task_copy = info.task;
372 auto cancelled_flag = info.cancelled;
// Hand the work to the pool. The flag is tested again inside the lambda, so
// a cancel issued after submission but before execution is still observed.
374 pool_.submit([task_copy, cancelled_flag]() {
375 if (!cancelled_flag->load(std::memory_order_acquire))
// Periodic tasks that are not cancelled are re-queued. The next run time is
// the previous scheduled time plus the interval, not "now" plus the interval.
382 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
384 info.next_run += info.interval;
385 scheduled_tasks_.insert({info.next_run, std::move(info)});
Copyable handle for a cancellable scheduled task.
Thread pool augmented with delayed and periodic task scheduling.
auto scheduled_count() const -> size_t
Get the number of scheduled tasks (including periodic ones).
void shutdown()
Shutdown the scheduler and wait for completion.
auto thread_pool() -> PoolType &
Get the underlying thread pool for direct task submission.
ScheduledThreadPoolT(size_t worker_threads=std::thread::hardware_concurrency())
Create a scheduled thread pool.
static void cancel(ScheduledTaskHandle &handle)
Cancel a scheduled task by handle.
auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically after an initial delay.
auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
Schedule a task to run after a delay.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal())
Configure worker threads.
auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
Schedule a task to run at a specific time point.
auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically at fixed intervals.
Value-semantic wrapper for a thread scheduling priority.
Polyfill for std::expected (C++23) for pre-C++23 compilers.