ThreadSchedule 1.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
scheduled_pool.hpp
1#pragma once
2
3#include "expected.hpp"
4#include "thread_pool.hpp"
5#include <atomic>
6#include <chrono>
7#include <functional>
8#include <map>
9#include <memory>
10#include <mutex>
11#include <thread>
12
13namespace threadschedule
14{
15
19class ScheduledTaskHandle
20{
21 public:
22 explicit ScheduledTaskHandle(uint64_t id) : id_(id), cancelled_(std::make_shared<std::atomic<bool>>(false))
23 {
24 }
25
26 void cancel()
27 {
28 cancelled_->store(true, std::memory_order_release);
29 }
30
31 [[nodiscard]] auto is_cancelled() const -> bool
32 {
33 return cancelled_->load(std::memory_order_acquire);
34 }
35
36 [[nodiscard]] auto id() const -> uint64_t
37 {
38 return id_;
39 }
40
41 private:
42 uint64_t id_;
43 std::shared_ptr<std::atomic<bool>> cancelled_;
44
45 template <typename>
46 friend class ScheduledThreadPoolT;
47 [[nodiscard]] auto get_cancel_flag() const -> std::shared_ptr<std::atomic<bool>>
48 {
49 return cancelled_;
50 }
51};
52
65template <typename PoolType = ThreadPool>
67{
68 public:
69 using Task = std::function<void()>;
70 using TimePoint = std::chrono::steady_clock::time_point;
71 using Duration = std::chrono::steady_clock::duration;
72
74 {
75 uint64_t id;
76 TimePoint next_run;
77 Duration interval; // Zero for one-time tasks
78 Task task;
79 std::shared_ptr<std::atomic<bool>> cancelled;
80 bool periodic;
81 };
82
87 explicit ScheduledThreadPoolT(size_t worker_threads = std::thread::hardware_concurrency())
88 : pool_(worker_threads), stop_(false), next_task_id_(1)
89 {
90 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop, this);
91 }
92
94 auto operator=(ScheduledThreadPoolT const&) -> ScheduledThreadPoolT& = delete;
95
97 {
98 shutdown();
99 }
100
107 auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
108 {
109 auto run_time = std::chrono::steady_clock::now() + delay;
110 return schedule_at(run_time, std::move(task));
111 }
112
119 auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
120 {
121 std::lock_guard<std::mutex> lock(mutex_);
122
123 uint64_t const task_id = next_task_id_++;
124 ScheduledTaskHandle handle(task_id);
125
126 ScheduledTaskInfo info;
127 info.id = task_id;
128 info.next_run = time_point;
129 info.interval = Duration::zero();
130 info.task = std::move(task);
131 info.cancelled = handle.get_cancel_flag();
132 info.periodic = false;
133
134 scheduled_tasks_.insert({time_point, std::move(info)});
135 condition_.notify_one();
136
137 return handle;
138 }
139
149 auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
150 {
151 return schedule_periodic_after(Duration::zero(), interval, std::move(task));
152 }
153
161 auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
162 {
163 std::lock_guard<std::mutex> lock(mutex_);
164
165 uint64_t const task_id = next_task_id_++;
166 ScheduledTaskHandle handle(task_id);
167
168 ScheduledTaskInfo info;
169 info.id = task_id;
170 info.next_run = std::chrono::steady_clock::now() + initial_delay;
171 info.interval = interval;
172 info.task = std::move(task);
173 info.cancelled = handle.get_cancel_flag();
174 info.periodic = true;
175
176 scheduled_tasks_.insert({info.next_run, std::move(info)});
177 condition_.notify_one();
178
179 return handle;
180 }
181
188 static void cancel(ScheduledTaskHandle& handle)
189 {
190 handle.cancel();
191 }
192
196 [[nodiscard]] auto scheduled_count() const -> size_t
197 {
198 std::lock_guard<std::mutex> lock(mutex_);
199 return scheduled_tasks_.size();
200 }
201
205 [[nodiscard]] auto thread_pool() -> PoolType&
206 {
207 return pool_;
208 }
209
213 void shutdown()
214 {
215 {
216 std::lock_guard<std::mutex> lock(mutex_);
217 if (stop_)
218 return;
219 stop_ = true;
220 }
221
222 condition_.notify_one();
223
224 if (scheduler_thread_.joinable())
225 {
226 scheduler_thread_.join();
227 }
228
229 pool_.shutdown();
230 }
231
239 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
240 ThreadPriority priority = ThreadPriority::normal())
241 {
242 return pool_.configure_threads(name_prefix, policy, priority);
243 }
244
245 private:
246 PoolType pool_;
247 std::thread scheduler_thread_;
248
249 mutable std::mutex mutex_;
250 std::condition_variable condition_;
251 std::atomic<bool> stop_;
252
253 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
254 std::atomic<uint64_t> next_task_id_;
255
256 void scheduler_loop()
257 {
258 while (true)
259 {
260 std::unique_lock<std::mutex> lock(mutex_);
261
262 // Wait until we have tasks or need to stop
263 if (scheduled_tasks_.empty())
264 {
265 condition_.wait(lock, [this] { return stop_ || !scheduled_tasks_.empty(); });
266
267 if (stop_)
268 return;
269 }
270
271 // Get the next task to execute
272 auto const now = std::chrono::steady_clock::now();
273 auto it = scheduled_tasks_.begin();
274
275 if (it == scheduled_tasks_.end())
276 {
277 continue;
278 }
279
280 // Wait until it's time to execute
281 if (it->first > now)
282 {
283 condition_.wait_until(lock, it->first, [this] { return stop_.load(); });
284
285 if (stop_)
286 return;
287
288 continue;
289 }
290
291 // Extract the task info
292 ScheduledTaskInfo info = std::move(it->second);
293 scheduled_tasks_.erase(it);
294
295 // Check if cancelled
296 if (info.cancelled->load(std::memory_order_acquire))
297 {
298 continue;
299 }
300
301 // Schedule for execution in the thread pool
302 try
303 {
304 // Capture task and periodic info
305 auto task_copy = info.task;
306 auto cancelled_flag = info.cancelled;
307
308 pool_.submit([task_copy, cancelled_flag]() {
309 if (!cancelled_flag->load(std::memory_order_acquire))
310 {
311 task_copy();
312 }
313 });
314
315 // Reschedule if periodic
316 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
317 {
318 info.next_run += info.interval;
319 scheduled_tasks_.insert({info.next_run, std::move(info)});
320 }
321 }
322 catch (...)
323 {
324 // Thread pool might be shutting down
325 }
326 }
327 }
328};
329
330// Convenience aliases
331using ScheduledThreadPool = ScheduledThreadPoolT<ThreadPool>;
332using ScheduledHighPerformancePool = ScheduledThreadPoolT<HighPerformancePool>;
333using ScheduledFastThreadPool = ScheduledThreadPoolT<FastThreadPool>;
334
335} // namespace threadschedule
Handle for scheduled tasks that can be used to cancel them.
Thread pool with support for scheduled and periodic tasks.
auto scheduled_count() const -> size_t
Get number of scheduled tasks (including periodic)
void shutdown()
Shutdown the scheduler and wait for completion.
auto thread_pool() -> PoolType &
Get the underlying thread pool for direct task submission.
ScheduledThreadPoolT(size_t worker_threads=std::thread::hardware_concurrency())
Create a scheduled thread pool.
static void cancel(ScheduledTaskHandle &handle)
Cancel a scheduled task by handle.
auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically after an initial delay.
auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
Schedule a task to run after a delay.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal())
Configure worker threads.
auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
Schedule a task to run at a specific time point.
auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically at fixed intervals.
Thread priority wrapper with validation.