TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/detail/scheduler.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/detail/intrusive.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 : #include <boost/capy/error.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 : #include <boost/capy/ex/executor_ref.hpp>
22 : #include <system_error>
23 :
24 : #include <atomic>
25 : #include <chrono>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <limits>
29 : #include <mutex>
30 : #include <optional>
31 : #include <stop_token>
32 : #include <utility>
33 : #include <vector>
34 :
35 : namespace boost::corosio::detail {
36 :
37 : struct scheduler;
38 :
39 : /*
40 : Timer Service
41 : =============
42 :
43 : Data Structures
44 : ---------------
45 : waiter_node holds per-waiter state: coroutine handle, executor,
46 : error output, stop_token, embedded completion_op. Each concurrent
47 : co_await t.wait() allocates one waiter_node.
48 :
49 : timer_service::implementation holds per-timer state: expiry,
50 : heap index, and an intrusive_list of waiter_nodes. Multiple
51 : coroutines can wait on the same timer simultaneously.
52 :
53 : timer_service owns a min-heap of active timers, a free list
54 : of recycled impls, and a free list of recycled waiter_nodes. The
55 : heap is ordered by expiry time; the scheduler queries
56 : nearest_expiry() to set the epoll/timerfd timeout.
57 :
58 : Optimization Strategy
59 : ---------------------
60 : 1. Deferred heap insertion — expires_after() stores the expiry
61 : but does not insert into the heap. Insertion happens in wait().
62 : 2. Thread-local impl cache — single-slot per-thread cache.
63 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 : 6. Thread-local waiter cache — single-slot per-thread cache.
67 :
68 : Concurrency
69 : -----------
70 : stop_token callbacks can fire from any thread. The impl_
71 : pointer on waiter_node is used as a "still in list" marker.
72 : */
73 :
74 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75 :
76 : inline void timer_service_invalidate_cache() noexcept;
77 :
78 : // timer_service class body — member function definitions are
79 : // out-of-class (after implementation and waiter_node are complete)
80 : class BOOST_COROSIO_DECL timer_service final
81 : : public capy::execution_context::service
82 : , public io_object::io_service
83 : {
84 : public:
85 : using clock_type = std::chrono::steady_clock;
86 : using time_point = clock_type::time_point;
87 :
88 : /// Type-erased callback for earliest-expiry-changed notifications.
89 : class callback
90 : {
91 : void* ctx_ = nullptr;
92 : void (*fn_)(void*) = nullptr;
93 :
94 : public:
95 : /// Construct an empty callback.
96 HIT 1244 : callback() = default;
97 :
98 : /// Construct a callback with the given context and function.
99 1244 : callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100 :
101 : /// Return true if the callback is non-empty.
102 : explicit operator bool() const noexcept
103 : {
104 : return fn_ != nullptr;
105 : }
106 :
107 : /// Invoke the callback.
108 8687 : void operator()() const
109 : {
110 8687 : if (fn_)
111 8687 : fn_(ctx_);
112 8687 : }
113 : };
114 :
115 : struct implementation;
116 :
117 : private:
118 : struct heap_entry
119 : {
120 : time_point time_;
121 : implementation* timer_;
122 : };
123 :
124 : scheduler* sched_ = nullptr;
125 : BOOST_COROSIO_MSVC_WARNING_PUSH
126 : BOOST_COROSIO_MSVC_WARNING_DISABLE(4251) // std:: members, dll-interface
127 : mutable std::mutex mutex_;
128 : std::vector<heap_entry> heap_;
129 : implementation* free_list_ = nullptr;
130 : waiter_node* waiter_free_list_ = nullptr;
131 : callback on_earliest_changed_;
132 : bool shutting_down_ = false;
133 : // Avoids mutex in nearest_expiry() and empty()
134 : mutable std::atomic<std::int64_t> cached_nearest_ns_{
135 : (std::numeric_limits<std::int64_t>::max)()};
136 : BOOST_COROSIO_MSVC_WARNING_POP
137 :
138 : public:
139 : /// Construct the timer service bound to a scheduler.
140 1244 : inline timer_service(capy::execution_context&, scheduler& sched)
141 1244 : : sched_(&sched)
142 : {
143 1244 : }
144 :
145 : /// Return the associated scheduler.
146 17476 : inline scheduler& get_scheduler() noexcept
147 : {
148 17476 : return *sched_;
149 : }
150 :
151 : /// Destroy the timer service.
152 2488 : ~timer_service() override = default;
153 :
154 : timer_service(timer_service const&) = delete;
155 : timer_service& operator=(timer_service const&) = delete;
156 :
157 : /// Register a callback invoked when the earliest expiry changes.
158 1244 : inline void set_on_earliest_changed(callback cb)
159 : {
160 1244 : on_earliest_changed_ = cb;
161 1244 : }
162 :
163 : /// Return true if no timers are in the heap.
164 : inline bool empty() const noexcept
165 : {
166 : return cached_nearest_ns_.load(std::memory_order_acquire) ==
167 : (std::numeric_limits<std::int64_t>::max)();
168 : }
169 :
170 : /// Return the nearest timer expiry without acquiring the mutex.
171 237189 : inline time_point nearest_expiry() const noexcept
172 : {
173 237189 : auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
174 237189 : return time_point(time_point::duration(ns));
175 : }
176 :
177 : /// Cancel all pending timers and free cached resources.
178 : inline void shutdown() override;
179 :
180 : /// Construct a new timer implementation.
181 : inline io_object::implementation* construct() override;
182 :
183 : /// Destroy a timer implementation, cancelling pending waiters.
184 : inline void destroy(io_object::implementation* p) override;
185 :
186 : /// Cancel and recycle a timer implementation.
187 : inline void destroy_impl(implementation& impl);
188 :
189 : /// Create or recycle a waiter node.
190 : inline waiter_node* create_waiter();
191 :
192 : /// Return a waiter node to the cache or free list.
193 : inline void destroy_waiter(waiter_node* w);
194 :
195 : /// Update the timer expiry, cancelling existing waiters.
196 : inline std::size_t update_timer(implementation& impl, time_point new_time);
197 :
198 : /// Insert a waiter into the timer's waiter list and the heap.
199 : inline void insert_waiter(implementation& impl, waiter_node* w);
200 :
201 : /// Cancel all waiters on a timer.
202 : inline std::size_t cancel_timer(implementation& impl);
203 :
204 : /// Cancel a single waiter ( stop_token callback path ).
205 : inline void cancel_waiter(waiter_node* w);
206 :
207 : /// Cancel one waiter on a timer.
208 : inline std::size_t cancel_one_waiter(implementation& impl);
209 :
210 : /// Complete all waiters whose timers have expired.
211 : inline std::size_t process_expired();
212 :
213 : private:
214 265909 : inline void refresh_cached_nearest() noexcept
215 : {
216 265909 : auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
217 264504 : : heap_[0].time_.time_since_epoch().count();
218 265909 : cached_nearest_ns_.store(ns, std::memory_order_release);
219 265909 : }
220 :
221 : inline void remove_timer_impl(implementation& impl);
222 : inline void up_heap(std::size_t index);
223 : inline void down_heap(std::size_t index);
224 : inline void swap_heap(std::size_t i1, std::size_t i2);
225 : };
226 :
227 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
228 : : intrusive_list<waiter_node>::node
229 : {
230 : // Embedded completion op — avoids heap allocation per fire/cancel
231 : struct completion_op final : scheduler_op
232 : {
233 : waiter_node* waiter_ = nullptr;
234 :
235 : static void do_complete(
236 : void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
237 :
238 316 : completion_op() noexcept : scheduler_op(&do_complete) {}
239 :
240 : void operator()() override;
241 : void destroy() override;
242 : };
243 :
244 : // Per-waiter stop_token cancellation
245 : struct canceller
246 : {
247 : waiter_node* waiter_;
248 : void operator()() const;
249 : };
250 :
251 : // nullptr once removed from timer's waiter list (concurrency marker)
252 : timer_service::implementation* impl_ = nullptr;
253 : timer_service* svc_ = nullptr;
254 : std::coroutine_handle<> h_;
255 : capy::continuation* cont_ = nullptr;
256 : capy::executor_ref d_;
257 : std::error_code* ec_out_ = nullptr;
258 : std::stop_token token_;
259 : std::optional<std::stop_callback<canceller>> stop_cb_;
260 : completion_op op_;
261 : std::error_code ec_value_;
262 : waiter_node* next_free_ = nullptr;
263 :
264 316 : waiter_node() noexcept
265 316 : {
266 316 : op_.waiter_ = this;
267 316 : }
268 : };
269 :
270 : struct timer_service::implementation final : timer::implementation
271 : {
272 : using clock_type = std::chrono::steady_clock;
273 : using time_point = clock_type::time_point;
274 : using duration = clock_type::duration;
275 :
276 : timer_service* svc_ = nullptr;
277 : intrusive_list<waiter_node> waiters_;
278 :
279 : // Free list linkage (reused when impl is on free_list)
280 : implementation* next_free_ = nullptr;
281 :
282 : inline explicit implementation(timer_service& svc) noexcept;
283 :
284 : inline std::coroutine_handle<> wait(
285 : std::coroutine_handle<>,
286 : capy::executor_ref,
287 : std::stop_token,
288 : std::error_code*,
289 : capy::continuation*) override;
290 : };
291 :
292 : // Thread-local caches avoid hot-path mutex acquisitions:
293 : // 1. Impl cache — single-slot, validated by comparing svc_
294 : // 2. Waiter cache — single-slot, no service affinity
295 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
296 :
297 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
298 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
299 :
300 : inline timer_service::implementation*
301 8959 : try_pop_tl_cache(timer_service* svc) noexcept
302 : {
303 8959 : auto* impl = tl_cached_impl.get();
304 8959 : if (impl)
305 : {
306 8562 : tl_cached_impl.set(nullptr);
307 8562 : if (impl->svc_ == svc)
308 8562 : return impl;
309 : // Stale impl from a destroyed service
310 MIS 0 : delete impl;
311 : }
312 HIT 397 : return nullptr;
313 : }
314 :
315 : inline bool
316 8951 : try_push_tl_cache(timer_service::implementation* impl) noexcept
317 : {
318 8951 : if (!tl_cached_impl.get())
319 : {
320 8839 : tl_cached_impl.set(impl);
321 8839 : return true;
322 : }
323 112 : return false;
324 : }
325 :
326 : inline waiter_node*
327 8742 : try_pop_waiter_tl_cache() noexcept
328 : {
329 8742 : auto* w = tl_cached_waiter.get();
330 8742 : if (w)
331 : {
332 8424 : tl_cached_waiter.set(nullptr);
333 8424 : return w;
334 : }
335 318 : return nullptr;
336 : }
337 :
338 : inline bool
339 8726 : try_push_waiter_tl_cache(waiter_node* w) noexcept
340 : {
341 8726 : if (!tl_cached_waiter.get())
342 : {
343 8640 : tl_cached_waiter.set(w);
344 8640 : return true;
345 : }
346 86 : return false;
347 : }
348 :
349 : inline void
350 1244 : timer_service_invalidate_cache() noexcept
351 : {
352 1244 : delete tl_cached_impl.get();
353 1244 : tl_cached_impl.set(nullptr);
354 :
355 1244 : delete tl_cached_waiter.get();
356 1244 : tl_cached_waiter.set(nullptr);
357 1244 : }
358 :
359 : // timer_service out-of-class member function definitions
360 :
361 397 : inline timer_service::implementation::implementation(
362 397 : timer_service& svc) noexcept
363 397 : : svc_(&svc)
364 : {
365 397 : }
366 :
367 : inline void
368 1244 : timer_service::shutdown()
369 : {
370 1244 : timer_service_invalidate_cache();
371 1244 : shutting_down_ = true;
372 :
373 : // Snapshot impls and detach them from the heap so that
374 : // coroutine-owned timer destructors (triggered by h.destroy()
375 : // below) cannot re-enter remove_timer_impl() and mutate the
376 : // vector during iteration.
377 1244 : std::vector<implementation*> impls;
378 1244 : impls.reserve(heap_.size());
379 1252 : for (auto& entry : heap_)
380 : {
381 8 : entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
382 8 : impls.push_back(entry.timer_);
383 : }
384 1244 : heap_.clear();
385 1244 : cached_nearest_ns_.store(
386 : (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
387 :
388 : // Cancel waiting timers. Each waiter called work_started()
389 : // in implementation::wait(). On IOCP the scheduler shutdown
390 : // loop exits when outstanding_work_ reaches zero, so we must
391 : // call work_finished() here to balance it. On other backends
392 : // this is harmless.
393 1252 : for (auto* impl : impls)
394 : {
395 16 : while (auto* w = impl->waiters_.pop_front())
396 : {
397 8 : w->stop_cb_.reset();
398 8 : auto h = std::exchange(w->h_, {});
399 8 : sched_->work_finished();
400 8 : if (h)
401 8 : h.destroy();
402 8 : delete w;
403 8 : }
404 8 : delete impl;
405 : }
406 :
407 : // Delete free-listed impls
408 1356 : while (free_list_)
409 : {
410 112 : auto* next = free_list_->next_free_;
411 112 : delete free_list_;
412 112 : free_list_ = next;
413 : }
414 :
415 : // Delete free-listed waiters
416 1328 : while (waiter_free_list_)
417 : {
418 84 : auto* next = waiter_free_list_->next_free_;
419 84 : delete waiter_free_list_;
420 84 : waiter_free_list_ = next;
421 : }
422 1244 : }
423 :
424 : inline io_object::implementation*
425 8959 : timer_service::construct()
426 : {
427 8959 : implementation* impl = try_pop_tl_cache(this);
428 8959 : if (impl)
429 : {
430 8562 : impl->svc_ = this;
431 8562 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
432 8562 : impl->might_have_pending_waits_ = false;
433 8562 : return impl;
434 : }
435 :
436 397 : std::lock_guard lock(mutex_);
437 397 : if (free_list_)
438 : {
439 MIS 0 : impl = free_list_;
440 0 : free_list_ = impl->next_free_;
441 0 : impl->next_free_ = nullptr;
442 0 : impl->svc_ = this;
443 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
444 0 : impl->might_have_pending_waits_ = false;
445 : }
446 : else
447 : {
448 HIT 397 : impl = new implementation(*this);
449 : }
450 397 : return impl;
451 397 : }
452 :
453 : inline void
454 8957 : timer_service::destroy(io_object::implementation* p)
455 : {
456 8957 : destroy_impl(static_cast<implementation&>(*p));
457 8957 : }
458 :
459 : inline void
460 8957 : timer_service::destroy_impl(implementation& impl)
461 : {
462 : // During shutdown the impl is owned by the shutdown loop.
463 : // Re-entering here (from a coroutine-owned timer destructor
464 : // triggered by h.destroy()) must not modify the heap or
465 : // recycle the impl — shutdown deletes it directly.
466 8957 : if (shutting_down_)
467 8845 : return;
468 :
469 8951 : cancel_timer(impl);
470 :
471 8951 : if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
472 : {
473 MIS 0 : std::lock_guard lock(mutex_);
474 0 : remove_timer_impl(impl);
475 0 : refresh_cached_nearest();
476 0 : }
477 :
478 HIT 8951 : if (try_push_tl_cache(&impl))
479 8839 : return;
480 :
481 112 : std::lock_guard lock(mutex_);
482 112 : impl.next_free_ = free_list_;
483 112 : free_list_ = &impl;
484 112 : }
485 :
486 : inline waiter_node*
487 8742 : timer_service::create_waiter()
488 : {
489 8742 : if (auto* w = try_pop_waiter_tl_cache())
490 8424 : return w;
491 :
492 318 : std::lock_guard lock(mutex_);
493 318 : if (waiter_free_list_)
494 : {
495 2 : auto* w = waiter_free_list_;
496 2 : waiter_free_list_ = w->next_free_;
497 2 : w->next_free_ = nullptr;
498 2 : return w;
499 : }
500 :
501 316 : return new waiter_node();
502 318 : }
503 :
504 : inline void
505 8726 : timer_service::destroy_waiter(waiter_node* w)
506 : {
507 8726 : if (try_push_waiter_tl_cache(w))
508 8640 : return;
509 :
510 86 : std::lock_guard lock(mutex_);
511 86 : w->next_free_ = waiter_free_list_;
512 86 : waiter_free_list_ = w;
513 86 : }
514 :
515 : inline std::size_t
516 8 : timer_service::update_timer(implementation& impl, time_point new_time)
517 : {
518 : bool in_heap =
519 8 : (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
520 8 : if (!in_heap && impl.waiters_.empty())
521 MIS 0 : return 0;
522 :
523 HIT 8 : bool notify = false;
524 8 : intrusive_list<waiter_node> canceled;
525 :
526 : {
527 8 : std::lock_guard lock(mutex_);
528 :
529 20 : while (auto* w = impl.waiters_.pop_front())
530 : {
531 12 : w->impl_ = nullptr;
532 12 : canceled.push_back(w);
533 12 : }
534 :
535 8 : if (impl.heap_index_ < heap_.size())
536 : {
537 8 : time_point old_time = heap_[impl.heap_index_].time_;
538 8 : heap_[impl.heap_index_].time_ = new_time;
539 :
540 8 : if (new_time < old_time)
541 6 : up_heap(impl.heap_index_);
542 : else
543 2 : down_heap(impl.heap_index_);
544 :
545 8 : notify = (impl.heap_index_ == 0);
546 : }
547 :
548 8 : refresh_cached_nearest();
549 8 : }
550 :
551 8 : std::size_t count = 0;
552 20 : while (auto* w = canceled.pop_front())
553 : {
554 12 : w->ec_value_ = make_error_code(capy::error::canceled);
555 12 : sched_->post(&w->op_);
556 12 : ++count;
557 12 : }
558 :
559 8 : if (notify)
560 6 : on_earliest_changed_();
561 :
562 8 : return count;
563 : }
564 :
565 : inline void
566 8742 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
567 : {
568 8742 : bool notify = false;
569 : {
570 8742 : std::lock_guard lock(mutex_);
571 8742 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
572 : {
573 8720 : impl.heap_index_ = heap_.size();
574 8720 : heap_.push_back({impl.expiry_, &impl});
575 8720 : up_heap(heap_.size() - 1);
576 8720 : notify = (impl.heap_index_ == 0);
577 8720 : refresh_cached_nearest();
578 : }
579 8742 : impl.waiters_.push_back(w);
580 8742 : }
581 8742 : if (notify)
582 8681 : on_earliest_changed_();
583 8742 : }
584 :
585 : inline std::size_t
586 8959 : timer_service::cancel_timer(implementation& impl)
587 : {
588 8959 : if (!impl.might_have_pending_waits_)
589 8933 : return 0;
590 :
591 : // Not in heap and no waiters — just clear the flag
592 26 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
593 MIS 0 : impl.waiters_.empty())
594 : {
595 0 : impl.might_have_pending_waits_ = false;
596 0 : return 0;
597 : }
598 :
599 HIT 26 : intrusive_list<waiter_node> canceled;
600 :
601 : {
602 26 : std::lock_guard lock(mutex_);
603 26 : remove_timer_impl(impl);
604 54 : while (auto* w = impl.waiters_.pop_front())
605 : {
606 28 : w->impl_ = nullptr;
607 28 : canceled.push_back(w);
608 28 : }
609 26 : refresh_cached_nearest();
610 26 : }
611 :
612 26 : impl.might_have_pending_waits_ = false;
613 :
614 26 : std::size_t count = 0;
615 54 : while (auto* w = canceled.pop_front())
616 : {
617 28 : w->ec_value_ = make_error_code(capy::error::canceled);
618 28 : sched_->post(&w->op_);
619 28 : ++count;
620 28 : }
621 :
622 26 : return count;
623 : }
624 :
625 : inline void
626 36 : timer_service::cancel_waiter(waiter_node* w)
627 : {
628 : {
629 36 : std::lock_guard lock(mutex_);
630 : // Already removed by cancel_timer or process_expired
631 36 : if (!w->impl_)
632 MIS 0 : return;
633 HIT 36 : auto* impl = w->impl_;
634 36 : w->impl_ = nullptr;
635 36 : impl->waiters_.remove(w);
636 36 : if (impl->waiters_.empty())
637 : {
638 34 : remove_timer_impl(*impl);
639 34 : impl->might_have_pending_waits_ = false;
640 : }
641 36 : refresh_cached_nearest();
642 36 : }
643 :
644 36 : w->ec_value_ = make_error_code(capy::error::canceled);
645 36 : sched_->post(&w->op_);
646 : }
647 :
648 : inline std::size_t
649 2 : timer_service::cancel_one_waiter(implementation& impl)
650 : {
651 2 : if (!impl.might_have_pending_waits_)
652 MIS 0 : return 0;
653 :
654 HIT 2 : waiter_node* w = nullptr;
655 :
656 : {
657 2 : std::lock_guard lock(mutex_);
658 2 : w = impl.waiters_.pop_front();
659 2 : if (!w)
660 MIS 0 : return 0;
661 HIT 2 : w->impl_ = nullptr;
662 2 : if (impl.waiters_.empty())
663 : {
664 MIS 0 : remove_timer_impl(impl);
665 0 : impl.might_have_pending_waits_ = false;
666 : }
667 HIT 2 : refresh_cached_nearest();
668 2 : }
669 :
670 2 : w->ec_value_ = make_error_code(capy::error::canceled);
671 2 : sched_->post(&w->op_);
672 2 : return 1;
673 : }
674 :
675 : inline std::size_t
676 257117 : timer_service::process_expired()
677 : {
678 257117 : intrusive_list<waiter_node> expired;
679 :
680 : {
681 257117 : std::lock_guard lock(mutex_);
682 257117 : auto now = clock_type::now();
683 :
684 265769 : while (!heap_.empty() && heap_[0].time_ <= now)
685 : {
686 8652 : implementation* t = heap_[0].timer_;
687 8652 : remove_timer_impl(*t);
688 17308 : while (auto* w = t->waiters_.pop_front())
689 : {
690 8656 : w->impl_ = nullptr;
691 8656 : w->ec_value_ = {};
692 8656 : expired.push_back(w);
693 8656 : }
694 8652 : t->might_have_pending_waits_ = false;
695 : }
696 :
697 257117 : refresh_cached_nearest();
698 257117 : }
699 :
700 257117 : std::size_t count = 0;
701 265773 : while (auto* w = expired.pop_front())
702 : {
703 8656 : sched_->post(&w->op_);
704 8656 : ++count;
705 8656 : }
706 :
707 257117 : return count;
708 : }
709 :
710 : inline void
711 8712 : timer_service::remove_timer_impl(implementation& impl)
712 : {
713 8712 : std::size_t index = impl.heap_index_;
714 8712 : if (index >= heap_.size())
715 MIS 0 : return; // Not in heap
716 :
717 HIT 8712 : if (index == heap_.size() - 1)
718 : {
719 : // Last element, just pop
720 279 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
721 279 : heap_.pop_back();
722 : }
723 : else
724 : {
725 : // Swap with last and reheapify
726 8433 : swap_heap(index, heap_.size() - 1);
727 8433 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
728 8433 : heap_.pop_back();
729 :
730 8433 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
731 MIS 0 : up_heap(index);
732 : else
733 HIT 8433 : down_heap(index);
734 : }
735 : }
736 :
737 : inline void
738 8726 : timer_service::up_heap(std::size_t index)
739 : {
740 17132 : while (index > 0)
741 : {
742 8445 : std::size_t parent = (index - 1) / 2;
743 8445 : if (!(heap_[index].time_ < heap_[parent].time_))
744 39 : break;
745 8406 : swap_heap(index, parent);
746 8406 : index = parent;
747 : }
748 8726 : }
749 :
750 : inline void
751 8435 : timer_service::down_heap(std::size_t index)
752 : {
753 8435 : std::size_t child = index * 2 + 1;
754 8439 : while (child < heap_.size())
755 : {
756 10 : std::size_t min_child = (child + 1 == heap_.size() ||
757 2 : heap_[child].time_ < heap_[child + 1].time_)
758 12 : ? child
759 10 : : child + 1;
760 :
761 10 : if (heap_[index].time_ < heap_[min_child].time_)
762 6 : break;
763 :
764 4 : swap_heap(index, min_child);
765 4 : index = min_child;
766 4 : child = index * 2 + 1;
767 : }
768 8435 : }
769 :
770 : inline void
771 16843 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
772 : {
773 16843 : heap_entry tmp = heap_[i1];
774 16843 : heap_[i1] = heap_[i2];
775 16843 : heap_[i2] = tmp;
776 16843 : heap_[i1].timer_->heap_index_ = i1;
777 16843 : heap_[i2].timer_->heap_index_ = i2;
778 16843 : }
779 :
780 : // waiter_node out-of-class member function definitions
781 :
782 : inline void
783 36 : waiter_node::canceller::operator()() const
784 : {
785 36 : waiter_->svc_->cancel_waiter(waiter_);
786 36 : }
787 :
788 : inline void
789 MIS 0 : waiter_node::completion_op::do_complete(
790 : [[maybe_unused]] void* owner,
791 : scheduler_op* base,
792 : std::uint32_t,
793 : std::uint32_t)
794 : {
795 : // owner is always non-null here. The destroy path (owner == nullptr)
796 : // is unreachable because completion_op overrides destroy() directly,
797 : // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
798 0 : BOOST_COROSIO_ASSERT(owner);
799 0 : static_cast<completion_op*>(base)->operator()();
800 0 : }
801 :
802 : inline void
803 HIT 8726 : waiter_node::completion_op::operator()()
804 : {
805 8726 : auto* w = waiter_;
806 8726 : w->stop_cb_.reset();
807 8726 : if (w->ec_out_)
808 8726 : *w->ec_out_ = w->ec_value_;
809 :
810 8726 : auto* cont = w->cont_;
811 8726 : auto d = w->d_;
812 8726 : auto* svc = w->svc_;
813 8726 : auto& sched = svc->get_scheduler();
814 :
815 8726 : svc->destroy_waiter(w);
816 :
817 8726 : d.post(*cont);
818 8726 : sched.work_finished();
819 8726 : }
820 :
821 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
822 : // delete loses track that stop_cb_ was already .reset() above.
823 : #if defined(__GNUC__) && !defined(__clang__)
824 : #pragma GCC diagnostic push
825 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
826 : #endif
827 : inline void
828 8 : waiter_node::completion_op::destroy()
829 : {
830 : // Called during scheduler shutdown drain when this completion_op is
831 : // in the scheduler's ready queue (posted by cancel_timer() or
832 : // process_expired()). Balances the work_started() from
833 : // implementation::wait(). The scheduler drain loop separately
834 : // balances the work_started() from post(). On IOCP both decrements
835 : // are required for outstanding_work_ to reach zero; on other
836 : // backends this is harmless.
837 : //
838 : // This override also prevents scheduler_op::destroy() from calling
839 : // do_complete(nullptr, ...). See also: timer_service::shutdown()
840 : // which drains waiters still in the timer heap (the other path).
841 8 : auto* w = waiter_;
842 8 : w->stop_cb_.reset();
843 8 : auto h = std::exchange(w->h_, {});
844 8 : auto& sched = w->svc_->get_scheduler();
845 8 : delete w;
846 8 : sched.work_finished();
847 8 : if (h)
848 8 : h.destroy();
849 8 : }
850 : #if defined(__GNUC__) && !defined(__clang__)
851 : #pragma GCC diagnostic pop
852 : #endif
853 :
854 : inline std::coroutine_handle<>
855 8742 : timer_service::implementation::wait(
856 : std::coroutine_handle<> h,
857 : capy::executor_ref d,
858 : std::stop_token token,
859 : std::error_code* ec,
860 : capy::continuation* cont)
861 : {
862 : // Already-expired fast path — no waiter_node, no mutex.
863 : // Post instead of dispatch so the coroutine yields to the
864 : // scheduler, allowing other queued work to run.
865 8742 : if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
866 : {
867 8720 : if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
868 : {
869 MIS 0 : if (ec)
870 0 : *ec = {};
871 0 : d.post(*cont);
872 0 : return std::noop_coroutine();
873 : }
874 : }
875 :
876 HIT 8742 : auto* w = svc_->create_waiter();
877 8742 : w->impl_ = this;
878 8742 : w->svc_ = svc_;
879 8742 : w->h_ = h;
880 8742 : w->cont_ = cont;
881 8742 : w->d_ = d;
882 8742 : w->token_ = std::move(token);
883 8742 : w->ec_out_ = ec;
884 :
885 8742 : svc_->insert_waiter(*this, w);
886 8742 : might_have_pending_waits_ = true;
887 8742 : svc_->get_scheduler().work_started();
888 :
889 8742 : if (w->token_.stop_possible())
890 60 : w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
891 :
892 8742 : return std::noop_coroutine();
893 : }
894 :
895 : // Free functions
896 :
897 : inline std::size_t
898 8 : timer_service_update_expiry(timer::implementation& base)
899 : {
900 8 : auto& impl = static_cast<timer_service::implementation&>(base);
901 8 : return impl.svc_->update_timer(impl, impl.expiry_);
902 : }
903 :
904 : inline std::size_t
905 8 : timer_service_cancel(timer::implementation& base) noexcept
906 : {
907 8 : auto& impl = static_cast<timer_service::implementation&>(base);
908 8 : return impl.svc_->cancel_timer(impl);
909 : }
910 :
911 : inline std::size_t
912 2 : timer_service_cancel_one(timer::implementation& base) noexcept
913 : {
914 2 : auto& impl = static_cast<timer_service::implementation&>(base);
915 2 : return impl.svc_->cancel_one_waiter(impl);
916 : }
917 :
918 : inline timer_service&
919 1244 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
920 : {
921 1244 : return ctx.make_service<timer_service>(sched);
922 : }
923 :
924 : } // namespace boost::corosio::detail
925 :
926 : #endif
|