42 cancelled_->store(
true, std::memory_order_release);
47 return cancelled_->load(std::memory_order_acquire);
50 [[nodiscard]]
auto id() const noexcept -> uint64_t
57 std::shared_ptr<std::atomic<bool>> cancelled_;
61 [[nodiscard]]
auto get_cancel_flag() const -> std::shared_ptr<std::atomic<
bool>>
136template <
typename PoolType = ThreadPool>
140 using Task = std::function<void()>;
141 using TimePoint = std::chrono::steady_clock::time_point;
142 using Duration = std::chrono::steady_clock::duration;
159 : pool_(worker_threads), stop_(false), next_task_id_(1)
161 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop,
this);
180 auto run_time = std::chrono::steady_clock::now() + delay;
192 return insert_task(time_point, Duration::zero(), std::move(task),
false);
218 auto const run_time = std::chrono::steady_clock::now() + initial_delay;
219 return insert_task(run_time, interval, std::move(task),
true);
238 std::lock_guard<std::mutex> lock(mutex_);
239 return scheduled_tasks_.size();
256 std::lock_guard<std::mutex> lock(mutex_);
262 condition_.notify_one();
264 if (scheduler_thread_.joinable())
266 scheduler_thread_.join();
280 return pool_.configure_threads(name_prefix, policy, priority);
285 std::thread scheduler_thread_;
287 mutable std::mutex mutex_;
288 std::condition_variable condition_;
289 std::atomic<bool> stop_;
291 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
292 std::atomic<uint64_t> next_task_id_;
296 std::lock_guard<std::mutex> lock(mutex_);
298 uint64_t
const task_id = next_task_id_++;
301 ScheduledTaskInfo info;
303 info.next_run = run_time;
304 info.interval = interval;
305 info.task = std::move(task);
306 info.cancelled = handle.get_cancel_flag();
307 info.periodic = periodic;
309 scheduled_tasks_.insert({run_time, std::move(info)});
310 condition_.notify_one();
315 void scheduler_loop()
319 std::unique_lock<std::mutex> lock(mutex_);
322 if (scheduled_tasks_.empty())
324 condition_.wait(lock, [
this] {
return stop_ || !scheduled_tasks_.empty(); });
331 auto const now = std::chrono::steady_clock::now();
332 auto it = scheduled_tasks_.begin();
334 if (it == scheduled_tasks_.end())
342 condition_.wait_until(lock, it->first, [
this] { return stop_.load(); });
352 scheduled_tasks_.erase(it);
355 if (info.cancelled->load(std::memory_order_acquire))
364 auto task_copy = info.task;
365 auto cancelled_flag = info.cancelled;
367 pool_.post([task_copy, cancelled_flag]() {
368 if (!cancelled_flag->load(std::memory_order_acquire))
375 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
377 info.next_run += info.interval;
378 scheduled_tasks_.insert({info.next_run, std::move(info)});
Copyable handle for a cancellable scheduled task.
auto id() const noexcept -> uint64_t
auto is_cancelled() const noexcept -> bool
friend class ScheduledThreadPoolT
ScheduledTaskHandle(uint64_t id)
Thread pool augmented with delayed and periodic task scheduling.
auto scheduled_count() const -> size_t
Get the number of currently scheduled tasks (including periodic ones).
std::function< void()> Task
void shutdown()
Shut down the scheduler and wait for completion.
auto thread_pool() -> PoolType &
Get the underlying thread pool for direct task submission.
auto operator=(ScheduledThreadPoolT const &) -> ScheduledThreadPoolT &=delete
ScheduledThreadPoolT(size_t worker_threads=std::thread::hardware_concurrency())
Create a scheduled thread pool.
static void cancel(ScheduledTaskHandle &handle)
Cancel a scheduled task by handle.
auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically after an initial delay.
std::chrono::steady_clock::time_point TimePoint
std::chrono::steady_clock::duration Duration
ScheduledThreadPoolT(ScheduledThreadPoolT const &)=delete
auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
Schedule a task to run after a delay.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal())
Configure the worker threads of the underlying pool (name prefix, scheduling policy, and priority).
auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
Schedule a task to run at a specific time point.
auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically at fixed intervals.
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
Polyfill for std::expected (C++23) for pre-C++23 compilers.
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
ScheduledThreadPoolT< ThreadPool > ScheduledThreadPool
ScheduledThreadPoolT using the default ThreadPool backend.
ScheduledThreadPoolT< FastThreadPool > ScheduledFastThreadPool
ScheduledThreadPoolT using FastThreadPool as backend.
ScheduledThreadPoolT< LightweightPool > ScheduledLightweightPool
ScheduledThreadPoolT using LightweightPool as backend (minimal overhead).
ScheduledThreadPoolT< HighPerformancePool > ScheduledHighPerformancePool
ScheduledThreadPoolT using HighPerformancePool as backend.
constexpr bool Duration
Pre-C++20 fallback for Duration (constexpr bool).
std::shared_ptr< std::atomic< bool > > cancelled
Thread pools: HighPerformancePool, ThreadPoolBase, LightweightPoolT, and GlobalPool.