19class ScheduledTaskHandle
22 explicit ScheduledTaskHandle(uint64_t
id) : id_(
id), cancelled_(std::make_shared<std::atomic<bool>>(
false))
28 cancelled_->store(
true, std::memory_order_release);
31 [[nodiscard]]
auto is_cancelled()
const ->
bool
33 return cancelled_->load(std::memory_order_acquire);
36 [[nodiscard]]
auto id()
const -> uint64_t
43 std::shared_ptr<std::atomic<bool>> cancelled_;
46 friend class ScheduledThreadPoolT;
47 [[nodiscard]]
auto get_cancel_flag()
const -> std::shared_ptr<std::atomic<bool>>
69 using Task = std::function<void()>;
70 using TimePoint = std::chrono::steady_clock::time_point;
71 using Duration = std::chrono::steady_clock::duration;
79 std::shared_ptr<std::atomic<bool>> cancelled;
88 : pool_(worker_threads), stop_(false), next_task_id_(1)
90 scheduler_thread_ = std::thread(&ScheduledThreadPoolT::scheduler_loop,
this);
109 auto run_time = std::chrono::steady_clock::now() + delay;
121 std::lock_guard<std::mutex> lock(mutex_);
123 uint64_t
const task_id = next_task_id_++;
126 ScheduledTaskInfo info;
128 info.next_run = time_point;
129 info.interval = Duration::zero();
130 info.task = std::move(task);
131 info.cancelled = handle.get_cancel_flag();
132 info.periodic =
false;
134 scheduled_tasks_.insert({time_point, std::move(info)});
135 condition_.notify_one();
163 std::lock_guard<std::mutex> lock(mutex_);
165 uint64_t
const task_id = next_task_id_++;
168 ScheduledTaskInfo info;
170 info.next_run = std::chrono::steady_clock::now() + initial_delay;
171 info.interval = interval;
172 info.task = std::move(task);
173 info.cancelled = handle.get_cancel_flag();
174 info.periodic =
true;
176 scheduled_tasks_.insert({info.next_run, std::move(info)});
177 condition_.notify_one();
198 std::lock_guard<std::mutex> lock(mutex_);
199 return scheduled_tasks_.size();
216 std::lock_guard<std::mutex> lock(mutex_);
222 condition_.notify_one();
224 if (scheduler_thread_.joinable())
226 scheduler_thread_.join();
239 auto configure_threads(std::string
const& name_prefix, SchedulingPolicy policy = SchedulingPolicy::OTHER,
242 return pool_.configure_threads(name_prefix, policy, priority);
247 std::thread scheduler_thread_;
249 mutable std::mutex mutex_;
250 std::condition_variable condition_;
251 std::atomic<bool> stop_;
253 std::multimap<TimePoint, ScheduledTaskInfo> scheduled_tasks_;
254 std::atomic<uint64_t> next_task_id_;
256 void scheduler_loop()
260 std::unique_lock<std::mutex> lock(mutex_);
263 if (scheduled_tasks_.empty())
265 condition_.wait(lock, [
this] {
return stop_ || !scheduled_tasks_.empty(); });
272 auto const now = std::chrono::steady_clock::now();
273 auto it = scheduled_tasks_.begin();
275 if (it == scheduled_tasks_.end())
283 condition_.wait_until(lock, it->first, [
this] { return stop_.load(); });
293 scheduled_tasks_.erase(it);
296 if (info.cancelled->load(std::memory_order_acquire))
305 auto task_copy = info.task;
306 auto cancelled_flag = info.cancelled;
308 pool_.submit([task_copy, cancelled_flag]() {
309 if (!cancelled_flag->load(std::memory_order_acquire))
316 if (info.periodic && !info.cancelled->load(std::memory_order_acquire))
318 info.next_run += info.interval;
319 scheduled_tasks_.insert({info.next_run, std::move(info)});