ThreadSchedule 2.0.0
Modern C++ thread management library
Loading...
Searching...
No Matches
scheduled_pool.hpp
Go to the documentation of this file.
1#pragma once
2
7
8#include "expected.hpp"
9#include "thread_pool.hpp"
10#include <atomic>
11#include <chrono>
12#include <functional>
13#include <map>
14#include <memory>
15#include <mutex>
16#include <thread>
17
18namespace threadschedule
19{
20
34{
35 public:
36 explicit ScheduledTaskHandle(uint64_t id) : id_(id), cancelled_(std::make_shared<std::atomic<bool>>(false))
37 {
38 }
39
40 void cancel()
41 {
42 cancelled_->store(true, std::memory_order_release);
43 }
44
45 [[nodiscard]] auto is_cancelled() const noexcept -> bool
46 {
47 return cancelled_->load(std::memory_order_acquire);
48 }
49
50 [[nodiscard]] auto id() const noexcept -> uint64_t
51 {
52 return id_;
53 }
54
55 private:
56 uint64_t id_;
57 std::shared_ptr<std::atomic<bool>> cancelled_;
58
59 template <typename>
61 [[nodiscard]] auto get_cancel_flag() const -> std::shared_ptr<std::atomic<bool>>
62 {
63 return cancelled_;
64 }
65};
66
136template <typename PoolType = ThreadPool>
138{
139 public:
140 using Task = std::function<void()>;
141 using TimePoint = std::chrono::steady_clock::time_point;
142 using Duration = std::chrono::steady_clock::duration;
143
145 {
146 uint64_t id;
148 Duration interval; // Zero for one-time tasks
150 std::shared_ptr<std::atomic<bool>> cancelled;
152 };
153
158 explicit ScheduledThreadPoolT(size_t worker_threads = std::thread::hardware_concurrency())
159 : pool_(worker_threads), stop_(false), next_task_id_(1)
160 {
161 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop, this);
162 }
163
166
168 {
169 shutdown();
170 }
171
179 {
180 auto run_time = std::chrono::steady_clock::now() + delay;
181 return schedule_at(run_time, std::move(task));
182 }
183
191 {
192 return insert_task(time_point, Duration::zero(), std::move(task), false);
193 }
194
205 {
206 return schedule_periodic_after(Duration::zero(), interval, std::move(task));
207 }
208
216 auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
217 {
218 auto const run_time = std::chrono::steady_clock::now() + initial_delay;
219 return insert_task(run_time, interval, std::move(task), true);
220 }
221
228 static void cancel(ScheduledTaskHandle& handle)
229 {
230 handle.cancel();
231 }
232
236 [[nodiscard]] auto scheduled_count() const -> size_t
237 {
238 std::lock_guard<std::mutex> lock(mutex_);
239 return scheduled_tasks_.size();
240 }
241
245 [[nodiscard]] auto thread_pool() -> PoolType&
246 {
247 return pool_;
248 }
249
253 void shutdown()
254 {
255 {
256 std::lock_guard<std::mutex> lock(mutex_);
257 if (stop_)
258 return;
259 stop_ = true;
260 }
261
262 condition_.notify_one();
263
264 if (scheduler_thread_.joinable())
265 {
266 scheduler_thread_.join();
267 }
268
269 pool_.shutdown();
270 }
271
277 auto configure_threads(std::string const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
279 {
280 return pool_.configure_threads(name_prefix, policy, priority);
281 }
282
283 private:
284 PoolType pool_;
285 std::thread scheduler_thread_;
286
287 mutable std::mutex mutex_;
288 std::condition_variable condition_;
289 std::atomic<bool> stop_;
290
291 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
292 std::atomic<uint64_t> next_task_id_;
293
294 auto insert_task(TimePoint run_time, Duration interval, Task task, bool periodic) -> ScheduledTaskHandle
295 {
296 std::lock_guard<std::mutex> lock(mutex_);
297
298 uint64_t const task_id = next_task_id_++;
299 ScheduledTaskHandle handle(task_id);
300
301 ScheduledTaskInfo info;
302 info.id = task_id;
303 info.next_run = run_time;
304 info.interval = interval;
305 info.task = std::move(task);
306 info.cancelled = handle.get_cancel_flag();
307 info.periodic = periodic;
308
309 scheduled_tasks_.insert({run_time, std::move(info)});
310 condition_.notify_one();
311
312 return handle;
313 }
314
315 void scheduler_loop()
316 {
317 while (true)
318 {
319 std::unique_lock<std::mutex> lock(mutex_);
320
321 // Wait until we have tasks or need to stop
322 if (scheduled_tasks_.empty())
323 {
324 condition_.wait(lock, [this] { return stop_ || !scheduled_tasks_.empty(); });
325
326 if (stop_)
327 return;
328 }
329
330 // Get the next task to execute
331 auto const now = std::chrono::steady_clock::now();
332 auto it = scheduled_tasks_.begin();
333
334 if (it == scheduled_tasks_.end())
335 {
336 continue;
337 }
338
339 // Wait until it's time to execute
340 if (it->first > now)
341 {
342 condition_.wait_until(lock, it->first, [this] { return stop_.load(); });
343
344 if (stop_)
345 return;
346
347 continue;
348 }
349
350 // Extract the task info
351 ScheduledTaskInfo info = std::move(it->second);
352 scheduled_tasks_.erase(it);
353
354 // Check if cancelled
355 if (info.cancelled->load(std::memory_order_acquire))
356 {
357 continue;
358 }
359
360 // Schedule for execution in the thread pool
361 try
362 {
363 // Capture task and periodic info
364 auto task_copy = info.task;
365 auto cancelled_flag = info.cancelled;
366
367 pool_.post([task_copy, cancelled_flag]() {
368 if (!cancelled_flag->load(std::memory_order_acquire))
369 {
370 task_copy();
371 }
372 });
373
374 // Reschedule if periodic
375 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
376 {
377 info.next_run += info.interval;
378 scheduled_tasks_.insert({info.next_run, std::move(info)});
379 }
380 }
381 catch (...)
382 {
383 // Thread pool might be shutting down
384 }
385 }
386 }
387};
388
397
398} // namespace threadschedule
Copyable handle for a cancellable scheduled task.
auto id() const noexcept -> uint64_t
auto is_cancelled() const noexcept -> bool
Thread pool augmented with delayed and periodic task scheduling.
auto scheduled_count() const -> size_t
Get number of scheduled tasks (including periodic)
void shutdown()
Shutdown the scheduler and wait for completion.
auto thread_pool() -> PoolType &
Get the underlying thread pool for direct task submission.
auto operator=(ScheduledThreadPoolT const &) -> ScheduledThreadPoolT &=delete
ScheduledThreadPoolT(size_t worker_threads=std::thread::hardware_concurrency())
Create a scheduled thread pool.
static void cancel(ScheduledTaskHandle &handle)
Cancel a scheduled task by handle.
auto schedule_periodic_after(Duration initial_delay, Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically after an initial delay.
std::chrono::steady_clock::time_point TimePoint
std::chrono::steady_clock::duration Duration
ScheduledThreadPoolT(ScheduledThreadPoolT const &)=delete
auto schedule_after(Duration delay, Task task) -> ScheduledTaskHandle
Schedule a task to run after a delay.
auto configure_threads(std::string const &name_prefix, SchedulingPolicy policy=SchedulingPolicy::OTHER, ThreadPriority priority=ThreadPriority::normal())
Configure worker threads.
auto schedule_at(TimePoint time_point, Task task) -> ScheduledTaskHandle
Schedule a task to run at a specific time point.
auto schedule_periodic(Duration interval, Task task) -> ScheduledTaskHandle
Schedule a task to run periodically at fixed intervals.
Value-semantic wrapper for a thread scheduling priority.
static constexpr auto normal() noexcept -> ThreadPriority
Polyfill for std::expected (C++23) for pre-C++23 compilers.
SchedulingPolicy
Enumeration of available thread scheduling policies.
@ OTHER
Standard round-robin time-sharing.
ScheduledThreadPoolT< ThreadPool > ScheduledThreadPool
ScheduledThreadPoolT using the default ThreadPool backend.
ScheduledThreadPoolT< FastThreadPool > ScheduledFastThreadPool
ScheduledThreadPoolT using FastThreadPool as backend.
ScheduledThreadPoolT< LightweightPool > ScheduledLightweightPool
ScheduledThreadPoolT using LightweightPool as backend (minimal overhead).
ScheduledThreadPoolT< HighPerformancePool > ScheduledHighPerformancePool
ScheduledThreadPoolT using HighPerformancePool as backend.
constexpr bool Duration
Pre-C++20 fallback for Duration (constexpr bool).
Definition concepts.hpp:115
std::shared_ptr< std::atomic< bool > > cancelled
Thread pools: HighPerformancePool, ThreadPoolBase, LightweightPoolT, and GlobalPool.