91.83% Lines (337/367) 97.62% Functions (41/42)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Steve Gerbino 3   // Copyright (c) 2026 Steve Gerbino
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/corosio 8   // Official repository: https://github.com/cppalliance/corosio
9   // 9   //
10   10  
11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13   13  
14   #include <boost/corosio/timer.hpp> 14   #include <boost/corosio/timer.hpp>
15   #include <boost/corosio/detail/scheduler.hpp> 15   #include <boost/corosio/detail/scheduler.hpp>
16   #include <boost/corosio/detail/scheduler_op.hpp> 16   #include <boost/corosio/detail/scheduler_op.hpp>
17   #include <boost/corosio/detail/intrusive.hpp> 17   #include <boost/corosio/detail/intrusive.hpp>
18   #include <boost/corosio/detail/thread_local_ptr.hpp> 18   #include <boost/corosio/detail/thread_local_ptr.hpp>
19   #include <boost/capy/error.hpp> 19   #include <boost/capy/error.hpp>
20   #include <boost/capy/ex/execution_context.hpp> 20   #include <boost/capy/ex/execution_context.hpp>
21   #include <boost/capy/ex/executor_ref.hpp> 21   #include <boost/capy/ex/executor_ref.hpp>
22   #include <system_error> 22   #include <system_error>
23   23  
24   #include <atomic> 24   #include <atomic>
25   #include <chrono> 25   #include <chrono>
26   #include <coroutine> 26   #include <coroutine>
27   #include <cstddef> 27   #include <cstddef>
28   #include <limits> 28   #include <limits>
29   #include <mutex> 29   #include <mutex>
30   #include <optional> 30   #include <optional>
31   #include <stop_token> 31   #include <stop_token>
32   #include <utility> 32   #include <utility>
33   #include <vector> 33   #include <vector>
34   34  
35   namespace boost::corosio::detail { 35   namespace boost::corosio::detail {
36   36  
37   struct scheduler; 37   struct scheduler;
38   38  
39   /* 39   /*
40   Timer Service 40   Timer Service
41   ============= 41   =============
42   42  
43   Data Structures 43   Data Structures
44   --------------- 44   ---------------
45   waiter_node holds per-waiter state: coroutine handle, executor, 45   waiter_node holds per-waiter state: coroutine handle, executor,
46   error output, stop_token, embedded completion_op. Each concurrent 46   error output, stop_token, embedded completion_op. Each concurrent
47   co_await t.wait() allocates one waiter_node. 47   co_await t.wait() allocates one waiter_node.
48   48  
49   timer_service::implementation holds per-timer state: expiry, 49   timer_service::implementation holds per-timer state: expiry,
50   heap index, and an intrusive_list of waiter_nodes. Multiple 50   heap index, and an intrusive_list of waiter_nodes. Multiple
51   coroutines can wait on the same timer simultaneously. 51   coroutines can wait on the same timer simultaneously.
52   52  
53   timer_service owns a min-heap of active timers, a free list 53   timer_service owns a min-heap of active timers, a free list
54   of recycled impls, and a free list of recycled waiter_nodes. The 54   of recycled impls, and a free list of recycled waiter_nodes. The
55   heap is ordered by expiry time; the scheduler queries 55   heap is ordered by expiry time; the scheduler queries
56   nearest_expiry() to set the epoll/timerfd timeout. 56   nearest_expiry() to set the epoll/timerfd timeout.
57   57  
58   Optimization Strategy 58   Optimization Strategy
59   --------------------- 59   ---------------------
60   1. Deferred heap insertion — expires_after() stores the expiry 60   1. Deferred heap insertion — expires_after() stores the expiry
61   but does not insert into the heap. Insertion happens in wait(). 61   but does not insert into the heap. Insertion happens in wait().
62   2. Thread-local impl cache — single-slot per-thread cache. 62   2. Thread-local impl cache — single-slot per-thread cache.
63   3. Embedded completion_op — eliminates heap allocation per fire/cancel. 63   3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry(). 64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65   5. might_have_pending_waits_ flag — skips lock when no wait issued. 65   5. might_have_pending_waits_ flag — skips lock when no wait issued.
66   6. Thread-local waiter cache — single-slot per-thread cache. 66   6. Thread-local waiter cache — single-slot per-thread cache.
67   67  
68   Concurrency 68   Concurrency
69   ----------- 69   -----------
70   stop_token callbacks can fire from any thread. The impl_ 70   stop_token callbacks can fire from any thread. The impl_
71   pointer on waiter_node is used as a "still in list" marker. 71   pointer on waiter_node is used as a "still in list" marker.
72   */ 72   */
73   73  
74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node; 74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75   75  
76   inline void timer_service_invalidate_cache() noexcept; 76   inline void timer_service_invalidate_cache() noexcept;
77   77  
78   // timer_service class body — member function definitions are 78   // timer_service class body — member function definitions are
79   // out-of-class (after implementation and waiter_node are complete) 79   // out-of-class (after implementation and waiter_node are complete)
80   class BOOST_COROSIO_DECL timer_service final 80   class BOOST_COROSIO_DECL timer_service final
81   : public capy::execution_context::service 81   : public capy::execution_context::service
82   , public io_object::io_service 82   , public io_object::io_service
83   { 83   {
84   public: 84   public:
85   using clock_type = std::chrono::steady_clock; 85   using clock_type = std::chrono::steady_clock;
86   using time_point = clock_type::time_point; 86   using time_point = clock_type::time_point;
87   87  
88   /// Type-erased callback for earliest-expiry-changed notifications. 88   /// Type-erased callback for earliest-expiry-changed notifications.
89   class callback 89   class callback
90   { 90   {
91   void* ctx_ = nullptr; 91   void* ctx_ = nullptr;
92   void (*fn_)(void*) = nullptr; 92   void (*fn_)(void*) = nullptr;
93   93  
94   public: 94   public:
95   /// Construct an empty callback. 95   /// Construct an empty callback.
HITCBC 96   1244 callback() = default; 96   1244 callback() = default;
97   97  
98   /// Construct a callback with the given context and function. 98   /// Construct a callback with the given context and function.
HITCBC 99   1244 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {} 99   1244 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100   100  
101   /// Return true if the callback is non-empty. 101   /// Return true if the callback is non-empty.
102   explicit operator bool() const noexcept 102   explicit operator bool() const noexcept
103   { 103   {
104   return fn_ != nullptr; 104   return fn_ != nullptr;
105   } 105   }
106   106  
107   /// Invoke the callback. 107   /// Invoke the callback.
HITCBC 108   8793 void operator()() const 108   8687 void operator()() const
109   { 109   {
HITCBC 110   8793 if (fn_) 110   8687 if (fn_)
HITCBC 111   8793 fn_(ctx_); 111   8687 fn_(ctx_);
HITCBC 112   8793 } 112   8687 }
113   }; 113   };
114   114  
115   struct implementation; 115   struct implementation;
116   116  
117   private: 117   private:
118   struct heap_entry 118   struct heap_entry
119   { 119   {
120   time_point time_; 120   time_point time_;
121   implementation* timer_; 121   implementation* timer_;
122   }; 122   };
123   123  
124   scheduler* sched_ = nullptr; 124   scheduler* sched_ = nullptr;
125   BOOST_COROSIO_MSVC_WARNING_PUSH 125   BOOST_COROSIO_MSVC_WARNING_PUSH
126   BOOST_COROSIO_MSVC_WARNING_DISABLE(4251) // std:: members, dll-interface 126   BOOST_COROSIO_MSVC_WARNING_DISABLE(4251) // std:: members, dll-interface
127   mutable std::mutex mutex_; 127   mutable std::mutex mutex_;
128   std::vector<heap_entry> heap_; 128   std::vector<heap_entry> heap_;
129   implementation* free_list_ = nullptr; 129   implementation* free_list_ = nullptr;
130   waiter_node* waiter_free_list_ = nullptr; 130   waiter_node* waiter_free_list_ = nullptr;
131   callback on_earliest_changed_; 131   callback on_earliest_changed_;
132   bool shutting_down_ = false; 132   bool shutting_down_ = false;
133   // Avoids mutex in nearest_expiry() and empty() 133   // Avoids mutex in nearest_expiry() and empty()
134   mutable std::atomic<std::int64_t> cached_nearest_ns_{ 134   mutable std::atomic<std::int64_t> cached_nearest_ns_{
135   (std::numeric_limits<std::int64_t>::max)()}; 135   (std::numeric_limits<std::int64_t>::max)()};
136   BOOST_COROSIO_MSVC_WARNING_POP 136   BOOST_COROSIO_MSVC_WARNING_POP
137   137  
138   public: 138   public:
139   /// Construct the timer service bound to a scheduler. 139   /// Construct the timer service bound to a scheduler.
HITCBC 140   1244 inline timer_service(capy::execution_context&, scheduler& sched) 140   1244 inline timer_service(capy::execution_context&, scheduler& sched)
HITCBC 141   1244 : sched_(&sched) 141   1244 : sched_(&sched)
142   { 142   {
HITCBC 143   1244 } 143   1244 }
144   144  
145   /// Return the associated scheduler. 145   /// Return the associated scheduler.
HITCBC 146   17690 inline scheduler& get_scheduler() noexcept 146   17476 inline scheduler& get_scheduler() noexcept
147   { 147   {
HITCBC 148   17690 return *sched_; 148   17476 return *sched_;
149   } 149   }
150   150  
151   /// Destroy the timer service. 151   /// Destroy the timer service.
HITCBC 152   2488 ~timer_service() override = default; 152   2488 ~timer_service() override = default;
153   153  
154   timer_service(timer_service const&) = delete; 154   timer_service(timer_service const&) = delete;
155   timer_service& operator=(timer_service const&) = delete; 155   timer_service& operator=(timer_service const&) = delete;
156   156  
157   /// Register a callback invoked when the earliest expiry changes. 157   /// Register a callback invoked when the earliest expiry changes.
HITCBC 158   1244 inline void set_on_earliest_changed(callback cb) 158   1244 inline void set_on_earliest_changed(callback cb)
159   { 159   {
HITCBC 160   1244 on_earliest_changed_ = cb; 160   1244 on_earliest_changed_ = cb;
HITCBC 161   1244 } 161   1244 }
162   162  
163   /// Return true if no timers are in the heap. 163   /// Return true if no timers are in the heap.
164   inline bool empty() const noexcept 164   inline bool empty() const noexcept
165   { 165   {
166   return cached_nearest_ns_.load(std::memory_order_acquire) == 166   return cached_nearest_ns_.load(std::memory_order_acquire) ==
167   (std::numeric_limits<std::int64_t>::max)(); 167   (std::numeric_limits<std::int64_t>::max)();
168   } 168   }
169   169  
170   /// Return the nearest timer expiry without acquiring the mutex. 170   /// Return the nearest timer expiry without acquiring the mutex.
HITCBC 171   244365 inline time_point nearest_expiry() const noexcept 171   237189 inline time_point nearest_expiry() const noexcept
172   { 172   {
HITCBC 173   244365 auto ns = cached_nearest_ns_.load(std::memory_order_acquire); 173   237189 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
HITCBC 174   244365 return time_point(time_point::duration(ns)); 174   237189 return time_point(time_point::duration(ns));
175   } 175   }
176   176  
177   /// Cancel all pending timers and free cached resources. 177   /// Cancel all pending timers and free cached resources.
178   inline void shutdown() override; 178   inline void shutdown() override;
179   179  
180   /// Construct a new timer implementation. 180   /// Construct a new timer implementation.
181   inline io_object::implementation* construct() override; 181   inline io_object::implementation* construct() override;
182   182  
183   /// Destroy a timer implementation, cancelling pending waiters. 183   /// Destroy a timer implementation, cancelling pending waiters.
184   inline void destroy(io_object::implementation* p) override; 184   inline void destroy(io_object::implementation* p) override;
185   185  
186   /// Cancel and recycle a timer implementation. 186   /// Cancel and recycle a timer implementation.
187   inline void destroy_impl(implementation& impl); 187   inline void destroy_impl(implementation& impl);
188   188  
189   /// Create or recycle a waiter node. 189   /// Create or recycle a waiter node.
190   inline waiter_node* create_waiter(); 190   inline waiter_node* create_waiter();
191   191  
192   /// Return a waiter node to the cache or free list. 192   /// Return a waiter node to the cache or free list.
193   inline void destroy_waiter(waiter_node* w); 193   inline void destroy_waiter(waiter_node* w);
194   194  
195   /// Update the timer expiry, cancelling existing waiters. 195   /// Update the timer expiry, cancelling existing waiters.
196   inline std::size_t update_timer(implementation& impl, time_point new_time); 196   inline std::size_t update_timer(implementation& impl, time_point new_time);
197   197  
198   /// Insert a waiter into the timer's waiter list and the heap. 198   /// Insert a waiter into the timer's waiter list and the heap.
199   inline void insert_waiter(implementation& impl, waiter_node* w); 199   inline void insert_waiter(implementation& impl, waiter_node* w);
200   200  
201   /// Cancel all waiters on a timer. 201   /// Cancel all waiters on a timer.
202   inline std::size_t cancel_timer(implementation& impl); 202   inline std::size_t cancel_timer(implementation& impl);
203   203  
204   /// Cancel a single waiter ( stop_token callback path ). 204   /// Cancel a single waiter ( stop_token callback path ).
205   inline void cancel_waiter(waiter_node* w); 205   inline void cancel_waiter(waiter_node* w);
206   206  
207   /// Cancel one waiter on a timer. 207   /// Cancel one waiter on a timer.
208   inline std::size_t cancel_one_waiter(implementation& impl); 208   inline std::size_t cancel_one_waiter(implementation& impl);
209   209  
210   /// Complete all waiters whose timers have expired. 210   /// Complete all waiters whose timers have expired.
211   inline std::size_t process_expired(); 211   inline std::size_t process_expired();
212   212  
213   private: 213   private:
HITCBC 214   272875 inline void refresh_cached_nearest() noexcept 214   265909 inline void refresh_cached_nearest() noexcept
215   { 215   {
HITCBC 216   272875 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)() 216   265909 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
HITCBC 217   271471 : heap_[0].time_.time_since_epoch().count(); 217   264504 : heap_[0].time_.time_since_epoch().count();
HITCBC 218   272875 cached_nearest_ns_.store(ns, std::memory_order_release); 218   265909 cached_nearest_ns_.store(ns, std::memory_order_release);
HITCBC 219   272875 } 219   265909 }
220   220  
221   inline void remove_timer_impl(implementation& impl); 221   inline void remove_timer_impl(implementation& impl);
222   inline void up_heap(std::size_t index); 222   inline void up_heap(std::size_t index);
223   inline void down_heap(std::size_t index); 223   inline void down_heap(std::size_t index);
224   inline void swap_heap(std::size_t i1, std::size_t i2); 224   inline void swap_heap(std::size_t i1, std::size_t i2);
225   }; 225   };
226   226  
227   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node 227   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
228   : intrusive_list<waiter_node>::node 228   : intrusive_list<waiter_node>::node
229   { 229   {
230   // Embedded completion op — avoids heap allocation per fire/cancel 230   // Embedded completion op — avoids heap allocation per fire/cancel
231   struct completion_op final : scheduler_op 231   struct completion_op final : scheduler_op
232   { 232   {
233   waiter_node* waiter_ = nullptr; 233   waiter_node* waiter_ = nullptr;
234   234  
235   static void do_complete( 235   static void do_complete(
236   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t); 236   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
237   237  
HITCBC 238   316 completion_op() noexcept : scheduler_op(&do_complete) {} 238   316 completion_op() noexcept : scheduler_op(&do_complete) {}
239   239  
240   void operator()() override; 240   void operator()() override;
241   void destroy() override; 241   void destroy() override;
242   }; 242   };
243   243  
244   // Per-waiter stop_token cancellation 244   // Per-waiter stop_token cancellation
245   struct canceller 245   struct canceller
246   { 246   {
247   waiter_node* waiter_; 247   waiter_node* waiter_;
248   void operator()() const; 248   void operator()() const;
249   }; 249   };
250   250  
251   // nullptr once removed from timer's waiter list (concurrency marker) 251   // nullptr once removed from timer's waiter list (concurrency marker)
252   timer_service::implementation* impl_ = nullptr; 252   timer_service::implementation* impl_ = nullptr;
253   timer_service* svc_ = nullptr; 253   timer_service* svc_ = nullptr;
254   std::coroutine_handle<> h_; 254   std::coroutine_handle<> h_;
255   capy::continuation* cont_ = nullptr; 255   capy::continuation* cont_ = nullptr;
256   capy::executor_ref d_; 256   capy::executor_ref d_;
257   std::error_code* ec_out_ = nullptr; 257   std::error_code* ec_out_ = nullptr;
258   std::stop_token token_; 258   std::stop_token token_;
259   std::optional<std::stop_callback<canceller>> stop_cb_; 259   std::optional<std::stop_callback<canceller>> stop_cb_;
260   completion_op op_; 260   completion_op op_;
261   std::error_code ec_value_; 261   std::error_code ec_value_;
262   waiter_node* next_free_ = nullptr; 262   waiter_node* next_free_ = nullptr;
263   263  
HITCBC 264   316 waiter_node() noexcept 264   316 waiter_node() noexcept
HITCBC 265   316 { 265   316 {
HITCBC 266   316 op_.waiter_ = this; 266   316 op_.waiter_ = this;
HITCBC 267   316 } 267   316 }
268   }; 268   };
269   269  
270   struct timer_service::implementation final : timer::implementation 270   struct timer_service::implementation final : timer::implementation
271   { 271   {
272   using clock_type = std::chrono::steady_clock; 272   using clock_type = std::chrono::steady_clock;
273   using time_point = clock_type::time_point; 273   using time_point = clock_type::time_point;
274   using duration = clock_type::duration; 274   using duration = clock_type::duration;
275   275  
276   timer_service* svc_ = nullptr; 276   timer_service* svc_ = nullptr;
277   intrusive_list<waiter_node> waiters_; 277   intrusive_list<waiter_node> waiters_;
278   278  
279   // Free list linkage (reused when impl is on free_list) 279   // Free list linkage (reused when impl is on free_list)
280   implementation* next_free_ = nullptr; 280   implementation* next_free_ = nullptr;
281   281  
282   inline explicit implementation(timer_service& svc) noexcept; 282   inline explicit implementation(timer_service& svc) noexcept;
283   283  
284   inline std::coroutine_handle<> wait( 284   inline std::coroutine_handle<> wait(
285   std::coroutine_handle<>, 285   std::coroutine_handle<>,
286   capy::executor_ref, 286   capy::executor_ref,
287   std::stop_token, 287   std::stop_token,
288   std::error_code*, 288   std::error_code*,
289   capy::continuation*) override; 289   capy::continuation*) override;
290   }; 290   };
291   291  
292   // Thread-local caches avoid hot-path mutex acquisitions: 292   // Thread-local caches avoid hot-path mutex acquisitions:
293   // 1. Impl cache — single-slot, validated by comparing svc_ 293   // 1. Impl cache — single-slot, validated by comparing svc_
294   // 2. Waiter cache — single-slot, no service affinity 294   // 2. Waiter cache — single-slot, no service affinity
295   // All caches are cleared by timer_service_invalidate_cache() during shutdown. 295   // All caches are cleared by timer_service_invalidate_cache() during shutdown.
296   296  
297   inline thread_local_ptr<timer_service::implementation> tl_cached_impl; 297   inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
298   inline thread_local_ptr<waiter_node> tl_cached_waiter; 298   inline thread_local_ptr<waiter_node> tl_cached_waiter;
299   299  
300   inline timer_service::implementation* 300   inline timer_service::implementation*
HITCBC 301   9068 try_pop_tl_cache(timer_service* svc) noexcept 301   8959 try_pop_tl_cache(timer_service* svc) noexcept
302   { 302   {
HITCBC 303   9068 auto* impl = tl_cached_impl.get(); 303   8959 auto* impl = tl_cached_impl.get();
HITCBC 304   9068 if (impl) 304   8959 if (impl)
305   { 305   {
HITCBC 306   8671 tl_cached_impl.set(nullptr); 306   8562 tl_cached_impl.set(nullptr);
HITCBC 307   8671 if (impl->svc_ == svc) 307   8562 if (impl->svc_ == svc)
HITCBC 308   8671 return impl; 308   8562 return impl;
309   // Stale impl from a destroyed service 309   // Stale impl from a destroyed service
MISUBC 310   delete impl; 310   delete impl;
311   } 311   }
HITCBC 312   397 return nullptr; 312   397 return nullptr;
313   } 313   }
314   314  
315   inline bool 315   inline bool
HITCBC 316   9060 try_push_tl_cache(timer_service::implementation* impl) noexcept 316   8951 try_push_tl_cache(timer_service::implementation* impl) noexcept
317   { 317   {
HITCBC 318   9060 if (!tl_cached_impl.get()) 318   8951 if (!tl_cached_impl.get())
319   { 319   {
HITCBC 320   8948 tl_cached_impl.set(impl); 320   8839 tl_cached_impl.set(impl);
HITCBC 321   8948 return true; 321   8839 return true;
322   } 322   }
HITCBC 323   112 return false; 323   112 return false;
324   } 324   }
325   325  
326   inline waiter_node* 326   inline waiter_node*
HITCBC 327   8849 try_pop_waiter_tl_cache() noexcept 327   8742 try_pop_waiter_tl_cache() noexcept
328   { 328   {
HITCBC 329   8849 auto* w = tl_cached_waiter.get(); 329   8742 auto* w = tl_cached_waiter.get();
HITCBC 330   8849 if (w) 330   8742 if (w)
331   { 331   {
HITCBC 332   8531 tl_cached_waiter.set(nullptr); 332   8424 tl_cached_waiter.set(nullptr);
HITCBC 333   8531 return w; 333   8424 return w;
334   } 334   }
HITCBC 335   318 return nullptr; 335   318 return nullptr;
336   } 336   }
337   337  
338   inline bool 338   inline bool
HITCBC 339   8833 try_push_waiter_tl_cache(waiter_node* w) noexcept 339   8726 try_push_waiter_tl_cache(waiter_node* w) noexcept
340   { 340   {
HITCBC 341   8833 if (!tl_cached_waiter.get()) 341   8726 if (!tl_cached_waiter.get())
342   { 342   {
HITCBC 343   8747 tl_cached_waiter.set(w); 343   8640 tl_cached_waiter.set(w);
HITCBC 344   8747 return true; 344   8640 return true;
345   } 345   }
HITCBC 346   86 return false; 346   86 return false;
347   } 347   }
348   348  
349   inline void 349   inline void
HITCBC 350   1244 timer_service_invalidate_cache() noexcept 350   1244 timer_service_invalidate_cache() noexcept
351   { 351   {
HITCBC 352   1244 delete tl_cached_impl.get(); 352   1244 delete tl_cached_impl.get();
HITCBC 353   1244 tl_cached_impl.set(nullptr); 353   1244 tl_cached_impl.set(nullptr);
354   354  
HITCBC 355   1244 delete tl_cached_waiter.get(); 355   1244 delete tl_cached_waiter.get();
HITCBC 356   1244 tl_cached_waiter.set(nullptr); 356   1244 tl_cached_waiter.set(nullptr);
HITCBC 357   1244 } 357   1244 }
358   358  
359   // timer_service out-of-class member function definitions 359   // timer_service out-of-class member function definitions
360   360  
HITCBC 361   397 inline timer_service::implementation::implementation( 361   397 inline timer_service::implementation::implementation(
HITCBC 362   397 timer_service& svc) noexcept 362   397 timer_service& svc) noexcept
HITCBC 363   397 : svc_(&svc) 363   397 : svc_(&svc)
364   { 364   {
HITCBC 365   397 } 365   397 }
366   366  
367   inline void 367   inline void
HITCBC 368   1244 timer_service::shutdown() 368   1244 timer_service::shutdown()
369   { 369   {
HITCBC 370   1244 timer_service_invalidate_cache(); 370   1244 timer_service_invalidate_cache();
HITCBC 371   1244 shutting_down_ = true; 371   1244 shutting_down_ = true;
372   372  
373   // Snapshot impls and detach them from the heap so that 373   // Snapshot impls and detach them from the heap so that
374   // coroutine-owned timer destructors (triggered by h.destroy() 374   // coroutine-owned timer destructors (triggered by h.destroy()
375   // below) cannot re-enter remove_timer_impl() and mutate the 375   // below) cannot re-enter remove_timer_impl() and mutate the
376   // vector during iteration. 376   // vector during iteration.
HITCBC 377   1244 std::vector<implementation*> impls; 377   1244 std::vector<implementation*> impls;
HITCBC 378   1244 impls.reserve(heap_.size()); 378   1244 impls.reserve(heap_.size());
HITCBC 379   1252 for (auto& entry : heap_) 379   1252 for (auto& entry : heap_)
380   { 380   {
HITCBC 381   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 381   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 382   8 impls.push_back(entry.timer_); 382   8 impls.push_back(entry.timer_);
383   } 383   }
HITCBC 384   1244 heap_.clear(); 384   1244 heap_.clear();
HITCBC 385   1244 cached_nearest_ns_.store( 385   1244 cached_nearest_ns_.store(
386   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release); 386   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
387   387  
388   // Cancel waiting timers. Each waiter called work_started() 388   // Cancel waiting timers. Each waiter called work_started()
389   // in implementation::wait(). On IOCP the scheduler shutdown 389   // in implementation::wait(). On IOCP the scheduler shutdown
390   // loop exits when outstanding_work_ reaches zero, so we must 390   // loop exits when outstanding_work_ reaches zero, so we must
391   // call work_finished() here to balance it. On other backends 391   // call work_finished() here to balance it. On other backends
392   // this is harmless. 392   // this is harmless.
HITCBC 393   1252 for (auto* impl : impls) 393   1252 for (auto* impl : impls)
394   { 394   {
HITCBC 395   16 while (auto* w = impl->waiters_.pop_front()) 395   16 while (auto* w = impl->waiters_.pop_front())
396   { 396   {
HITCBC 397   8 w->stop_cb_.reset(); 397   8 w->stop_cb_.reset();
HITCBC 398   8 auto h = std::exchange(w->h_, {}); 398   8 auto h = std::exchange(w->h_, {});
HITCBC 399   8 sched_->work_finished(); 399   8 sched_->work_finished();
HITCBC 400   8 if (h) 400   8 if (h)
HITCBC 401   8 h.destroy(); 401   8 h.destroy();
HITCBC 402   8 delete w; 402   8 delete w;
HITCBC 403   8 } 403   8 }
HITCBC 404   8 delete impl; 404   8 delete impl;
405   } 405   }
406   406  
407   // Delete free-listed impls 407   // Delete free-listed impls
HITCBC 408   1356 while (free_list_) 408   1356 while (free_list_)
409   { 409   {
HITCBC 410   112 auto* next = free_list_->next_free_; 410   112 auto* next = free_list_->next_free_;
HITCBC 411   112 delete free_list_; 411   112 delete free_list_;
HITCBC 412   112 free_list_ = next; 412   112 free_list_ = next;
413   } 413   }
414   414  
415   // Delete free-listed waiters 415   // Delete free-listed waiters
HITCBC 416   1328 while (waiter_free_list_) 416   1328 while (waiter_free_list_)
417   { 417   {
HITCBC 418   84 auto* next = waiter_free_list_->next_free_; 418   84 auto* next = waiter_free_list_->next_free_;
HITCBC 419   84 delete waiter_free_list_; 419   84 delete waiter_free_list_;
HITCBC 420   84 waiter_free_list_ = next; 420   84 waiter_free_list_ = next;
421   } 421   }
HITCBC 422   1244 } 422   1244 }
423   423  
424   inline io_object::implementation* 424   inline io_object::implementation*
HITCBC 425   9068 timer_service::construct() 425   8959 timer_service::construct()
426   { 426   {
HITCBC 427   9068 implementation* impl = try_pop_tl_cache(this); 427   8959 implementation* impl = try_pop_tl_cache(this);
HITCBC 428   9068 if (impl) 428   8959 if (impl)
429   { 429   {
HITCBC 430   8671 impl->svc_ = this; 430   8562 impl->svc_ = this;
HITCBC 431   8671 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 431   8562 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 432   8671 impl->might_have_pending_waits_ = false; 432   8562 impl->might_have_pending_waits_ = false;
HITCBC 433   8671 return impl; 433   8562 return impl;
434   } 434   }
435   435  
HITCBC 436   397 std::lock_guard lock(mutex_); 436   397 std::lock_guard lock(mutex_);
HITCBC 437   397 if (free_list_) 437   397 if (free_list_)
438   { 438   {
MISUBC 439   impl = free_list_; 439   impl = free_list_;
MISUBC 440   free_list_ = impl->next_free_; 440   free_list_ = impl->next_free_;
MISUBC 441   impl->next_free_ = nullptr; 441   impl->next_free_ = nullptr;
MISUBC 442   impl->svc_ = this; 442   impl->svc_ = this;
MISUBC 443   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 443   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
MISUBC 444   impl->might_have_pending_waits_ = false; 444   impl->might_have_pending_waits_ = false;
445   } 445   }
446   else 446   else
447   { 447   {
HITCBC 448   397 impl = new implementation(*this); 448   397 impl = new implementation(*this);
449   } 449   }
HITCBC 450   397 return impl; 450   397 return impl;
HITCBC 451   397 } 451   397 }
452   452  
453   inline void 453   inline void
HITCBC 454   9066 timer_service::destroy(io_object::implementation* p) 454   8957 timer_service::destroy(io_object::implementation* p)
455   { 455   {
HITCBC 456   9066 destroy_impl(static_cast<implementation&>(*p)); 456   8957 destroy_impl(static_cast<implementation&>(*p));
HITCBC 457   9066 } 457   8957 }
458   458  
459   inline void 459   inline void
HITCBC 460   9066 timer_service::destroy_impl(implementation& impl) 460   8957 timer_service::destroy_impl(implementation& impl)
461   { 461   {
462   // During shutdown the impl is owned by the shutdown loop. 462   // During shutdown the impl is owned by the shutdown loop.
463   // Re-entering here (from a coroutine-owned timer destructor 463   // Re-entering here (from a coroutine-owned timer destructor
464   // triggered by h.destroy()) must not modify the heap or 464   // triggered by h.destroy()) must not modify the heap or
465   // recycle the impl — shutdown deletes it directly. 465   // recycle the impl — shutdown deletes it directly.
HITCBC 466   9066 if (shutting_down_) 466   8957 if (shutting_down_)
HITCBC 467   8954 return; 467   8845 return;
468   468  
HITCBC 469   9060 cancel_timer(impl); 469   8951 cancel_timer(impl);
470   470  
HITCBC 471   9060 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()) 471   8951 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
472   { 472   {
MISUBC 473   std::lock_guard lock(mutex_); 473   std::lock_guard lock(mutex_);
MISUBC 474   remove_timer_impl(impl); 474   remove_timer_impl(impl);
MISUBC 475   refresh_cached_nearest(); 475   refresh_cached_nearest();
MISUBC 476   } 476   }
477   477  
HITCBC 478   9060 if (try_push_tl_cache(&impl)) 478   8951 if (try_push_tl_cache(&impl))
HITCBC 479   8948 return; 479   8839 return;
480   480  
HITCBC 481   112 std::lock_guard lock(mutex_); 481   112 std::lock_guard lock(mutex_);
HITCBC 482   112 impl.next_free_ = free_list_; 482   112 impl.next_free_ = free_list_;
HITCBC 483   112 free_list_ = &impl; 483   112 free_list_ = &impl;
HITCBC 484   112 } 484   112 }
485   485  
486   inline waiter_node* 486   inline waiter_node*
HITCBC 487   8849 timer_service::create_waiter() 487   8742 timer_service::create_waiter()
488   { 488   {
HITCBC 489   8849 if (auto* w = try_pop_waiter_tl_cache()) 489   8742 if (auto* w = try_pop_waiter_tl_cache())
HITCBC 490   8531 return w; 490   8424 return w;
491   491  
HITCBC 492   318 std::lock_guard lock(mutex_); 492   318 std::lock_guard lock(mutex_);
HITCBC 493   318 if (waiter_free_list_) 493   318 if (waiter_free_list_)
494   { 494   {
HITCBC 495   2 auto* w = waiter_free_list_; 495   2 auto* w = waiter_free_list_;
HITCBC 496   2 waiter_free_list_ = w->next_free_; 496   2 waiter_free_list_ = w->next_free_;
HITCBC 497   2 w->next_free_ = nullptr; 497   2 w->next_free_ = nullptr;
HITCBC 498   2 return w; 498   2 return w;
499   } 499   }
500   500  
HITCBC 501   316 return new waiter_node(); 501   316 return new waiter_node();
HITCBC 502   318 } 502   318 }
503   503  
504   inline void 504   inline void
HITCBC 505   8833 timer_service::destroy_waiter(waiter_node* w) 505   8726 timer_service::destroy_waiter(waiter_node* w)
506   { 506   {
HITCBC 507   8833 if (try_push_waiter_tl_cache(w)) 507   8726 if (try_push_waiter_tl_cache(w))
HITCBC 508   8747 return; 508   8640 return;
509   509  
HITCBC 510   86 std::lock_guard lock(mutex_); 510   86 std::lock_guard lock(mutex_);
HITCBC 511   86 w->next_free_ = waiter_free_list_; 511   86 w->next_free_ = waiter_free_list_;
HITCBC 512   86 waiter_free_list_ = w; 512   86 waiter_free_list_ = w;
HITCBC 513   86 } 513   86 }
514   514  
515   inline std::size_t 515   inline std::size_t
HITCBC 516   8 timer_service::update_timer(implementation& impl, time_point new_time) 516   8 timer_service::update_timer(implementation& impl, time_point new_time)
517   { 517   {
518   bool in_heap = 518   bool in_heap =
HITCBC 519   8 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()); 519   8 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
HITCBC 520   8 if (!in_heap && impl.waiters_.empty()) 520   8 if (!in_heap && impl.waiters_.empty())
MISUBC 521   return 0; 521   return 0;
522   522  
HITCBC 523   8 bool notify = false; 523   8 bool notify = false;
HITCBC 524   8 intrusive_list<waiter_node> canceled; 524   8 intrusive_list<waiter_node> canceled;
525   525  
526   { 526   {
HITCBC 527   8 std::lock_guard lock(mutex_); 527   8 std::lock_guard lock(mutex_);
528   528  
HITCBC 529   20 while (auto* w = impl.waiters_.pop_front()) 529   20 while (auto* w = impl.waiters_.pop_front())
530   { 530   {
HITCBC 531   12 w->impl_ = nullptr; 531   12 w->impl_ = nullptr;
HITCBC 532   12 canceled.push_back(w); 532   12 canceled.push_back(w);
HITCBC 533   12 } 533   12 }
534   534  
HITCBC 535   8 if (impl.heap_index_ < heap_.size()) 535   8 if (impl.heap_index_ < heap_.size())
536   { 536   {
HITCBC 537   8 time_point old_time = heap_[impl.heap_index_].time_; 537   8 time_point old_time = heap_[impl.heap_index_].time_;
HITCBC 538   8 heap_[impl.heap_index_].time_ = new_time; 538   8 heap_[impl.heap_index_].time_ = new_time;
539   539  
HITCBC 540   8 if (new_time < old_time) 540   8 if (new_time < old_time)
HITCBC 541   6 up_heap(impl.heap_index_); 541   6 up_heap(impl.heap_index_);
542   else 542   else
HITCBC 543   2 down_heap(impl.heap_index_); 543   2 down_heap(impl.heap_index_);
544   544  
HITCBC 545   8 notify = (impl.heap_index_ == 0); 545   8 notify = (impl.heap_index_ == 0);
546   } 546   }
547   547  
HITCBC 548   8 refresh_cached_nearest(); 548   8 refresh_cached_nearest();
HITCBC 549   8 } 549   8 }
550   550  
HITCBC 551   8 std::size_t count = 0; 551   8 std::size_t count = 0;
HITCBC 552   20 while (auto* w = canceled.pop_front()) 552   20 while (auto* w = canceled.pop_front())
553   { 553   {
HITCBC 554   12 w->ec_value_ = make_error_code(capy::error::canceled); 554   12 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 555   12 sched_->post(&w->op_); 555   12 sched_->post(&w->op_);
HITCBC 556   12 ++count; 556   12 ++count;
HITCBC 557   12 } 557   12 }
558   558  
HITCBC 559   8 if (notify) 559   8 if (notify)
HITCBC 560   6 on_earliest_changed_(); 560   6 on_earliest_changed_();
561   561  
HITCBC 562   8 return count; 562   8 return count;
563   } 563   }
564   564  
565   inline void 565   inline void
HITCBC 566   8849 timer_service::insert_waiter(implementation& impl, waiter_node* w) 566   8742 timer_service::insert_waiter(implementation& impl, waiter_node* w)
567   { 567   {
HITCBC 568   8849 bool notify = false; 568   8742 bool notify = false;
569   { 569   {
HITCBC 570   8849 std::lock_guard lock(mutex_); 570   8742 std::lock_guard lock(mutex_);
HITCBC 571   8849 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()) 571   8742 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
572   { 572   {
HITCBC 573   8827 impl.heap_index_ = heap_.size(); 573   8720 impl.heap_index_ = heap_.size();
HITCBC 574   8827 heap_.push_back({impl.expiry_, &impl}); 574   8720 heap_.push_back({impl.expiry_, &impl});
HITCBC 575   8827 up_heap(heap_.size() - 1); 575   8720 up_heap(heap_.size() - 1);
HITCBC 576   8827 notify = (impl.heap_index_ == 0); 576   8720 notify = (impl.heap_index_ == 0);
HITCBC 577   8827 refresh_cached_nearest(); 577   8720 refresh_cached_nearest();
578   } 578   }
HITCBC 579   8849 impl.waiters_.push_back(w); 579   8742 impl.waiters_.push_back(w);
HITCBC 580   8849 } 580   8742 }
HITCBC 581   8849 if (notify) 581   8742 if (notify)
HITCBC 582   8787 on_earliest_changed_(); 582   8681 on_earliest_changed_();
HITCBC 583   8849 } 583   8742 }
584   584  
585   inline std::size_t 585   inline std::size_t
HITCBC 586   9068 timer_service::cancel_timer(implementation& impl) 586   8959 timer_service::cancel_timer(implementation& impl)
587   { 587   {
HITCBC 588   9068 if (!impl.might_have_pending_waits_) 588   8959 if (!impl.might_have_pending_waits_)
HITCBC 589   9042 return 0; 589   8933 return 0;
590   590  
591   // Not in heap and no waiters — just clear the flag 591   // Not in heap and no waiters — just clear the flag
HITCBC 592   26 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() && 592   26 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
MISUBC 593   impl.waiters_.empty()) 593   impl.waiters_.empty())
594   { 594   {
MISUBC 595   impl.might_have_pending_waits_ = false; 595   impl.might_have_pending_waits_ = false;
MISUBC 596   return 0; 596   return 0;
597   } 597   }
598   598  
HITCBC 599   26 intrusive_list<waiter_node> canceled; 599   26 intrusive_list<waiter_node> canceled;
600   600  
601   { 601   {
HITCBC 602   26 std::lock_guard lock(mutex_); 602   26 std::lock_guard lock(mutex_);
HITCBC 603   26 remove_timer_impl(impl); 603   26 remove_timer_impl(impl);
HITCBC 604   54 while (auto* w = impl.waiters_.pop_front()) 604   54 while (auto* w = impl.waiters_.pop_front())
605   { 605   {
HITCBC 606   28 w->impl_ = nullptr; 606   28 w->impl_ = nullptr;
HITCBC 607   28 canceled.push_back(w); 607   28 canceled.push_back(w);
HITCBC 608   28 } 608   28 }
HITCBC 609   26 refresh_cached_nearest(); 609   26 refresh_cached_nearest();
HITCBC 610   26 } 610   26 }
611   611  
HITCBC 612   26 impl.might_have_pending_waits_ = false; 612   26 impl.might_have_pending_waits_ = false;
613   613  
HITCBC 614   26 std::size_t count = 0; 614   26 std::size_t count = 0;
HITCBC 615   54 while (auto* w = canceled.pop_front()) 615   54 while (auto* w = canceled.pop_front())
616   { 616   {
HITCBC 617   28 w->ec_value_ = make_error_code(capy::error::canceled); 617   28 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 618   28 sched_->post(&w->op_); 618   28 sched_->post(&w->op_);
HITCBC 619   28 ++count; 619   28 ++count;
HITCBC 620   28 } 620   28 }
621   621  
HITCBC 622   26 return count; 622   26 return count;
623   } 623   }
624   624  
625   inline void 625   inline void
HITCBC 626   36 timer_service::cancel_waiter(waiter_node* w) 626   36 timer_service::cancel_waiter(waiter_node* w)
627   { 627   {
628   { 628   {
HITCBC 629   36 std::lock_guard lock(mutex_); 629   36 std::lock_guard lock(mutex_);
630   // Already removed by cancel_timer or process_expired 630   // Already removed by cancel_timer or process_expired
HITCBC 631   36 if (!w->impl_) 631   36 if (!w->impl_)
MISUBC 632   return; 632   return;
HITCBC 633   36 auto* impl = w->impl_; 633   36 auto* impl = w->impl_;
HITCBC 634   36 w->impl_ = nullptr; 634   36 w->impl_ = nullptr;
HITCBC 635   36 impl->waiters_.remove(w); 635   36 impl->waiters_.remove(w);
HITCBC 636   36 if (impl->waiters_.empty()) 636   36 if (impl->waiters_.empty())
637   { 637   {
HITCBC 638   34 remove_timer_impl(*impl); 638   34 remove_timer_impl(*impl);
HITCBC 639   34 impl->might_have_pending_waits_ = false; 639   34 impl->might_have_pending_waits_ = false;
640   } 640   }
HITCBC 641   36 refresh_cached_nearest(); 641   36 refresh_cached_nearest();
HITCBC 642   36 } 642   36 }
643   643  
HITCBC 644   36 w->ec_value_ = make_error_code(capy::error::canceled); 644   36 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 645   36 sched_->post(&w->op_); 645   36 sched_->post(&w->op_);
646   } 646   }
647   647  
648   inline std::size_t 648   inline std::size_t
HITCBC 649   2 timer_service::cancel_one_waiter(implementation& impl) 649   2 timer_service::cancel_one_waiter(implementation& impl)
650   { 650   {
HITCBC 651   2 if (!impl.might_have_pending_waits_) 651   2 if (!impl.might_have_pending_waits_)
MISUBC 652   return 0; 652   return 0;
653   653  
HITCBC 654   2 waiter_node* w = nullptr; 654   2 waiter_node* w = nullptr;
655   655  
656   { 656   {
HITCBC 657   2 std::lock_guard lock(mutex_); 657   2 std::lock_guard lock(mutex_);
HITCBC 658   2 w = impl.waiters_.pop_front(); 658   2 w = impl.waiters_.pop_front();
HITCBC 659   2 if (!w) 659   2 if (!w)
MISUBC 660   return 0; 660   return 0;
HITCBC 661   2 w->impl_ = nullptr; 661   2 w->impl_ = nullptr;
HITCBC 662   2 if (impl.waiters_.empty()) 662   2 if (impl.waiters_.empty())
663   { 663   {
MISUBC 664   remove_timer_impl(impl); 664   remove_timer_impl(impl);
MISUBC 665   impl.might_have_pending_waits_ = false; 665   impl.might_have_pending_waits_ = false;
666   } 666   }
HITCBC 667   2 refresh_cached_nearest(); 667   2 refresh_cached_nearest();
HITCBC 668   2 } 668   2 }
669   669  
HITCBC 670   2 w->ec_value_ = make_error_code(capy::error::canceled); 670   2 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 671   2 sched_->post(&w->op_); 671   2 sched_->post(&w->op_);
HITCBC 672   2 return 1; 672   2 return 1;
673   } 673   }
674   674  
675   inline std::size_t 675   inline std::size_t
HITCBC 676   263976 timer_service::process_expired() 676   257117 timer_service::process_expired()
677   { 677   {
HITCBC 678   263976 intrusive_list<waiter_node> expired; 678   257117 intrusive_list<waiter_node> expired;
679   679  
680   { 680   {
HITCBC 681   263976 std::lock_guard lock(mutex_); 681   257117 std::lock_guard lock(mutex_);
HITCBC 682   263976 auto now = clock_type::now(); 682   257117 auto now = clock_type::now();
683   683  
HITCBC 684   272735 while (!heap_.empty() && heap_[0].time_ <= now) 684   265769 while (!heap_.empty() && heap_[0].time_ <= now)
685   { 685   {
HITCBC 686   8759 implementation* t = heap_[0].timer_; 686   8652 implementation* t = heap_[0].timer_;
HITCBC 687   8759 remove_timer_impl(*t); 687   8652 remove_timer_impl(*t);
HITCBC 688   17522 while (auto* w = t->waiters_.pop_front()) 688   17308 while (auto* w = t->waiters_.pop_front())
689   { 689   {
HITCBC 690   8763 w->impl_ = nullptr; 690   8656 w->impl_ = nullptr;
HITCBC 691   8763 w->ec_value_ = {}; 691   8656 w->ec_value_ = {};
HITCBC 692   8763 expired.push_back(w); 692   8656 expired.push_back(w);
HITCBC 693   8763 } 693   8656 }
HITCBC 694   8759 t->might_have_pending_waits_ = false; 694   8652 t->might_have_pending_waits_ = false;
695   } 695   }
696   696  
HITCBC 697   263976 refresh_cached_nearest(); 697   257117 refresh_cached_nearest();
HITCBC 698   263976 } 698   257117 }
699   699  
HITCBC 700   263976 std::size_t count = 0; 700   257117 std::size_t count = 0;
HITCBC 701   272739 while (auto* w = expired.pop_front()) 701   265773 while (auto* w = expired.pop_front())
702   { 702   {
HITCBC 703   8763 sched_->post(&w->op_); 703   8656 sched_->post(&w->op_);
HITCBC 704   8763 ++count; 704   8656 ++count;
HITCBC 705   8763 } 705   8656 }
706   706  
HITCBC 707   263976 return count; 707   257117 return count;
708   } 708   }
709   709  
710   inline void 710   inline void
HITCBC 711   8819 timer_service::remove_timer_impl(implementation& impl) 711   8712 timer_service::remove_timer_impl(implementation& impl)
712   { 712   {
HITCBC 713   8819 std::size_t index = impl.heap_index_; 713   8712 std::size_t index = impl.heap_index_;
HITCBC 714   8819 if (index >= heap_.size()) 714   8712 if (index >= heap_.size())
MISUBC 715   return; // Not in heap 715   return; // Not in heap
716   716  
HITCBC 717   8819 if (index == heap_.size() - 1) 717   8712 if (index == heap_.size() - 1)
718   { 718   {
719   // Last element, just pop 719   // Last element, just pop
HITCBC 720   278 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 720   279 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 721   278 heap_.pop_back(); 721   279 heap_.pop_back();
722   } 722   }
723   else 723   else
724   { 724   {
725   // Swap with last and reheapify 725   // Swap with last and reheapify
HITCBC 726   8541 swap_heap(index, heap_.size() - 1); 726   8433 swap_heap(index, heap_.size() - 1);
HITCBC 727   8541 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 727   8433 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 728   8541 heap_.pop_back(); 728   8433 heap_.pop_back();
729   729  
HITCBC 730   8541 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_) 730   8433 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
MISUBC 731   up_heap(index); 731   up_heap(index);
732   else 732   else
HITCBC 733   8541 down_heap(index); 733   8433 down_heap(index);
734   } 734   }
735   } 735   }
736   736  
737   inline void 737   inline void
HITCBC 738   8833 timer_service::up_heap(std::size_t index) 738   8726 timer_service::up_heap(std::size_t index)
739   { 739   {
HITCBC 740   17346 while (index > 0) 740   17132 while (index > 0)
741   { 741   {
HITCBC 742   8553 std::size_t parent = (index - 1) / 2; 742   8445 std::size_t parent = (index - 1) / 2;
HITCBC 743   8553 if (!(heap_[index].time_ < heap_[parent].time_)) 743   8445 if (!(heap_[index].time_ < heap_[parent].time_))
HITCBC 744   40 break; 744   39 break;
HITCBC 745   8513 swap_heap(index, parent); 745   8406 swap_heap(index, parent);
HITCBC 746   8513 index = parent; 746   8406 index = parent;
747   } 747   }
HITCBC 748   8833 } 748   8726 }
749   749  
750   inline void 750   inline void
HITCBC 751   8543 timer_service::down_heap(std::size_t index) 751   8435 timer_service::down_heap(std::size_t index)
752   { 752   {
HITCBC 753   8543 std::size_t child = index * 2 + 1; 753   8435 std::size_t child = index * 2 + 1;
HITCBC 754   8547 while (child < heap_.size()) 754   8439 while (child < heap_.size())
755   { 755   {
HITCBC 756   10 std::size_t min_child = (child + 1 == heap_.size() || 756   10 std::size_t min_child = (child + 1 == heap_.size() ||
HITCBC 757   2 heap_[child].time_ < heap_[child + 1].time_) 757   2 heap_[child].time_ < heap_[child + 1].time_)
HITCBC 758   12 ? child 758   12 ? child
HITCBC 759   10 : child + 1; 759   10 : child + 1;
760   760  
HITCBC 761   10 if (heap_[index].time_ < heap_[min_child].time_) 761   10 if (heap_[index].time_ < heap_[min_child].time_)
HITCBC 762   6 break; 762   6 break;
763   763  
HITCBC 764   4 swap_heap(index, min_child); 764   4 swap_heap(index, min_child);
HITCBC 765   4 index = min_child; 765   4 index = min_child;
HITCBC 766   4 child = index * 2 + 1; 766   4 child = index * 2 + 1;
767   } 767   }
HITCBC 768   8543 } 768   8435 }
769   769  
770   inline void 770   inline void
HITCBC 771   17058 timer_service::swap_heap(std::size_t i1, std::size_t i2) 771   16843 timer_service::swap_heap(std::size_t i1, std::size_t i2)
772   { 772   {
HITCBC 773   17058 heap_entry tmp = heap_[i1]; 773   16843 heap_entry tmp = heap_[i1];
HITCBC 774   17058 heap_[i1] = heap_[i2]; 774   16843 heap_[i1] = heap_[i2];
HITCBC 775   17058 heap_[i2] = tmp; 775   16843 heap_[i2] = tmp;
HITCBC 776   17058 heap_[i1].timer_->heap_index_ = i1; 776   16843 heap_[i1].timer_->heap_index_ = i1;
HITCBC 777   17058 heap_[i2].timer_->heap_index_ = i2; 777   16843 heap_[i2].timer_->heap_index_ = i2;
HITCBC 778   17058 } 778   16843 }
779   779  
780   // waiter_node out-of-class member function definitions 780   // waiter_node out-of-class member function definitions
781   781  
782   inline void 782   inline void
HITCBC 783   36 waiter_node::canceller::operator()() const 783   36 waiter_node::canceller::operator()() const
784   { 784   {
HITCBC 785   36 waiter_->svc_->cancel_waiter(waiter_); 785   36 waiter_->svc_->cancel_waiter(waiter_);
HITCBC 786   36 } 786   36 }
787   787  
788   inline void 788   inline void
MISUBC 789   waiter_node::completion_op::do_complete( 789   waiter_node::completion_op::do_complete(
790   [[maybe_unused]] void* owner, 790   [[maybe_unused]] void* owner,
791   scheduler_op* base, 791   scheduler_op* base,
792   std::uint32_t, 792   std::uint32_t,
793   std::uint32_t) 793   std::uint32_t)
794   { 794   {
795   // owner is always non-null here. The destroy path (owner == nullptr) 795   // owner is always non-null here. The destroy path (owner == nullptr)
796   // is unreachable because completion_op overrides destroy() directly, 796   // is unreachable because completion_op overrides destroy() directly,
797   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...). 797   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
MISUBC 798   BOOST_COROSIO_ASSERT(owner); 798   BOOST_COROSIO_ASSERT(owner);
MISUBC 799   static_cast<completion_op*>(base)->operator()(); 799   static_cast<completion_op*>(base)->operator()();
MISUBC 800   } 800   }
801   801  
802   inline void 802   inline void
HITCBC 803   8833 waiter_node::completion_op::operator()() 803   8726 waiter_node::completion_op::operator()()
804   { 804   {
HITCBC 805   8833 auto* w = waiter_; 805   8726 auto* w = waiter_;
HITCBC 806   8833 w->stop_cb_.reset(); 806   8726 w->stop_cb_.reset();
HITCBC 807   8833 if (w->ec_out_) 807   8726 if (w->ec_out_)
HITCBC 808   8833 *w->ec_out_ = w->ec_value_; 808   8726 *w->ec_out_ = w->ec_value_;
809   809  
HITCBC 810   8833 auto* cont = w->cont_; 810   8726 auto* cont = w->cont_;
HITCBC 811   8833 auto d = w->d_; 811   8726 auto d = w->d_;
HITCBC 812   8833 auto* svc = w->svc_; 812   8726 auto* svc = w->svc_;
HITCBC 813   8833 auto& sched = svc->get_scheduler(); 813   8726 auto& sched = svc->get_scheduler();
814   814  
HITCBC 815   8833 svc->destroy_waiter(w); 815   8726 svc->destroy_waiter(w);
816   816  
HITCBC 817   8833 d.post(*cont); 817   8726 d.post(*cont);
HITCBC 818   8833 sched.work_finished(); 818   8726 sched.work_finished();
HITCBC 819   8833 } 819   8726 }
820   820  
821   // GCC 14 false-positive: inlining ~optional<stop_callback> through 821   // GCC 14 false-positive: inlining ~optional<stop_callback> through
822   // delete loses track that stop_cb_ was already .reset() above. 822   // delete loses track that stop_cb_ was already .reset() above.
823   #if defined(__GNUC__) && !defined(__clang__) 823   #if defined(__GNUC__) && !defined(__clang__)
824   #pragma GCC diagnostic push 824   #pragma GCC diagnostic push
825   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" 825   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
826   #endif 826   #endif
827   inline void 827   inline void
HITCBC 828   8 waiter_node::completion_op::destroy() 828   8 waiter_node::completion_op::destroy()
829   { 829   {
830   // Called during scheduler shutdown drain when this completion_op is 830   // Called during scheduler shutdown drain when this completion_op is
831   // in the scheduler's ready queue (posted by cancel_timer() or 831   // in the scheduler's ready queue (posted by cancel_timer() or
832   // process_expired()). Balances the work_started() from 832   // process_expired()). Balances the work_started() from
833   // implementation::wait(). The scheduler drain loop separately 833   // implementation::wait(). The scheduler drain loop separately
834   // balances the work_started() from post(). On IOCP both decrements 834   // balances the work_started() from post(). On IOCP both decrements
835   // are required for outstanding_work_ to reach zero; on other 835   // are required for outstanding_work_ to reach zero; on other
836   // backends this is harmless. 836   // backends this is harmless.
837   // 837   //
838   // This override also prevents scheduler_op::destroy() from calling 838   // This override also prevents scheduler_op::destroy() from calling
839   // do_complete(nullptr, ...). See also: timer_service::shutdown() 839   // do_complete(nullptr, ...). See also: timer_service::shutdown()
840   // which drains waiters still in the timer heap (the other path). 840   // which drains waiters still in the timer heap (the other path).
HITCBC 841   8 auto* w = waiter_; 841   8 auto* w = waiter_;
HITCBC 842   8 w->stop_cb_.reset(); 842   8 w->stop_cb_.reset();
HITCBC 843   8 auto h = std::exchange(w->h_, {}); 843   8 auto h = std::exchange(w->h_, {});
HITCBC 844   8 auto& sched = w->svc_->get_scheduler(); 844   8 auto& sched = w->svc_->get_scheduler();
HITCBC 845   8 delete w; 845   8 delete w;
HITCBC 846   8 sched.work_finished(); 846   8 sched.work_finished();
HITCBC 847   8 if (h) 847   8 if (h)
HITCBC 848   8 h.destroy(); 848   8 h.destroy();
HITCBC 849   8 } 849   8 }
850   #if defined(__GNUC__) && !defined(__clang__) 850   #if defined(__GNUC__) && !defined(__clang__)
851   #pragma GCC diagnostic pop 851   #pragma GCC diagnostic pop
852   #endif 852   #endif
853   853  
854   inline std::coroutine_handle<> 854   inline std::coroutine_handle<>
HITCBC 855   8849 timer_service::implementation::wait( 855   8742 timer_service::implementation::wait(
856   std::coroutine_handle<> h, 856   std::coroutine_handle<> h,
857   capy::executor_ref d, 857   capy::executor_ref d,
858   std::stop_token token, 858   std::stop_token token,
859   std::error_code* ec, 859   std::error_code* ec,
860   capy::continuation* cont) 860   capy::continuation* cont)
861   { 861   {
862   // Already-expired fast path — no waiter_node, no mutex. 862   // Already-expired fast path — no waiter_node, no mutex.
863   // Post instead of dispatch so the coroutine yields to the 863   // Post instead of dispatch so the coroutine yields to the
864   // scheduler, allowing other queued work to run. 864   // scheduler, allowing other queued work to run.
HITCBC 865   8849 if (heap_index_ == (std::numeric_limits<std::size_t>::max)()) 865   8742 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
866   { 866   {
HITCBC 867   8827 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now()) 867   8720 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
868   { 868   {
MISUBC 869   if (ec) 869   if (ec)
MISUBC 870   *ec = {}; 870   *ec = {};
MISUBC 871   d.post(*cont); 871   d.post(*cont);
MISUBC 872   return std::noop_coroutine(); 872   return std::noop_coroutine();
873   } 873   }
874   } 874   }
875   875  
HITCBC 876   8849 auto* w = svc_->create_waiter(); 876   8742 auto* w = svc_->create_waiter();
HITCBC 877   8849 w->impl_ = this; 877   8742 w->impl_ = this;
HITCBC 878   8849 w->svc_ = svc_; 878   8742 w->svc_ = svc_;
HITCBC 879   8849 w->h_ = h; 879   8742 w->h_ = h;
HITCBC 880   8849 w->cont_ = cont; 880   8742 w->cont_ = cont;
HITCBC 881   8849 w->d_ = d; 881   8742 w->d_ = d;
HITCBC 882   8849 w->token_ = std::move(token); 882   8742 w->token_ = std::move(token);
HITCBC 883   8849 w->ec_out_ = ec; 883   8742 w->ec_out_ = ec;
884   884  
HITCBC 885   8849 svc_->insert_waiter(*this, w); 885   8742 svc_->insert_waiter(*this, w);
HITCBC 886   8849 might_have_pending_waits_ = true; 886   8742 might_have_pending_waits_ = true;
HITCBC 887   8849 svc_->get_scheduler().work_started(); 887   8742 svc_->get_scheduler().work_started();
888   888  
HITCBC 889   8849 if (w->token_.stop_possible()) 889   8742 if (w->token_.stop_possible())
HITCBC 890   60 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w}); 890   60 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
891   891  
HITCBC 892   8849 return std::noop_coroutine(); 892   8742 return std::noop_coroutine();
893   } 893   }
894   894  
895   // Free functions 895   // Free functions
896   896  
897   inline std::size_t 897   inline std::size_t
HITCBC 898   8 timer_service_update_expiry(timer::implementation& base) 898   8 timer_service_update_expiry(timer::implementation& base)
899   { 899   {
HITCBC 900   8 auto& impl = static_cast<timer_service::implementation&>(base); 900   8 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 901   8 return impl.svc_->update_timer(impl, impl.expiry_); 901   8 return impl.svc_->update_timer(impl, impl.expiry_);
902   } 902   }
903   903  
904   inline std::size_t 904   inline std::size_t
HITCBC 905   8 timer_service_cancel(timer::implementation& base) noexcept 905   8 timer_service_cancel(timer::implementation& base) noexcept
906   { 906   {
HITCBC 907   8 auto& impl = static_cast<timer_service::implementation&>(base); 907   8 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 908   8 return impl.svc_->cancel_timer(impl); 908   8 return impl.svc_->cancel_timer(impl);
909   } 909   }
910   910  
911   inline std::size_t 911   inline std::size_t
HITCBC 912   2 timer_service_cancel_one(timer::implementation& base) noexcept 912   2 timer_service_cancel_one(timer::implementation& base) noexcept
913   { 913   {
HITCBC 914   2 auto& impl = static_cast<timer_service::implementation&>(base); 914   2 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 915   2 return impl.svc_->cancel_one_waiter(impl); 915   2 return impl.svc_->cancel_one_waiter(impl);
916   } 916   }
917   917  
918   inline timer_service& 918   inline timer_service&
HITCBC 919   1244 get_timer_service(capy::execution_context& ctx, scheduler& sched) 919   1244 get_timer_service(capy::execution_context& ctx, scheduler& sched)
920   { 920   {
HITCBC 921   1244 return ctx.make_service<timer_service>(sched); 921   1244 return ctx.make_service<timer_service>(sched);
922   } 922   }
923   923  
924   } // namespace boost::corosio::detail 924   } // namespace boost::corosio::detail
925   925  
926   #endif 926   #endif