ThreadSchedule 1.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
scheduled_pool.hpp
1#pragma once
2
3#include "expected.hpp"
4#include "thread_pool.hpp"
5#include <atomic>
6#include <chrono>
7#include <functional>
8#include <map>
9#include <memory>
10#include <mutex>
11#include <thread>
12
13namespace threadschedule
14{
15
/**
 * @brief Copyable handle for a cancellable scheduled task.
 *
 * All copies of a handle share one atomic cancellation flag, so a cancel()
 * issued through any copy is visible through every other copy and through
 * the scheduler, which obtains the same flag via get_cancel_flag().
 */
class ScheduledTaskHandle
{
  public:
    /// Creates a handle for the task numbered @p id; the shared flag starts out as "not cancelled".
    explicit ScheduledTaskHandle(uint64_t id) : id_(id), cancelled_(std::make_shared<std::atomic<bool>>(false))
    {
    }

    /// Sets the shared cancellation flag. Nothing in this class ever clears it again.
    void cancel() noexcept
    {
        cancelled_->store(true, std::memory_order_release);
    }

    /// @return true once cancel() has been called on this handle or on any copy of it.
    [[nodiscard]] auto is_cancelled() const noexcept -> bool
    {
        return cancelled_->load(std::memory_order_acquire);
    }

    /// @return the task id this handle was constructed with.
    [[nodiscard]] auto id() const noexcept -> uint64_t
    {
        return id_;
    }

  private:
    uint64_t id_;
    std::shared_ptr<std::atomic<bool>> cancelled_;

    // The scheduler needs the flag itself so the queued task can observe cancellation.
    template <typename>
    friend class ScheduledThreadPoolT;

    /// @return a new owner of the shared cancellation flag.
    [[nodiscard]] auto get_cancel_flag() const -> std::shared_ptr<std::atomic<bool>>
    {
        return cancelled_;
    }
};
61
131template <typename PoolType = ThreadPool>
133{
134 public:
// Callable handed to the pool: takes no arguments and returns nothing.
135 using Task = std::function<void()>;
// Absolute point in time on std::chrono::steady_clock; used as the key of the task queue.
136 using TimePoint = std::chrono::steady_clock::time_point;
// Relative time span in steady_clock's native duration; used for delays and periodic intervals.
137 using Duration = std::chrono::steady_clock::duration;
138
140 {
141 uint64_t id;
142 TimePoint next_run;
143 Duration interval; // Zero for one-time tasks
144 Task task;
145 std::shared_ptr<std::atomic<bool>> cancelled;
146 bool periodic;
147 };
148
153 explicit ScheduledThreadPoolT(size_t worker_threads = std::thread::hardware_concurrency())
154 : pool_(worker_threads), stop_(false), next_task_id_(1)
155 {
156 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop, this);
157 }
158
160 auto operator=(ScheduledThreadPoolT const&) -> ScheduledThreadPoolT& = delete;
161
163 {
164 shutdown();
165 }
166
173 auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
174 {
175 auto run_time = std::chrono::steady_clock::now() + delay;
176 return schedule_at(run_time, std::move(task));
177 }
178
185 auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
186 {
187 std::lock_guard<std::mutex> lock(mutex_);
188
189 uint64_t const task_id = next_task_id_++;
190 ScheduledTaskHandle handle(task_id);
191
192 ScheduledTaskInfo info;
193 info.id = task_id;
194 info.next_run = time_point;
195 info.interval = Duration::zero();
196 info.task = std::move(task);
197 info.cancelled = handle.get_cancel_flag();
198 info.periodic = false;
199
200 scheduled_tasks_.insert({time_point, std::move(info)});
201 condition_.notify_one();
202
203 return handle;
204 }
205
215 auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
216 {
217 return schedule_periodic_after(Duration::zero(), interval, std::move(task));
218 }
219
227 auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
228 {
229 std::lock_guard<std::mutex> lock(mutex_);
230
231 uint64_t const task_id = next_task_id_++;
232 ScheduledTaskHandle handle(task_id);
233
234 ScheduledTaskInfo info;
235 info.id = task_id;
236 info.next_run = std::chrono::steady_clock::now() + initial_delay;
237 info.interval = interval;
238 info.task = std::move(task);
239 info.cancelled = handle.get_cancel_flag();
240 info.periodic = true;
241
242 scheduled_tasks_.insert({info.next_run, std::move(info)});
243 condition_.notify_one();
244
245 return handle;
246 }
247
254 static void cancel(ScheduledTaskHandle& handle)
255 {
256 handle.cancel();
257 }
258
262 [[nodiscard]] auto scheduled_count() const -> size_t
263 {
264 std::lock_guard<std::mutex> lock(mutex_);
265 return scheduled_tasks_.size();
266 }
267
271 [[nodiscard]] auto thread_pool() -> PoolType&
272 {
273 return pool_;
274 }
275
279 void shutdown()
280 {
281 {
282 std::lock_guard<std::mutex> lock(mutex_);
283 if (stop_)
284 return;
285 stop_ = true;
286 }
287
288 condition_.notify_one();
289
290 if (scheduler_thread_.joinable())
291 {
292 scheduler_thread_.join();
293 }
294
295 pool_.shutdown();
296 }
297
305 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
306 ThreadPriority priority = ThreadPriority::normal())
307 {
308 return pool_.configure_threads(name_prefix, policy, priority);
309 }
310
311 private:
// Worker pool that receives due tasks via submit(); shut down at the end of shutdown().
312 PoolType pool_;
// Thread running scheduler_loop(); started in the constructor body and joined in shutdown().
313 std::thread scheduler_thread_;
314
// Guards scheduled_tasks_; mutable so the const scheduled_count() can lock it.
315 mutable std::mutex mutex_;
// Signalled when a task is queued and when shutdown is requested.
316 std::condition_variable condition_;
// Stop request for the scheduler thread; set (under mutex_) by the first shutdown() call.
317 std::atomic<bool> stop_;
318
// Pending tasks ordered by due time; a multimap because several tasks may share one time point.
319 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
// Id given to the next scheduled task; starts at 1 and is incremented on every schedule call.
320 std::atomic<uint64_t> next_task_id_;
321
322 void scheduler_loop()
323 {
324 while (true)
325 {
326 std::unique_lock<std::mutex> lock(mutex_);
327
328 // Wait until we have tasks or need to stop
329 if (scheduled_tasks_.empty())
330 {
331 condition_.wait(lock, [this] { return stop_ || !scheduled_tasks_.empty(); });
332
333 if (stop_)
334 return;
335 }
336
337 // Get the next task to execute
338 auto const now = std::chrono::steady_clock::now();
339 auto it = scheduled_tasks_.begin();
340
341 if (it == scheduled_tasks_.end())
342 {
343 continue;
344 }
345
346 // Wait until it's time to execute
347 if (it->first > now)
348 {
349 condition_.wait_until(lock, it->first, [this] { return stop_.load(); });
350
351 if (stop_)
352 return;
353
354 continue;
355 }
356
357 // Extract the task info
358 ScheduledTaskInfo info = std::move(it->second);
359 scheduled_tasks_.erase(it);
360
361 // Check if cancelled
362 if (info.cancelled->load(std::memory_order_acquire))
363 {
364 continue;
365 }
366
367 // Schedule for execution in the thread pool
368 try
369 {
370 // Capture task and periodic info
371 auto task_copy = info.task;
372 auto cancelled_flag = info.cancelled;
373
374 pool_.submit([task_copy, cancelled_flag]() {
375 if (!cancelled_flag->load(std::memory_order_acquire))
376 {
377 task_copy();
378 }
379 });
380
381 // Reschedule if periodic
382 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
383 {
384 info.next_run += info.interval;
385 scheduled_tasks_.insert({info.next_run, std::move(info)});
386 }
387 }
388 catch (...)
389 {
390 // Thread pool might be shutting down
391 }
392 }
393 }
394};
395
// Scheduler layered on ThreadPool, which is also the default template argument.
397using ScheduledThreadPool = ScheduledThreadPoolT<ThreadPool>;
// Scheduler layered on HighPerformancePool.
399using ScheduledHighPerformancePool = ScheduledThreadPoolT<HighPerformancePool>;
// Scheduler layered on FastThreadPool.
401using ScheduledFastThreadPool = ScheduledThreadPoolT<FastThreadPool>;
402
403} // namespace threadschedule
Copyable handle for a cancellable scheduled task.
Thread pool augmented with delayed and periodic task scheduling.
auto scheduled_count() const -> size_t
Get number of scheduled tasks (including periodic)
void shutdown()
Shutdown the scheduler and wait for completion.
auto thread_pool() -> PoolType &
Get the underlying thread pool for direct task submission.
ScheduledThreadPoolT(size_t worker_threads=std::thread::hardware_concurrency())
Create a scheduled thread pool.
static void cancel(ScheduledTaskHandle &handle)
Cancel a scheduled task by handle.
auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically after an initial delay.
auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
Schedule a task to run after a delay.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal())
Configure worker threads.
auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
Schedule a task to run at a specific time point.
auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically at fixed intervals.
Value-semantic wrapper for a thread scheduling priority.
Polyfill for std::expected (C++23) for pre-C++23 compilers.