LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.8 % 367 337 30
Test Date: 2026-06-30 19:55:18 Functions: 97.7 % 43 42 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/detail/scheduler.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/detail/intrusive.hpp>
      18                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      19                 : #include <boost/capy/error.hpp>
      20                 : #include <boost/capy/ex/execution_context.hpp>
      21                 : #include <boost/capy/ex/executor_ref.hpp>
      22                 : #include <system_error>
      23                 : 
      24                 : #include <atomic>
      25                 : #include <chrono>
      26                 : #include <coroutine>
      27                 : #include <cstddef>
      28                 : #include <limits>
      29                 : #include <mutex>
      30                 : #include <optional>
      31                 : #include <stop_token>
      32                 : #include <utility>
      33                 : #include <vector>
      34                 : 
      35                 : namespace boost::corosio::detail {
      36                 : 
      37                 : struct scheduler;
      38                 : 
      39                 : /*
      40                 :     Timer Service
      41                 :     =============
      42                 : 
      43                 :     Data Structures
      44                 :     ---------------
      45                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      46                 :     error output, stop_token, embedded completion_op. Each concurrent
      47                 :     co_await t.wait() allocates one waiter_node.
      48                 : 
      49                 :     timer_service::implementation holds per-timer state: expiry,
      50                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      51                 :     coroutines can wait on the same timer simultaneously.
      52                 : 
      53                 :     timer_service owns a min-heap of active timers, a free list
      54                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      55                 :     heap is ordered by expiry time; the scheduler queries
      56                 :     nearest_expiry() to set the epoll/timerfd timeout.
      57                 : 
      58                 :     Optimization Strategy
      59                 :     ---------------------
      60                 :     1. Deferred heap insertion — expires_after() stores the expiry
      61                 :        but does not insert into the heap. Insertion happens in wait().
      62                 :     2. Thread-local impl cache — single-slot per-thread cache.
      63                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      64                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      65                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      66                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      67                 : 
      68                 :     Concurrency
      69                 :     -----------
      70                 :     stop_token callbacks can fire from any thread. The impl_
      71                 :     pointer on waiter_node is used as a "still in list" marker.
      72                 : */
      73                 : 
      74                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      75                 : 
      76                 : inline void timer_service_invalidate_cache() noexcept;
      77                 : 
      78                 : // timer_service class body — member function definitions are
      79                 : // out-of-class (after implementation and waiter_node are complete)
      80                 : class BOOST_COROSIO_DECL timer_service final
      81                 :     : public capy::execution_context::service
      82                 :     , public io_object::io_service
      83                 : {
      84                 : public:
      85                 :     using clock_type = std::chrono::steady_clock;
      86                 :     using time_point = clock_type::time_point;
      87                 : 
      88                 :     /// Type-erased callback for earliest-expiry-changed notifications.
      89                 :     class callback
      90                 :     {
      91                 :         void* ctx_         = nullptr;
      92                 :         void (*fn_)(void*) = nullptr;
      93                 : 
      94                 :     public:
      95                 :         /// Construct an empty callback.
      96 HIT        1244 :         callback() = default;
      97                 : 
      98                 :         /// Construct a callback with the given context and function.
      99            1244 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
     100                 : 
     101                 :         /// Return true if the callback is non-empty.
     102                 :         explicit operator bool() const noexcept
     103                 :         {
     104                 :             return fn_ != nullptr;
     105                 :         }
     106                 : 
     107                 :         /// Invoke the callback.
     108            8687 :         void operator()() const
     109                 :         {
     110            8687 :             if (fn_)
     111            8687 :                 fn_(ctx_);
     112            8687 :         }
     113                 :     };
     114                 : 
     115                 :     struct implementation;
     116                 : 
     117                 : private:
     118                 :     struct heap_entry
     119                 :     {
     120                 :         time_point time_;
     121                 :         implementation* timer_;
     122                 :     };
     123                 : 
     124                 :     scheduler* sched_ = nullptr;
     125                 :     BOOST_COROSIO_MSVC_WARNING_PUSH
     126                 :     BOOST_COROSIO_MSVC_WARNING_DISABLE(4251) // std:: members, dll-interface
     127                 :     mutable std::mutex mutex_;
     128                 :     std::vector<heap_entry> heap_;
     129                 :     implementation* free_list_     = nullptr;
     130                 :     waiter_node* waiter_free_list_ = nullptr;
     131                 :     callback on_earliest_changed_;
     132                 :     bool shutting_down_ = false;
     133                 :     // Avoids mutex in nearest_expiry() and empty()
     134                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     135                 :         (std::numeric_limits<std::int64_t>::max)()};
     136                 :     BOOST_COROSIO_MSVC_WARNING_POP
     137                 : 
     138                 : public:
     139                 :     /// Construct the timer service bound to a scheduler.
     140            1244 :     inline timer_service(capy::execution_context&, scheduler& sched)
     141            1244 :         : sched_(&sched)
     142                 :     {
     143            1244 :     }
     144                 : 
     145                 :     /// Return the associated scheduler.
     146           17476 :     inline scheduler& get_scheduler() noexcept
     147                 :     {
     148           17476 :         return *sched_;
     149                 :     }
     150                 : 
     151                 :     /// Destroy the timer service.
     152            2488 :     ~timer_service() override = default;
     153                 : 
     154                 :     timer_service(timer_service const&)            = delete;
     155                 :     timer_service& operator=(timer_service const&) = delete;
     156                 : 
     157                 :     /// Register a callback invoked when the earliest expiry changes.
     158            1244 :     inline void set_on_earliest_changed(callback cb)
     159                 :     {
     160            1244 :         on_earliest_changed_ = cb;
     161            1244 :     }
     162                 : 
     163                 :     /// Return true if no timers are in the heap.
     164                 :     inline bool empty() const noexcept
     165                 :     {
     166                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     167                 :             (std::numeric_limits<std::int64_t>::max)();
     168                 :     }
     169                 : 
     170                 :     /// Return the nearest timer expiry without acquiring the mutex.
     171          237189 :     inline time_point nearest_expiry() const noexcept
     172                 :     {
     173          237189 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
     174          237189 :         return time_point(time_point::duration(ns));
     175                 :     }
     176                 : 
     177                 :     /// Cancel all pending timers and free cached resources.
     178                 :     inline void shutdown() override;
     179                 : 
     180                 :     /// Construct a new timer implementation.
     181                 :     inline io_object::implementation* construct() override;
     182                 : 
     183                 :     /// Destroy a timer implementation, cancelling pending waiters.
     184                 :     inline void destroy(io_object::implementation* p) override;
     185                 : 
     186                 :     /// Cancel and recycle a timer implementation.
     187                 :     inline void destroy_impl(implementation& impl);
     188                 : 
     189                 :     /// Create or recycle a waiter node.
     190                 :     inline waiter_node* create_waiter();
     191                 : 
     192                 :     /// Return a waiter node to the cache or free list.
     193                 :     inline void destroy_waiter(waiter_node* w);
     194                 : 
     195                 :     /// Update the timer expiry, cancelling existing waiters.
     196                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     197                 : 
     198                 :     /// Insert a waiter into the timer's waiter list and the heap.
     199                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     200                 : 
     201                 :     /// Cancel all waiters on a timer.
     202                 :     inline std::size_t cancel_timer(implementation& impl);
     203                 : 
     204                 :     /// Cancel a single waiter ( stop_token callback path ).
     205                 :     inline void cancel_waiter(waiter_node* w);
     206                 : 
     207                 :     /// Cancel one waiter on a timer.
     208                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     209                 : 
     210                 :     /// Complete all waiters whose timers have expired.
     211                 :     inline std::size_t process_expired();
     212                 : 
     213                 : private:
     214          265909 :     inline void refresh_cached_nearest() noexcept
     215                 :     {
     216          265909 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     217          264504 :                                 : heap_[0].time_.time_since_epoch().count();
     218          265909 :         cached_nearest_ns_.store(ns, std::memory_order_release);
     219          265909 :     }
     220                 : 
     221                 :     inline void remove_timer_impl(implementation& impl);
     222                 :     inline void up_heap(std::size_t index);
     223                 :     inline void down_heap(std::size_t index);
     224                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     225                 : };
     226                 : 
     227                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
     228                 :     : intrusive_list<waiter_node>::node
     229                 : {
     230                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     231                 :     struct completion_op final : scheduler_op
     232                 :     {
     233                 :         waiter_node* waiter_ = nullptr;
     234                 : 
     235                 :         static void do_complete(
     236                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     237                 : 
     238             316 :         completion_op() noexcept : scheduler_op(&do_complete) {}
     239                 : 
     240                 :         void operator()() override;
     241                 :         void destroy() override;
     242                 :     };
     243                 : 
     244                 :     // Per-waiter stop_token cancellation
     245                 :     struct canceller
     246                 :     {
     247                 :         waiter_node* waiter_;
     248                 :         void operator()() const;
     249                 :     };
     250                 : 
     251                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     252                 :     timer_service::implementation* impl_ = nullptr;
     253                 :     timer_service* svc_                  = nullptr;
     254                 :     std::coroutine_handle<> h_;
     255                 :     capy::continuation* cont_            = nullptr;
     256                 :     capy::executor_ref d_;
     257                 :     std::error_code* ec_out_ = nullptr;
     258                 :     std::stop_token token_;
     259                 :     std::optional<std::stop_callback<canceller>> stop_cb_;
     260                 :     completion_op op_;
     261                 :     std::error_code ec_value_;
     262                 :     waiter_node* next_free_ = nullptr;
     263                 : 
     264             316 :     waiter_node() noexcept
     265             316 :     {
     266             316 :         op_.waiter_ = this;
     267             316 :     }
     268                 : };
     269                 : 
     270                 : struct timer_service::implementation final : timer::implementation
     271                 : {
     272                 :     using clock_type = std::chrono::steady_clock;
     273                 :     using time_point = clock_type::time_point;
     274                 :     using duration   = clock_type::duration;
     275                 : 
     276                 :     timer_service* svc_ = nullptr;
     277                 :     intrusive_list<waiter_node> waiters_;
     278                 : 
     279                 :     // Free list linkage (reused when impl is on free_list)
     280                 :     implementation* next_free_ = nullptr;
     281                 : 
     282                 :     inline explicit implementation(timer_service& svc) noexcept;
     283                 : 
     284                 :     inline std::coroutine_handle<> wait(
     285                 :         std::coroutine_handle<>,
     286                 :         capy::executor_ref,
     287                 :         std::stop_token,
     288                 :         std::error_code*,
     289                 :         capy::continuation*) override;
     290                 : };
     291                 : 
     292                 : // Thread-local caches avoid hot-path mutex acquisitions:
     293                 : // 1. Impl cache — single-slot, validated by comparing svc_
     294                 : // 2. Waiter cache — single-slot, no service affinity
     295                 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
     296                 : 
     297                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
     298                 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
     299                 : 
     300                 : inline timer_service::implementation*
     301            8959 : try_pop_tl_cache(timer_service* svc) noexcept
     302                 : {
     303            8959 :     auto* impl = tl_cached_impl.get();
     304            8959 :     if (impl)
     305                 :     {
     306            8562 :         tl_cached_impl.set(nullptr);
     307            8562 :         if (impl->svc_ == svc)
     308            8562 :             return impl;
     309                 :         // Stale impl from a destroyed service
     310 MIS           0 :         delete impl;
     311                 :     }
     312 HIT         397 :     return nullptr;
     313                 : }
     314                 : 
     315                 : inline bool
     316            8951 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     317                 : {
     318            8951 :     if (!tl_cached_impl.get())
     319                 :     {
     320            8839 :         tl_cached_impl.set(impl);
     321            8839 :         return true;
     322                 :     }
     323             112 :     return false;
     324                 : }
     325                 : 
     326                 : inline waiter_node*
     327            8742 : try_pop_waiter_tl_cache() noexcept
     328                 : {
     329            8742 :     auto* w = tl_cached_waiter.get();
     330            8742 :     if (w)
     331                 :     {
     332            8424 :         tl_cached_waiter.set(nullptr);
     333            8424 :         return w;
     334                 :     }
     335             318 :     return nullptr;
     336                 : }
     337                 : 
     338                 : inline bool
     339            8726 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     340                 : {
     341            8726 :     if (!tl_cached_waiter.get())
     342                 :     {
     343            8640 :         tl_cached_waiter.set(w);
     344            8640 :         return true;
     345                 :     }
     346              86 :     return false;
     347                 : }
     348                 : 
     349                 : inline void
     350            1244 : timer_service_invalidate_cache() noexcept
     351                 : {
     352            1244 :     delete tl_cached_impl.get();
     353            1244 :     tl_cached_impl.set(nullptr);
     354                 : 
     355            1244 :     delete tl_cached_waiter.get();
     356            1244 :     tl_cached_waiter.set(nullptr);
     357            1244 : }
     358                 : 
     359                 : // timer_service out-of-class member function definitions
     360                 : 
     361             397 : inline timer_service::implementation::implementation(
     362             397 :     timer_service& svc) noexcept
     363             397 :     : svc_(&svc)
     364                 : {
     365             397 : }
     366                 : 
     367                 : inline void
     368            1244 : timer_service::shutdown()
     369                 : {
     370            1244 :     timer_service_invalidate_cache();
     371            1244 :     shutting_down_ = true;
     372                 : 
     373                 :     // Snapshot impls and detach them from the heap so that
     374                 :     // coroutine-owned timer destructors (triggered by h.destroy()
     375                 :     // below) cannot re-enter remove_timer_impl() and mutate the
     376                 :     // vector during iteration.
     377            1244 :     std::vector<implementation*> impls;
     378            1244 :     impls.reserve(heap_.size());
     379            1252 :     for (auto& entry : heap_)
     380                 :     {
     381               8 :         entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     382               8 :         impls.push_back(entry.timer_);
     383                 :     }
     384            1244 :     heap_.clear();
     385            1244 :     cached_nearest_ns_.store(
     386                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     387                 : 
     388                 :     // Cancel waiting timers. Each waiter called work_started()
     389                 :     // in implementation::wait(). On IOCP the scheduler shutdown
     390                 :     // loop exits when outstanding_work_ reaches zero, so we must
     391                 :     // call work_finished() here to balance it. On other backends
     392                 :     // this is harmless.
     393            1252 :     for (auto* impl : impls)
     394                 :     {
     395              16 :         while (auto* w = impl->waiters_.pop_front())
     396                 :         {
     397               8 :             w->stop_cb_.reset();
     398               8 :             auto h = std::exchange(w->h_, {});
     399               8 :             sched_->work_finished();
     400               8 :             if (h)
     401               8 :                 h.destroy();
     402               8 :             delete w;
     403               8 :         }
     404               8 :         delete impl;
     405                 :     }
     406                 : 
     407                 :     // Delete free-listed impls
     408            1356 :     while (free_list_)
     409                 :     {
     410             112 :         auto* next = free_list_->next_free_;
     411             112 :         delete free_list_;
     412             112 :         free_list_ = next;
     413                 :     }
     414                 : 
     415                 :     // Delete free-listed waiters
     416            1328 :     while (waiter_free_list_)
     417                 :     {
     418              84 :         auto* next = waiter_free_list_->next_free_;
     419              84 :         delete waiter_free_list_;
     420              84 :         waiter_free_list_ = next;
     421                 :     }
     422            1244 : }
     423                 : 
     424                 : inline io_object::implementation*
     425            8959 : timer_service::construct()
     426                 : {
     427            8959 :     implementation* impl = try_pop_tl_cache(this);
     428            8959 :     if (impl)
     429                 :     {
     430            8562 :         impl->svc_        = this;
     431            8562 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     432            8562 :         impl->might_have_pending_waits_ = false;
     433            8562 :         return impl;
     434                 :     }
     435                 : 
     436             397 :     std::lock_guard lock(mutex_);
     437             397 :     if (free_list_)
     438                 :     {
     439 MIS           0 :         impl              = free_list_;
     440               0 :         free_list_        = impl->next_free_;
     441               0 :         impl->next_free_  = nullptr;
     442               0 :         impl->svc_        = this;
     443               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     444               0 :         impl->might_have_pending_waits_ = false;
     445                 :     }
     446                 :     else
     447                 :     {
     448 HIT         397 :         impl = new implementation(*this);
     449                 :     }
     450             397 :     return impl;
     451             397 : }
     452                 : 
     453                 : inline void
     454            8957 : timer_service::destroy(io_object::implementation* p)
     455                 : {
     456            8957 :     destroy_impl(static_cast<implementation&>(*p));
     457            8957 : }
     458                 : 
     459                 : inline void
     460            8957 : timer_service::destroy_impl(implementation& impl)
     461                 : {
     462                 :     // During shutdown the impl is owned by the shutdown loop.
     463                 :     // Re-entering here (from a coroutine-owned timer destructor
     464                 :     // triggered by h.destroy()) must not modify the heap or
     465                 :     // recycle the impl — shutdown deletes it directly.
     466            8957 :     if (shutting_down_)
     467            8845 :         return;
     468                 : 
     469            8951 :     cancel_timer(impl);
     470                 : 
     471            8951 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
     472                 :     {
     473 MIS           0 :         std::lock_guard lock(mutex_);
     474               0 :         remove_timer_impl(impl);
     475               0 :         refresh_cached_nearest();
     476               0 :     }
     477                 : 
     478 HIT        8951 :     if (try_push_tl_cache(&impl))
     479            8839 :         return;
     480                 : 
     481             112 :     std::lock_guard lock(mutex_);
     482             112 :     impl.next_free_ = free_list_;
     483             112 :     free_list_      = &impl;
     484             112 : }
     485                 : 
     486                 : inline waiter_node*
     487            8742 : timer_service::create_waiter()
     488                 : {
     489            8742 :     if (auto* w = try_pop_waiter_tl_cache())
     490            8424 :         return w;
     491                 : 
     492             318 :     std::lock_guard lock(mutex_);
     493             318 :     if (waiter_free_list_)
     494                 :     {
     495               2 :         auto* w           = waiter_free_list_;
     496               2 :         waiter_free_list_ = w->next_free_;
     497               2 :         w->next_free_     = nullptr;
     498               2 :         return w;
     499                 :     }
     500                 : 
     501             316 :     return new waiter_node();
     502             318 : }
     503                 : 
     504                 : inline void
     505            8726 : timer_service::destroy_waiter(waiter_node* w)
     506                 : {
     507            8726 :     if (try_push_waiter_tl_cache(w))
     508            8640 :         return;
     509                 : 
     510              86 :     std::lock_guard lock(mutex_);
     511              86 :     w->next_free_     = waiter_free_list_;
     512              86 :     waiter_free_list_ = w;
     513              86 : }
     514                 : 
     515                 : inline std::size_t
     516               8 : timer_service::update_timer(implementation& impl, time_point new_time)
     517                 : {
     518                 :     bool in_heap =
     519               8 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     520               8 :     if (!in_heap && impl.waiters_.empty())
     521 MIS           0 :         return 0;
     522                 : 
     523 HIT           8 :     bool notify = false;
     524               8 :     intrusive_list<waiter_node> canceled;
     525                 : 
     526                 :     {
     527               8 :         std::lock_guard lock(mutex_);
     528                 : 
     529              20 :         while (auto* w = impl.waiters_.pop_front())
     530                 :         {
     531              12 :             w->impl_ = nullptr;
     532              12 :             canceled.push_back(w);
     533              12 :         }
     534                 : 
     535               8 :         if (impl.heap_index_ < heap_.size())
     536                 :         {
     537               8 :             time_point old_time           = heap_[impl.heap_index_].time_;
     538               8 :             heap_[impl.heap_index_].time_ = new_time;
     539                 : 
     540               8 :             if (new_time < old_time)
     541               6 :                 up_heap(impl.heap_index_);
     542                 :             else
     543               2 :                 down_heap(impl.heap_index_);
     544                 : 
     545               8 :             notify = (impl.heap_index_ == 0);
     546                 :         }
     547                 : 
     548               8 :         refresh_cached_nearest();
     549               8 :     }
     550                 : 
     551               8 :     std::size_t count = 0;
     552              20 :     while (auto* w = canceled.pop_front())
     553                 :     {
     554              12 :         w->ec_value_ = make_error_code(capy::error::canceled);
     555              12 :         sched_->post(&w->op_);
     556              12 :         ++count;
     557              12 :     }
     558                 : 
     559               8 :     if (notify)
     560               6 :         on_earliest_changed_();
     561                 : 
     562               8 :     return count;
     563                 : }
     564                 : 
     565                 : inline void
     566            8742 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     567                 : {
                               // Queues waiter `w` on `impl`. If `impl` is not yet in the heap
                               // (heap_index_ holds the size_t-max sentinel), a heap entry keyed
                               // on impl.expiry_ is added first. All heap/list mutation happens
                               // under mutex_; on_earliest_changed_() is invoked only after the
                               // lock is released, and only when the new entry became the root.
     568            8742 :     bool notify = false;
     569                 :     {
     570            8742 :         std::lock_guard lock(mutex_);
     571            8742 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
     572                 :         {
                                       // Append at the end, then sift toward the root; up_heap()
                                       // keeps impl.heap_index_ current through swap_heap().
     573            8720 :             impl.heap_index_ = heap_.size();
     574            8720 :             heap_.push_back({impl.expiry_, &impl});
     575            8720 :             up_heap(heap_.size() - 1);
                                       // Index 0 means this timer is now the earliest entry.
     576            8720 :             notify = (impl.heap_index_ == 0);
     577            8720 :             refresh_cached_nearest();
     578                 :         }
     579            8742 :         impl.waiters_.push_back(w);
     580            8742 :     }
     581            8742 :     if (notify)
     582            8681 :         on_earliest_changed_();
     583            8742 : }
     584                 : 
     585                 : inline std::size_t
     586            8959 : timer_service::cancel_timer(implementation& impl)
     587                 : {
                               // Cancels every waiter queued on `impl` and returns how many were
                               // canceled. Each canceled waiter has ec_value_ set to
                               // capy::error::canceled and its op_ posted to sched_.
                               //
                               // Early exit: the flag says no waits can be pending.
     588            8959 :     if (!impl.might_have_pending_waits_)
     589            8933 :         return 0;
     590                 : 
     591                 :     // Not in heap and no waiters — just clear the flag
                               // NOTE(review): this check reads heap_index_ and waiters_ without
                               // holding mutex_, while other members in this file modify them
                               // under the lock — confirm callers rule out concurrent access.
     592              26 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     593 MIS           0 :         impl.waiters_.empty())
     594                 :     {
     595               0 :         impl.might_have_pending_waits_ = false;
     596               0 :         return 0;
     597                 :     }
     598                 : 
                               // Waiters are moved to a local list under the lock and posted
                               // afterwards, so sched_->post() runs without mutex_ held.
     599 HIT          26 :     intrusive_list<waiter_node> canceled;
     600                 : 
     601                 :     {
     602              26 :         std::lock_guard lock(mutex_);
     603              26 :         remove_timer_impl(impl);
     604              54 :         while (auto* w = impl.waiters_.pop_front())
     605                 :         {
                                       // A null impl_ is what cancel_waiter() checks to detect
                                       // a waiter that has already been detached.
     606              28 :             w->impl_ = nullptr;
     607              28 :             canceled.push_back(w);
     608              28 :         }
     609              26 :         refresh_cached_nearest();
     610              26 :     }
     611                 : 
     612              26 :     impl.might_have_pending_waits_ = false;
     613                 : 
     614              26 :     std::size_t count = 0;
     615              54 :     while (auto* w = canceled.pop_front())
     616                 :     {
     617              28 :         w->ec_value_ = make_error_code(capy::error::canceled);
     618              28 :         sched_->post(&w->op_);
     619              28 :         ++count;
     620              28 :     }
     621                 : 
     622              26 :     return count;
     623                 : }
     624                 : 
     625                 : inline void
     626              36 : timer_service::cancel_waiter(waiter_node* w)
     627                 : {
                               // Cancels the single waiter `w` (called from
                               // waiter_node::canceller::operator()). If `w` is still attached to
                               // a timer it is unlinked under mutex_, then completed with
                               // capy::error::canceled by posting its op_ to sched_ after the
                               // lock is released. If it was already detached, nothing is posted.
     628                 :     {
     629              36 :         std::lock_guard lock(mutex_);
     630                 :         // Already removed by cancel_timer or process_expired
     631              36 :         if (!w->impl_)
     632 MIS           0 :             return;
     633 HIT          36 :         auto* impl = w->impl_;
     634              36 :         w->impl_   = nullptr;
     635              36 :         impl->waiters_.remove(w);
                                   // Last waiter gone: drop the timer from the heap and clear
                                   // the pending-waits flag.
     636              36 :         if (impl->waiters_.empty())
     637                 :         {
     638              34 :             remove_timer_impl(*impl);
     639              34 :             impl->might_have_pending_waits_ = false;
     640                 :         }
     641              36 :         refresh_cached_nearest();
     642              36 :     }
     643                 : 
     644              36 :     w->ec_value_ = make_error_code(capy::error::canceled);
     645              36 :     sched_->post(&w->op_);
     646                 : }
     647                 : 
     648                 : inline std::size_t
     649               2 : timer_service::cancel_one_waiter(implementation& impl)
     650                 : {
                               // Cancels at most one waiter: the front of impl.waiters_.
                               // Returns 1 if a waiter was canceled (ec_value_ set to
                               // capy::error::canceled, op_ posted to sched_), otherwise 0.
     651               2 :     if (!impl.might_have_pending_waits_)
     652 MIS           0 :         return 0;
     653                 : 
     654 HIT           2 :     waiter_node* w = nullptr;
     655                 : 
     656                 :     {
     657               2 :         std::lock_guard lock(mutex_);
     658               2 :         w = impl.waiters_.pop_front();
     659               2 :         if (!w)
     660 MIS           0 :             return 0;
     661 HIT           2 :         w->impl_ = nullptr;
                                   // That was the last waiter: remove the timer from the heap
                                   // and clear the pending-waits flag.
     662               2 :         if (impl.waiters_.empty())
     663                 :         {
     664 MIS           0 :             remove_timer_impl(impl);
     665               0 :             impl.might_have_pending_waits_ = false;
     666                 :         }
     667 HIT           2 :         refresh_cached_nearest();
     668               2 :     }
     669                 : 
                               // Posting happens after mutex_ has been released.
     670               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     671               2 :     sched_->post(&w->op_);
     672               2 :     return 1;
     673                 : }
     674                 : 
     675                 : inline std::size_t
     676          257117 : timer_service::process_expired()
     677                 : {
                               // Completes every waiter whose timer has expired and returns the
                               // number of waiters posted. Expired waiters are collected into a
                               // local list under mutex_ and posted to sched_ after the lock is
                               // released.
     678          257117 :     intrusive_list<waiter_node> expired;
     679                 : 
     680                 :     {
     681          257117 :         std::lock_guard lock(mutex_);
                                   // The clock is sampled once; every entry is compared against
                                   // this same `now`.
     682          257117 :         auto now = clock_type::now();
     683                 : 
                                   // heap_[0] is the earliest entry; keep removing it while its
                                   // time is not after `now`.
     684          265769 :         while (!heap_.empty() && heap_[0].time_ <= now)
     685                 :         {
     686            8652 :             implementation* t = heap_[0].timer_;
     687            8652 :             remove_timer_impl(*t);
     688           17308 :             while (auto* w = t->waiters_.pop_front())
     689                 :             {
     690            8656 :                 w->impl_     = nullptr;
                                           // Default-constructed error code: normal expiry,
                                           // not a cancellation.
     691            8656 :                 w->ec_value_ = {};
     692            8656 :                 expired.push_back(w);
     693            8656 :             }
     694            8652 :             t->might_have_pending_waits_ = false;
     695                 :         }
     696                 : 
     697          257117 :         refresh_cached_nearest();
     698          257117 :     }
     699                 : 
     700          257117 :     std::size_t count = 0;
     701          265773 :     while (auto* w = expired.pop_front())
     702                 :     {
     703            8656 :         sched_->post(&w->op_);
     704            8656 :         ++count;
     705            8656 :     }
     706                 : 
     707          257117 :     return count;
     708                 : }
     709                 : 
     710                 : inline void
     711            8712 : timer_service::remove_timer_impl(implementation& impl)
     712                 : {
                               // Removes impl's entry from heap_ and resets impl.heap_index_ to
                               // the size_t-max sentinel. Does nothing if impl is not in the
                               // heap. Takes no lock itself; the callers visible in this file
                               // (cancel_timer, cancel_waiter, cancel_one_waiter,
                               // process_expired) invoke it with mutex_ held.
     713            8712 :     std::size_t index = impl.heap_index_;
                               // The sentinel value is >= any size, so this also covers it.
     714            8712 :     if (index >= heap_.size())
     715 MIS           0 :         return; // Not in heap
     716                 : 
     717 HIT        8712 :     if (index == heap_.size() - 1)
     718                 :     {
     719                 :         // Last element, just pop
     720             279 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     721             279 :         heap_.pop_back();
     722                 :     }
     723                 :     else
     724                 :     {
     725                 :         // Swap with last and reheapify
     726            8433 :         swap_heap(index, heap_.size() - 1);
     727            8433 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     728            8433 :         heap_.pop_back();
     729                 : 
                                   // The entry moved into `index` may belong above or below
                                   // that slot: sift up if it is earlier than its parent,
                                   // otherwise sift down.
     730            8433 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     731 MIS           0 :             up_heap(index);
     732                 :         else
     733 HIT        8433 :             down_heap(index);
     734                 :     }
     735                 : }
     736                 : 
     737                 : inline void
     738            8726 : timer_service::up_heap(std::size_t index)
     739                 : {
                               // Sifts the entry at `index` toward the root: while its time_ is
                               // strictly less than its parent's, swap the two and continue from
                               // the parent's slot. Stops at the root or at the first parent
                               // that is not later.
     740           17132 :     while (index > 0)
     741                 :     {
                                   // Parent of slot i in the array-backed binary heap.
     742            8445 :         std::size_t parent = (index - 1) / 2;
     743            8445 :         if (!(heap_[index].time_ < heap_[parent].time_))
     744              39 :             break;
     745            8406 :         swap_heap(index, parent);
     746            8406 :         index = parent;
     747                 :     }
     748            8726 : }
     749                 : 
     750                 : inline void
     751            8435 : timer_service::down_heap(std::size_t index)
     752                 : {
                               // Sifts the entry at `index` toward the leaves: repeatedly swap
                               // it with its earlier child until it is strictly earlier than
                               // that child or has no children.
     753            8435 :     std::size_t child = index * 2 + 1;
     754            8439 :     while (child < heap_.size())
     755                 :     {
                                   // Pick the left child when it is the only child or is
                                   // strictly earlier than the right one; otherwise the right.
     756              10 :         std::size_t min_child = (child + 1 == heap_.size() ||
     757               2 :                                  heap_[child].time_ < heap_[child + 1].time_)
     758              12 :             ? child
     759              10 :             : child + 1;
     760                 : 
                                   // Strict comparison: entries with equal times still swap.
     761              10 :         if (heap_[index].time_ < heap_[min_child].time_)
     762               6 :             break;
     763                 : 
     764               4 :         swap_heap(index, min_child);
     765               4 :         index = min_child;
     766               4 :         child = index * 2 + 1;
     767                 :     }
     768            8435 : }
     769                 : 
     770                 : inline void
     771           16843 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     772                 : {
                               // Exchanges heap_[i1] and heap_[i2], then rewrites each affected
                               // timer's heap_index_ so it matches the slot it now occupies.
     773           16843 :     heap_entry tmp                = heap_[i1];
     774           16843 :     heap_[i1]                     = heap_[i2];
     775           16843 :     heap_[i2]                     = tmp;
     776           16843 :     heap_[i1].timer_->heap_index_ = i1;
     777           16843 :     heap_[i2].timer_->heap_index_ = i2;
     778           16843 : }
     779                 : 
     780                 : // waiter_node out-of-class member function definitions
     781                 : 
     782                 : inline void
     783              36 : waiter_node::canceller::operator()() const
     784                 : {
                               // Stop-callback body (installed via stop_cb_.emplace in
                               // implementation::wait): asks the owning service to cancel
                               // this waiter.
     785              36 :     waiter_->svc_->cancel_waiter(waiter_);
     786              36 : }
     787                 : 
     788                 : inline void
     789 MIS           0 : waiter_node::completion_op::do_complete(
     790                 :     [[maybe_unused]] void* owner,
     791                 :     scheduler_op* base,
     792                 :     std::uint32_t,
     793                 :     std::uint32_t)
     794                 : {
                               // Completion entry point taking the generic scheduler_op
                               // signature: downcasts `base` to completion_op and runs its
                               // operator()(). The two std::uint32_t arguments are ignored.
     795                 :     // owner is always non-null here. The destroy path (owner == nullptr)
     796                 :     // is unreachable because completion_op overrides destroy() directly,
     797                 :     // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
     798               0 :     BOOST_COROSIO_ASSERT(owner);
     799               0 :     static_cast<completion_op*>(base)->operator()();
     800               0 : }
     801                 : 
     802                 : inline void
     803 HIT        8726 : waiter_node::completion_op::operator()()
     804                 : {
                               // Normal completion of a waiter: drops the stop callback,
                               // publishes the result code to the caller-supplied error_code (if
                               // any), releases the waiter node, posts the continuation through
                               // the waiter's executor, and reports the work item as finished.
     805            8726 :     auto* w = waiter_;
     806            8726 :     w->stop_cb_.reset();
     807            8726 :     if (w->ec_out_)
     808            8726 :         *w->ec_out_ = w->ec_value_;
     809                 : 
                               // Copy everything still needed out of `w` before handing it to
                               // destroy_waiter(); `w` is not touched after that call
                               // (presumably the node is freed or recycled there — confirm
                               // against destroy_waiter()).
     810            8726 :     auto* cont  = w->cont_;
     811            8726 :     auto d      = w->d_;
     812            8726 :     auto* svc   = w->svc_;
     813            8726 :     auto& sched = svc->get_scheduler();
     814                 : 
     815            8726 :     svc->destroy_waiter(w);
     816                 : 
     817            8726 :     d.post(*cont);
     818            8726 :     sched.work_finished();
     819            8726 : }
     820                 : 
     821                 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
     822                 : // delete loses track that stop_cb_ was already .reset() above.
     823                 : #if defined(__GNUC__) && !defined(__clang__)
     824                 : #pragma GCC diagnostic push
     825                 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
     826                 : #endif
     827                 : inline void
     828               8 : waiter_node::completion_op::destroy()
     829                 : {
     830                 :     // Called during scheduler shutdown drain when this completion_op is
     831                 :     // in the scheduler's ready queue (posted by cancel_timer() or
     832                 :     // process_expired()). Balances the work_started() from
     833                 :     // implementation::wait(). The scheduler drain loop separately
     834                 :     // balances the work_started() from post(). On IOCP both decrements
     835                 :     // are required for outstanding_work_ to reach zero; on other
     836                 :     // backends this is harmless.
     837                 :     //
     838                 :     // This override also prevents scheduler_op::destroy() from calling
     839                 :     // do_complete(nullptr, ...). See also: timer_service::shutdown()
     840                 :     // which drains waiters still in the timer heap (the other path).
     841               8 :     auto* w = waiter_;
     842               8 :     w->stop_cb_.reset();
                               // Take the coroutine handle and scheduler reference out of `w`
                               // before it is deleted; both are used after the delete.
     843               8 :     auto h      = std::exchange(w->h_, {});
     844               8 :     auto& sched = w->svc_->get_scheduler();
                               // NOTE(review): this path uses plain `delete`, whereas
                               // operator()() releases the node via svc->destroy_waiter(w) —
                               // confirm the difference is intentional for shutdown.
     845               8 :     delete w;
     846               8 :     sched.work_finished();
                               // The suspended coroutine is destroyed rather than resumed.
     847               8 :     if (h)
     848               8 :         h.destroy();
     849               8 : }
     850                 : #if defined(__GNUC__) && !defined(__clang__)
     851                 : #pragma GCC diagnostic pop
     852                 : #endif
     853                 : 
     854                 : inline std::coroutine_handle<>
     855            8742 : timer_service::implementation::wait(
     856                 :     std::coroutine_handle<> h,
     857                 :     capy::executor_ref d,
     858                 :     std::stop_token token,
     859                 :     std::error_code* ec,
     860                 :     capy::continuation* cont)
     861                 : {
                               // Starts an asynchronous wait on this timer.
                               //   h     - coroutine handle stored on the waiter (destroyed by
                               //           completion_op::destroy() on the shutdown path)
                               //   d     - executor used to post `cont` on completion
                               //   token - stop token; if stop is possible a stop callback is
                               //           installed that cancels this waiter
                               //   ec    - optional out-parameter receiving the result code
                               //   cont  - continuation posted when the wait completes
                               // Always returns std::noop_coroutine(); completion is delivered
                               // by posting `cont`, never by returning a handle to resume.
                               //
     862                 :     // Already-expired fast path — no waiter_node, no mutex.
     863                 :     // Post instead of dispatch so the coroutine yields to the
     864                 :     // scheduler, allowing other queued work to run.
     865            8742 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     866                 :     {
     867            8720 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     868                 :         {
     869 MIS           0 :             if (ec)
     870               0 :                 *ec = {};
     871               0 :             d.post(*cont);
     872               0 :             return std::noop_coroutine();
     873                 :         }
     874                 :     }
     875                 : 
                               // Slow path: build a waiter node carrying everything the
                               // completion path needs, then register it with the service.
     876 HIT        8742 :     auto* w    = svc_->create_waiter();
     877            8742 :     w->impl_   = this;
     878            8742 :     w->svc_    = svc_;
     879            8742 :     w->h_      = h;
     880            8742 :     w->cont_   = cont;
     881            8742 :     w->d_      = d;
     882            8742 :     w->token_  = std::move(token);
     883            8742 :     w->ec_out_ = ec;
     884                 : 
     885            8742 :     svc_->insert_waiter(*this, w);
     886            8742 :     might_have_pending_waits_ = true;
                               // Balanced by work_finished() in completion_op::operator()() or
                               // completion_op::destroy().
     887            8742 :     svc_->get_scheduler().work_started();
     888                 : 
                               // NOTE(review): `w` is already visible to the service at this
                               // point; the accesses below assume no other thread completes
                               // and releases it first — confirm the threading contract.
     889            8742 :     if (w->token_.stop_possible())
     890              60 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     891                 : 
     892            8742 :     return std::noop_coroutine();
     893                 : }
     894                 : 
     895                 : // Free functions
     896                 : 
     897                 : inline std::size_t
     898               8 : timer_service_update_expiry(timer::implementation& base)
     899                 : {
                               // Downcasts `base` to the service's implementation type and
                               // forwards to update_timer() with the implementation's current
                               // expiry_; returns whatever update_timer() returns.
     900               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     901               8 :     return impl.svc_->update_timer(impl, impl.expiry_);
     902                 : }
     903                 : 
     904                 : inline std::size_t
     905               8 : timer_service_cancel(timer::implementation& base) noexcept
     906                 : {
                               // Downcasts `base` and cancels all of its waiters via
                               // timer_service::cancel_timer(); returns the number canceled.
     907               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     908               8 :     return impl.svc_->cancel_timer(impl);
     909                 : }
     910                 : 
     911                 : inline std::size_t
     912               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     913                 : {
                               // Downcasts `base` and cancels at most one waiter via
                               // timer_service::cancel_one_waiter(); returns 1 or 0.
     914               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     915               2 :     return impl.svc_->cancel_one_waiter(impl);
     916                 : }
     917                 : 
     918                 : inline timer_service&
     919            1244 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     920                 : {
                               // Returns the timer_service obtained from ctx.make_service(),
                               // passing `sched` through as the construction argument.
     921            1244 :     return ctx.make_service<timer_service>(sched);
     922                 : }
     923                 : 
     924                 : } // namespace boost::corosio::detail
     925                 : 
     926                 : #endif
        

Generated by: LCOV version 2.3