include/boost/corosio/detail/timer_service.hpp

91.8% Lines (337/367) 97.6% List of functions (41/42)
timer_service.hpp
f(x) Functions (42)
Function Calls Lines Blocks
boost::corosio::detail::timer_service::callback::callback() :96 1244x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :99 1244x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :108 8687x 100.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :140 1244x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :146 17476x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :152 2488x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :158 1244x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :171 237189x 100.0% 73.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :214 265909x 100.0% 70.0% boost::corosio::detail::waiter_node::completion_op::completion_op() :238 316x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :264 316x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :301 8959x 87.5% 78.0% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :316 8951x 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :327 8742x 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :339 8726x 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :350 1244x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :361 397x 100.0% 100.0% boost::corosio::detail::timer_service::shutdown() :368 1244x 100.0% 82.0% boost::corosio::detail::timer_service::construct() :425 8959x 66.7% 76.0% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :454 8957x 100.0% 100.0% 
boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :460 8957x 73.3% 64.0% boost::corosio::detail::timer_service::create_waiter() :487 8742x 100.0% 87.0% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :505 8726x 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >) :516 8x 96.6% 81.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :566 8742x 100.0% 82.0% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :586 8959x 87.5% 78.0% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :626 36x 92.9% 80.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :649 2x 76.5% 70.0% boost::corosio::detail::timer_service::process_expired() :676 257117x 100.0% 91.0% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :711 8712x 84.6% 65.0% boost::corosio::detail::timer_service::up_heap(unsigned long) :738 8726x 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long) :751 8435x 100.0% 100.0% boost::corosio::detail::timer_service::swap_heap(unsigned long, unsigned long) :771 16843x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :783 36x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :789 0 0.0% 0.0% boost::corosio::detail::waiter_node::completion_op::operator()() :803 8726x 100.0% 100.0% 
boost::corosio::detail::waiter_node::completion_op::destroy() :828 8x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::capy::continuation*) :855 8742x 81.0% 74.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :898 8x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :905 8x 100.0% 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :912 2x 100.0% 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :919 1244x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/detail/scheduler.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/detail/intrusive.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19 #include <boost/capy/error.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <system_error>
23
24 #include <atomic>
25 #include <chrono>
26 #include <coroutine>
27 #include <cstddef>
28 #include <limits>
29 #include <mutex>
30 #include <optional>
31 #include <stop_token>
32 #include <utility>
33 #include <vector>
34
35 namespace boost::corosio::detail {
36
37 struct scheduler;
38
39 /*
40 Timer Service
41 =============
42
43 Data Structures
44 ---------------
45 waiter_node holds per-waiter state: coroutine handle, executor,
46 error output, stop_token, embedded completion_op. Each concurrent
47 co_await t.wait() that must suspend takes one waiter_node (recycled when possible).
48
49 timer_service::implementation holds per-timer state: expiry,
50 heap index, and an intrusive_list of waiter_nodes. Multiple
51 coroutines can wait on the same timer simultaneously.
52
53 timer_service owns a min-heap of active timers, a free list
54 of recycled impls, and a free list of recycled waiter_nodes. The
55 heap is ordered by expiry time; the scheduler queries
56 nearest_expiry() to set the epoll/timerfd timeout.
57
58 Optimization Strategy
59 ---------------------
60 1. Deferred heap insertion — expires_after() stores the expiry
61 but does not insert into the heap. Insertion happens in wait().
62 2. Thread-local impl cache — single-slot per-thread cache.
63 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 6. Thread-local waiter cache — single-slot per-thread cache.
67
68 Concurrency
69 -----------
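70 stop_token callbacks can fire from any thread. The impl_ pointer on
71 waiter_node is a "still in list" marker: it is cleared under mutex_ by whichever path removes the waiter, so a later canceller finds nullptr and returns.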
72 */
73
74 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75
76 inline void timer_service_invalidate_cache() noexcept;
77
78 // timer_service class body — the larger member functions are defined
79 // out-of-class (after implementation and waiter_node are complete)
80 class BOOST_COROSIO_DECL timer_service final
81 : public capy::execution_context::service
82 , public io_object::io_service
83 {
84 public:
85 using clock_type = std::chrono::steady_clock;
86 using time_point = clock_type::time_point;
87
88 /// Type-erased callback for earliest-expiry-changed notifications.
89 class callback
90 {
91 void* ctx_ = nullptr;
92 void (*fn_)(void*) = nullptr;
93
94 public:
95 /// Construct an empty callback.
96 1244x callback() = default;
97
98 /// Construct a callback with the given context and function.
99 1244x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100
101 /// Return true if the callback is non-empty.
102 explicit operator bool() const noexcept
103 {
104 return fn_ != nullptr;
105 }
106
107 /// Invoke the callback.
108 8687x void operator()() const
109 {
110 8687x if (fn_)
111 8687x fn_(ctx_);
112 8687x }
113 };
114
115 struct implementation;
116
117 private:
118 struct heap_entry
119 {
120 time_point time_;
121 implementation* timer_;
122 };
123
124 scheduler* sched_ = nullptr;
125 BOOST_COROSIO_MSVC_WARNING_PUSH
126 BOOST_COROSIO_MSVC_WARNING_DISABLE(4251) // std:: members, dll-interface
127 mutable std::mutex mutex_;
128 std::vector<heap_entry> heap_;
129 implementation* free_list_ = nullptr;
130 waiter_node* waiter_free_list_ = nullptr;
131 callback on_earliest_changed_;
132 bool shutting_down_ = false;
133 // Tick count of the heap top's expiry (int64 max when the heap is empty); lets nearest_expiry() and empty() avoid the mutex
134 mutable std::atomic<std::int64_t> cached_nearest_ns_{
135 (std::numeric_limits<std::int64_t>::max)()};
136 BOOST_COROSIO_MSVC_WARNING_POP
137
138 public:
139 /// Construct the timer service bound to a scheduler.
140 1244x inline timer_service(capy::execution_context&, scheduler& sched)
141 1244x : sched_(&sched)
142 {
143 1244x }
144
145 /// Return the associated scheduler.
146 17476x inline scheduler& get_scheduler() noexcept
147 {
148 17476x return *sched_;
149 }
150
151 /// Destroy the timer service.
152 2488x ~timer_service() override = default;
153
154 timer_service(timer_service const&) = delete;
155 timer_service& operator=(timer_service const&) = delete;
156
157 /// Register a callback invoked when the earliest expiry changes.
158 1244x inline void set_on_earliest_changed(callback cb)
159 {
160 1244x on_earliest_changed_ = cb;
161 1244x }
162
163 /// Return true if no timers are in the heap.
164 inline bool empty() const noexcept
165 {
166 return cached_nearest_ns_.load(std::memory_order_acquire) ==
167 (std::numeric_limits<std::int64_t>::max)();
168 }
169
170 /// Return the nearest timer expiry without acquiring the mutex.
171 237189x inline time_point nearest_expiry() const noexcept
172 {
173 237189x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
174 237189x return time_point(time_point::duration(ns));
175 }
176
177 /// Cancel all pending timers and free cached resources.
178 inline void shutdown() override;
179
180 /// Construct a new timer implementation.
181 inline io_object::implementation* construct() override;
182
183 /// Destroy a timer implementation, cancelling pending waiters.
184 inline void destroy(io_object::implementation* p) override;
185
186 /// Cancel and recycle a timer implementation.
187 inline void destroy_impl(implementation& impl);
188
189 /// Create or recycle a waiter node.
190 inline waiter_node* create_waiter();
191
192 /// Return a waiter node to the cache or free list.
193 inline void destroy_waiter(waiter_node* w);
194
195 /// Update the timer expiry, cancelling existing waiters.
196 inline std::size_t update_timer(implementation& impl, time_point new_time);
197
198 /// Append a waiter to the timer's waiter list, adding the timer to the heap if it is not already there.
199 inline void insert_waiter(implementation& impl, waiter_node* w);
200
201 /// Cancel all waiters on a timer.
202 inline std::size_t cancel_timer(implementation& impl);
203
204 /// Cancel a single waiter (stop_token callback path).
205 inline void cancel_waiter(waiter_node* w);
206
207 /// Cancel one waiter on a timer.
208 inline std::size_t cancel_one_waiter(implementation& impl);
209
210 /// Complete all waiters whose timers have expired.
211 inline std::size_t process_expired();
212
213 private:
214 265909x inline void refresh_cached_nearest() noexcept
215 {
216 265909x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
217 264504x : heap_[0].time_.time_since_epoch().count();
218 265909x cached_nearest_ns_.store(ns, std::memory_order_release);
219 265909x }
220
221 inline void remove_timer_impl(implementation& impl);
222 inline void up_heap(std::size_t index);
223 inline void down_heap(std::size_t index);
224 inline void swap_heap(std::size_t i1, std::size_t i2);
225 };
226
227 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
228 : intrusive_list<waiter_node>::node
229 {
230 // Embedded completion op — avoids heap allocation per fire/cancel
231 struct completion_op final : scheduler_op
232 {
233 waiter_node* waiter_ = nullptr;
234
235 static void do_complete(
236 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
237
238 316x completion_op() noexcept : scheduler_op(&do_complete) {}
239
240 void operator()() override;
241 void destroy() override;
242 };
243
244 // Per-waiter stop_token cancellation
245 struct canceller
246 {
247 waiter_node* waiter_;
248 void operator()() const;
249 };
250
251 // nullptr once removed from timer's waiter list (concurrency marker)
252 timer_service::implementation* impl_ = nullptr;
253 timer_service* svc_ = nullptr;
254 std::coroutine_handle<> h_;
255 capy::continuation* cont_ = nullptr;
256 capy::executor_ref d_;
257 std::error_code* ec_out_ = nullptr;
258 std::stop_token token_;
259 std::optional<std::stop_callback<canceller>> stop_cb_;
260 completion_op op_;
261 std::error_code ec_value_;
262 waiter_node* next_free_ = nullptr;
263
264 316x waiter_node() noexcept
265 316x {
266 316x op_.waiter_ = this;
267 316x }
268 };
269
270 struct timer_service::implementation final : timer::implementation
271 {
272 using clock_type = std::chrono::steady_clock;
273 using time_point = clock_type::time_point;
274 using duration = clock_type::duration;
275
276 timer_service* svc_ = nullptr;
277 intrusive_list<waiter_node> waiters_;
278
279 // Free list linkage (reused when impl is on free_list)
280 implementation* next_free_ = nullptr;
281
282 inline explicit implementation(timer_service& svc) noexcept;
283
284 inline std::coroutine_handle<> wait(
285 std::coroutine_handle<>,
286 capy::executor_ref,
287 std::stop_token,
288 std::error_code*,
289 capy::continuation*) override;
290 };
291
292 // Thread-local caches avoid hot-path mutex acquisitions:
293 // 1. Impl cache — single-slot, validated by comparing svc_
294 // 2. Waiter cache — single-slot, no service affinity
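295 // timer_service_invalidate_cache(), called from shutdown(), clears the slots it can see — presumably the calling thread's only. NOTE(review): confirm how slots left on other threads are reclaimed.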
296
297 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
298 inline thread_local_ptr<waiter_node> tl_cached_waiter;
299
300 inline timer_service::implementation*
301 8959x try_pop_tl_cache(timer_service* svc) noexcept
302 {
303 8959x auto* impl = tl_cached_impl.get();
304 8959x if (impl)
305 {
306 8562x tl_cached_impl.set(nullptr);
307 8562x if (impl->svc_ == svc)
308 8562x return impl;
309 // Cached impl belongs to a different service (e.g. one already destroyed) — discard it
310 delete impl;
311 }
312 397x return nullptr;
313 }
314
315 inline bool
316 8951x try_push_tl_cache(timer_service::implementation* impl) noexcept
317 {
318 8951x if (!tl_cached_impl.get())
319 {
320 8839x tl_cached_impl.set(impl);
321 8839x return true;
322 }
323 112x return false;
324 }
325
326 inline waiter_node*
327 8742x try_pop_waiter_tl_cache() noexcept
328 {
329 8742x auto* w = tl_cached_waiter.get();
330 8742x if (w)
331 {
332 8424x tl_cached_waiter.set(nullptr);
333 8424x return w;
334 }
335 318x return nullptr;
336 }
337
338 inline bool
339 8726x try_push_waiter_tl_cache(waiter_node* w) noexcept
340 {
341 8726x if (!tl_cached_waiter.get())
342 {
343 8640x tl_cached_waiter.set(w);
344 8640x return true;
345 }
346 86x return false;
347 }
348
349 inline void
350 1244x timer_service_invalidate_cache() noexcept
351 {
352 1244x delete tl_cached_impl.get();
353 1244x tl_cached_impl.set(nullptr);
354
355 1244x delete tl_cached_waiter.get();
356 1244x tl_cached_waiter.set(nullptr);
357 1244x }
358
359 // timer_service out-of-class member function definitions
360
361 397x inline timer_service::implementation::implementation(
362 397x timer_service& svc) noexcept
363 397x : svc_(&svc)
364 {
365 397x }
366
367 inline void
368 1244x timer_service::shutdown()
369 {
370 1244x timer_service_invalidate_cache();
371 1244x shutting_down_ = true;
372
373 // Snapshot impls and detach them from the heap so that
374 // coroutine-owned timer destructors (triggered by h.destroy()
375 // below) cannot re-enter remove_timer_impl() and mutate the
376 // vector during iteration.
377 1244x std::vector<implementation*> impls;
378 1244x impls.reserve(heap_.size());
379 1252x for (auto& entry : heap_)
380 {
381 8x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
382 8x impls.push_back(entry.timer_);
383 }
384 1244x heap_.clear();
385 1244x cached_nearest_ns_.store(
386 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
387
388 // Tear down waiting timers: each pending coroutine is destroyed,
389 // not resumed. Each waiter called work_started() in
390 // implementation::wait(). On IOCP the scheduler shutdown loop exits
391 // when outstanding_work_ reaches zero, so we must call work_finished()
392 // here to balance it. On other backends this is harmless.
393 1252x for (auto* impl : impls)
394 {
395 16x while (auto* w = impl->waiters_.pop_front())
396 {
397 8x w->stop_cb_.reset();
398 8x auto h = std::exchange(w->h_, {});
399 8x sched_->work_finished();
400 8x if (h)
401 8x h.destroy();
402 8x delete w;
403 8x }
404 8x delete impl;
405 }
406
407 // Delete free-listed impls
408 1356x while (free_list_)
409 {
410 112x auto* next = free_list_->next_free_;
411 112x delete free_list_;
412 112x free_list_ = next;
413 }
414
415 // Delete free-listed waiters
416 1328x while (waiter_free_list_)
417 {
418 84x auto* next = waiter_free_list_->next_free_;
419 84x delete waiter_free_list_;
420 84x waiter_free_list_ = next;
421 }
422 1244x }
423
424 inline io_object::implementation*
425 8959x timer_service::construct()
426 {
427 8959x implementation* impl = try_pop_tl_cache(this);
428 8959x if (impl)
429 {
430 8562x impl->svc_ = this;
431 8562x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
432 8562x impl->might_have_pending_waits_ = false;
433 8562x return impl;
434 }
435
436 397x std::lock_guard lock(mutex_);
437 397x if (free_list_)
438 {
439 impl = free_list_;
440 free_list_ = impl->next_free_;
441 impl->next_free_ = nullptr;
442 impl->svc_ = this;
443 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
444 impl->might_have_pending_waits_ = false;
445 }
446 else
447 {
448 397x impl = new implementation(*this);
449 }
450 397x return impl;
451 397x }
452
453 inline void
454 8957x timer_service::destroy(io_object::implementation* p)
455 {
456 8957x destroy_impl(static_cast<implementation&>(*p));
457 8957x }
458
459 inline void
460 8957x timer_service::destroy_impl(implementation& impl)
461 {
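462 // Once shutdown() has started, impls that were in the heap are owned by
463 // its loop. Re-entering here (from a coroutine-owned timer destructor
464 // triggered by h.destroy()) must not modify the heap or
465 // recycle the impl — shutdown deletes it directly. NOTE(review): an impl that was neither in the heap nor on the free list also returns early here; confirm it is reclaimed elsewhere.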
466 8957x if (shutting_down_)
467 8845x return;
468
469 8951x cancel_timer(impl);
470
471 8951x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
472 {
473 std::lock_guard lock(mutex_);
474 remove_timer_impl(impl);
475 refresh_cached_nearest();
476 }
477
478 8951x if (try_push_tl_cache(&impl))
479 8839x return;
480
481 112x std::lock_guard lock(mutex_);
482 112x impl.next_free_ = free_list_;
483 112x free_list_ = &impl;
484 112x }
485
486 inline waiter_node*
487 8742x timer_service::create_waiter()
488 {
489 8742x if (auto* w = try_pop_waiter_tl_cache())
490 8424x return w;
491
492 318x std::lock_guard lock(mutex_);
493 318x if (waiter_free_list_)
494 {
495 2x auto* w = waiter_free_list_;
496 2x waiter_free_list_ = w->next_free_;
497 2x w->next_free_ = nullptr;
498 2x return w;
499 }
500
501 316x return new waiter_node();
502 318x }
503
504 inline void
505 8726x timer_service::destroy_waiter(waiter_node* w)
506 {
507 8726x if (try_push_waiter_tl_cache(w))
508 8640x return;
509
510 86x std::lock_guard lock(mutex_);
511 86x w->next_free_ = waiter_free_list_;
512 86x waiter_free_list_ = w;
513 86x }
514
515 inline std::size_t
516 8x timer_service::update_timer(implementation& impl, time_point new_time)
517 {
518 bool in_heap =
519 8x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
520 8x if (!in_heap && impl.waiters_.empty())
521 return 0;
522
523 8x bool notify = false;
524 8x intrusive_list<waiter_node> canceled;
525
526 {
527 8x std::lock_guard lock(mutex_);
528
529 20x while (auto* w = impl.waiters_.pop_front())
530 {
531 12x w->impl_ = nullptr;
532 12x canceled.push_back(w);
533 12x }
534
535 8x if (impl.heap_index_ < heap_.size())
536 {
537 8x time_point old_time = heap_[impl.heap_index_].time_;
538 8x heap_[impl.heap_index_].time_ = new_time;
539
540 8x if (new_time < old_time)
541 6x up_heap(impl.heap_index_);
542 else
543 2x down_heap(impl.heap_index_);
544
545 8x notify = (impl.heap_index_ == 0);
546 }
547
548 8x refresh_cached_nearest();
549 8x }
550
551 8x std::size_t count = 0;
552 20x while (auto* w = canceled.pop_front())
553 {
554 12x w->ec_value_ = make_error_code(capy::error::canceled);
555 12x sched_->post(&w->op_);
556 12x ++count;
557 12x }
558
559 8x if (notify)
560 6x on_earliest_changed_();
561
562 8x return count;
563 }
564
565 inline void
566 8742x timer_service::insert_waiter(implementation& impl, waiter_node* w)
567 {
568 8742x bool notify = false;
569 {
570 8742x std::lock_guard lock(mutex_);
571 8742x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
572 {
573 8720x impl.heap_index_ = heap_.size();
574 8720x heap_.push_back({impl.expiry_, &impl});
575 8720x up_heap(heap_.size() - 1);
576 8720x notify = (impl.heap_index_ == 0);
577 8720x refresh_cached_nearest();
578 }
579 8742x impl.waiters_.push_back(w);
580 8742x }
581 8742x if (notify)
582 8681x on_earliest_changed_();
583 8742x }
584
585 inline std::size_t
586 8959x timer_service::cancel_timer(implementation& impl)
587 {
588 8959x if (!impl.might_have_pending_waits_)
589 8933x return 0;
590
591 // Not in heap and no waiters — just clear the flag
592 26x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
593 impl.waiters_.empty())
594 {
595 impl.might_have_pending_waits_ = false;
596 return 0;
597 }
598
599 26x intrusive_list<waiter_node> canceled;
600
601 {
602 26x std::lock_guard lock(mutex_);
603 26x remove_timer_impl(impl);
604 54x while (auto* w = impl.waiters_.pop_front())
605 {
606 28x w->impl_ = nullptr;
607 28x canceled.push_back(w);
608 28x }
609 26x refresh_cached_nearest();
610 26x }
611
612 26x impl.might_have_pending_waits_ = false;
613
614 26x std::size_t count = 0;
615 54x while (auto* w = canceled.pop_front())
616 {
617 28x w->ec_value_ = make_error_code(capy::error::canceled);
618 28x sched_->post(&w->op_);
619 28x ++count;
620 28x }
621
622 26x return count;
623 }
624
625 inline void
626 36x timer_service::cancel_waiter(waiter_node* w)
627 {
628 {
629 36x std::lock_guard lock(mutex_);
630 // Already removed by another path (cancel_timer, cancel_one_waiter, update_timer or process_expired)
631 36x if (!w->impl_)
632 return;
633 36x auto* impl = w->impl_;
634 36x w->impl_ = nullptr;
635 36x impl->waiters_.remove(w);
636 36x if (impl->waiters_.empty())
637 {
638 34x remove_timer_impl(*impl);
639 34x impl->might_have_pending_waits_ = false;
640 }
641 36x refresh_cached_nearest();
642 36x }
643
644 36x w->ec_value_ = make_error_code(capy::error::canceled);
645 36x sched_->post(&w->op_);
646 }
647
648 inline std::size_t
649 2x timer_service::cancel_one_waiter(implementation& impl)
650 {
651 2x if (!impl.might_have_pending_waits_)
652 return 0;
653
654 2x waiter_node* w = nullptr;
655
656 {
657 2x std::lock_guard lock(mutex_);
658 2x w = impl.waiters_.pop_front();
659 2x if (!w)
660 return 0;
661 2x w->impl_ = nullptr;
662 2x if (impl.waiters_.empty())
663 {
664 remove_timer_impl(impl);
665 impl.might_have_pending_waits_ = false;
666 }
667 2x refresh_cached_nearest();
668 2x }
669
670 2x w->ec_value_ = make_error_code(capy::error::canceled);
671 2x sched_->post(&w->op_);
672 2x return 1;
673 }
674
675 inline std::size_t
676 257117x timer_service::process_expired()
677 {
678 257117x intrusive_list<waiter_node> expired;
679
680 {
681 257117x std::lock_guard lock(mutex_);
682 257117x auto now = clock_type::now();
683
684 265769x while (!heap_.empty() && heap_[0].time_ <= now)
685 {
686 8652x implementation* t = heap_[0].timer_;
687 8652x remove_timer_impl(*t);
688 17308x while (auto* w = t->waiters_.pop_front())
689 {
690 8656x w->impl_ = nullptr;
691 8656x w->ec_value_ = {};
692 8656x expired.push_back(w);
693 8656x }
694 8652x t->might_have_pending_waits_ = false;
695 }
696
697 257117x refresh_cached_nearest();
698 257117x }
699
700 257117x std::size_t count = 0;
701 265773x while (auto* w = expired.pop_front())
702 {
703 8656x sched_->post(&w->op_);
704 8656x ++count;
705 8656x }
706
707 257117x return count;
708 }
709
710 inline void
711 8712x timer_service::remove_timer_impl(implementation& impl)
712 {
713 8712x std::size_t index = impl.heap_index_;
714 8712x if (index >= heap_.size())
715 return; // Not in heap
716
717 8712x if (index == heap_.size() - 1)
718 {
719 // Last element, just pop
720 279x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
721 279x heap_.pop_back();
722 }
723 else
724 {
725 // Swap with last and reheapify
726 8433x swap_heap(index, heap_.size() - 1);
727 8433x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
728 8433x heap_.pop_back();
729
730 8433x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
731 up_heap(index);
732 else
733 8433x down_heap(index);
734 }
735 }
736
737 inline void
738 8726x timer_service::up_heap(std::size_t index)
739 {
740 17132x while (index > 0)
741 {
742 8445x std::size_t parent = (index - 1) / 2;
743 8445x if (!(heap_[index].time_ < heap_[parent].time_))
744 39x break;
745 8406x swap_heap(index, parent);
746 8406x index = parent;
747 }
748 8726x }
749
750 inline void
751 8435x timer_service::down_heap(std::size_t index)
752 {
753 8435x std::size_t child = index * 2 + 1;
754 8439x while (child < heap_.size())
755 {
756 10x std::size_t min_child = (child + 1 == heap_.size() ||
757 2x heap_[child].time_ < heap_[child + 1].time_)
758 12x ? child
759 10x : child + 1;
760
761 10x if (heap_[index].time_ < heap_[min_child].time_)
762 6x break;
763
764 4x swap_heap(index, min_child);
765 4x index = min_child;
766 4x child = index * 2 + 1;
767 }
768 8435x }
769
770 inline void
771 16843x timer_service::swap_heap(std::size_t i1, std::size_t i2)
772 {
773 16843x heap_entry tmp = heap_[i1];
774 16843x heap_[i1] = heap_[i2];
775 16843x heap_[i2] = tmp;
776 16843x heap_[i1].timer_->heap_index_ = i1;
777 16843x heap_[i2].timer_->heap_index_ = i2;
778 16843x }
779
780 // waiter_node out-of-class member function definitions
781
782 inline void
783 36x waiter_node::canceller::operator()() const
784 {
785 36x waiter_->svc_->cancel_waiter(waiter_);
786 36x }
787
788 inline void
789 waiter_node::completion_op::do_complete(
790 [[maybe_unused]] void* owner,
791 scheduler_op* base,
792 std::uint32_t,
793 std::uint32_t)
794 {
795 // owner is always non-null here. The destroy path (owner == nullptr)
796 // is unreachable because completion_op overrides destroy() directly,
797 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
798 BOOST_COROSIO_ASSERT(owner);
799 static_cast<completion_op*>(base)->operator()();
800 }
801
802 inline void
803 8726x waiter_node::completion_op::operator()()
804 {
805 8726x auto* w = waiter_;
806 8726x w->stop_cb_.reset();
807 8726x if (w->ec_out_)
808 8726x *w->ec_out_ = w->ec_value_;
809
810 8726x auto* cont = w->cont_;
811 8726x auto d = w->d_;
812 8726x auto* svc = w->svc_;
813 8726x auto& sched = svc->get_scheduler();
814
815 8726x svc->destroy_waiter(w);
816
817 8726x d.post(*cont);
818 8726x sched.work_finished();
819 8726x }
820
821 // GCC 14 false-positive: inlining ~optional<stop_callback> through
822 // delete loses track that stop_cb_ was already .reset() earlier in destroy().
823 #if defined(__GNUC__) && !defined(__clang__)
824 #pragma GCC diagnostic push
825 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
826 #endif
827 inline void
828 8x waiter_node::completion_op::destroy()
829 {
830 // Called during scheduler shutdown drain when this completion_op is
831 // in the scheduler's ready queue (posted by update_timer(), any of the
832 // cancel_*() paths, or process_expired()). Balances the work_started() from
833 // implementation::wait(). The scheduler drain loop separately
834 // balances the work_started() from post(). On IOCP both decrements
835 // are required for outstanding_work_ to reach zero; on other
836 // backends this is harmless.
837 //
838 // This override also prevents scheduler_op::destroy() from calling
839 // do_complete(nullptr, ...). See also: timer_service::shutdown()
840 // which drains waiters still in the timer heap (the other path).
841 8x auto* w = waiter_;
842 8x w->stop_cb_.reset();
843 8x auto h = std::exchange(w->h_, {});
844 8x auto& sched = w->svc_->get_scheduler();
845 8x delete w;
846 8x sched.work_finished();
847 8x if (h)
848 8x h.destroy();
849 8x }
850 #if defined(__GNUC__) && !defined(__clang__)
851 #pragma GCC diagnostic pop
852 #endif
853
854 inline std::coroutine_handle<>
855 8742x timer_service::implementation::wait(
856 std::coroutine_handle<> h,
857 capy::executor_ref d,
858 std::stop_token token,
859 std::error_code* ec,
860 capy::continuation* cont)
861 {
862 // Already-expired fast path — no waiter_node, no mutex.
863 // Post instead of dispatch so the coroutine yields to the
864 // scheduler, allowing other queued work to run.
865 8742x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
866 {
867 8720x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
868 {
869 if (ec)
870 *ec = {};
871 d.post(*cont);
872 return std::noop_coroutine();
873 }
874 }
875
876 8742x auto* w = svc_->create_waiter();
877 8742x w->impl_ = this;
878 8742x w->svc_ = svc_;
879 8742x w->h_ = h;
880 8742x w->cont_ = cont;
881 8742x w->d_ = d;
882 8742x w->token_ = std::move(token);
883 8742x w->ec_out_ = ec;
884
885 8742x svc_->insert_waiter(*this, w);
886 8742x might_have_pending_waits_ = true;
887 8742x svc_->get_scheduler().work_started();
888
889 8742x if (w->token_.stop_possible())
890 60x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
891
892 8742x return std::noop_coroutine();
893 }
894
895 // Free functions
896
897 inline std::size_t
898 8x timer_service_update_expiry(timer::implementation& base)
899 {
900 8x auto& impl = static_cast<timer_service::implementation&>(base);
901 8x return impl.svc_->update_timer(impl, impl.expiry_);
902 }
903
904 inline std::size_t
905 8x timer_service_cancel(timer::implementation& base) noexcept
906 {
907 8x auto& impl = static_cast<timer_service::implementation&>(base);
908 8x return impl.svc_->cancel_timer(impl);
909 }
910
911 inline std::size_t
912 2x timer_service_cancel_one(timer::implementation& base) noexcept
913 {
914 2x auto& impl = static_cast<timer_service::implementation&>(base);
915 2x return impl.svc_->cancel_one_waiter(impl);
916 }
917
918 inline timer_service&
919 1244x get_timer_service(capy::execution_context& ctx, scheduler& sched)
920 {
921 1244x return ctx.make_service<timer_service>(sched);
922 }
923
924 } // namespace boost::corosio::detail
925
926 #endif
927