LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 81.6 % 342 279 63
Test Date: 2026-06-30 19:55:18 Functions: 89.4 % 47 42 5

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/config.hpp>
      14                 : #include <boost/capy/ex/execution_context.hpp>
      15                 : 
      16                 : #include <boost/corosio/detail/scheduler.hpp>
      17                 : #include <boost/corosio/detail/scheduler_op.hpp>
      18                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      19                 : 
      20                 : #include <atomic>
      21                 : #include <chrono>
      22                 : #include <coroutine>
      23                 : #include <cstddef>
      24                 : #include <cstdint>
      25                 : #include <limits>
      26                 : #include <memory>
      27                 : #include <stdexcept>
      28                 : 
      29                 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
      30                 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
      31                 : 
      32                 : namespace boost::corosio::detail {
      33                 : 
      34                 : // Forward declarations
      35                 : class reactor_scheduler;
      36                 : class timer_service;
      37                 : 
      38                 : /** Per-thread state for a reactor scheduler.
      39                 : 
      40                 :     Each thread running a scheduler's event loop has one of these
      41                 :     on a thread-local stack. It holds a private work queue and
      42                 :     inline completion budget for speculative I/O fast paths.
      43                 : */
      44                 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
      45                 : {
      46                 :     /// Scheduler this context belongs to.
      47                 :     reactor_scheduler const* key;
      48                 : 
      49                 :     /// Next context frame on this thread's stack.
      50                 :     reactor_scheduler_context* next;
      51                 : 
      52                 :     /// Private work queue for reduced contention.
      53                 :     op_queue private_queue;
      54                 : 
      55                 :     /// Unflushed work count for the private queue.
      56                 :     std::int64_t private_outstanding_work;
      57                 : 
      58                 :     /// Remaining inline completions allowed this cycle.
      59                 :     int inline_budget;
      60                 : 
      61                 :     /// Maximum inline budget (adaptive, 2-16).
      62                 :     int inline_budget_max;
      63                 : 
      64                 :     /// True if no other thread absorbed queued work last cycle.
      65                 :     bool unassisted;
      66                 : 
      67                 :     /// Construct a context frame linked to @a n.
      68                 :     reactor_scheduler_context(
      69                 :         reactor_scheduler const* k,
      70                 :         reactor_scheduler_context* n);
      71                 : };
      72                 : 
      73                 : /// Thread-local context stack for reactor schedulers.
      74                 : inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
      75                 : 
      76                 : /// Find the context frame for a scheduler on this thread.
      77                 : inline reactor_scheduler_context*
      78 HIT      938839 : reactor_find_context(reactor_scheduler const* self) noexcept
      79                 : {
      80          938839 :     for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
      81                 :     {
      82          932640 :         if (c->key == self)
      83          932640 :             return c;
      84                 :     }
      85            6199 :     return nullptr;
      86                 : }
      87                 : 
      88                 : /// Flush private work count to global counter.
      89                 : inline void
      90 MIS           0 : reactor_flush_private_work(
      91                 :     reactor_scheduler_context* ctx,
      92                 :     std::atomic<std::int64_t>& outstanding_work) noexcept
      93                 : {
      94               0 :     if (ctx && ctx->private_outstanding_work > 0)
      95                 :     {
      96               0 :         outstanding_work.fetch_add(
      97                 :             ctx->private_outstanding_work, std::memory_order_relaxed);
      98               0 :         ctx->private_outstanding_work = 0;
      99                 :     }
     100               0 : }
     101                 : 
     102                 : /** Drain private queue to global queue, flushing work count first.
     103                 : 
     104                 :     @return True if any ops were drained.
     105                 : */
     106                 : inline bool
     107 HIT           1 : reactor_drain_private_queue(
     108                 :     reactor_scheduler_context* ctx,
     109                 :     std::atomic<std::int64_t>& outstanding_work,
     110                 :     op_queue& completed_ops) noexcept
     111                 : {
     112               1 :     if (!ctx || ctx->private_queue.empty())
     113               1 :         return false;
     114                 : 
     115 MIS           0 :     reactor_flush_private_work(ctx, outstanding_work);
     116               0 :     completed_ops.splice(ctx->private_queue);
     117               0 :     return true;
     118                 : }
     119                 : 
     120                 : /** Non-template base for reactor-backed scheduler implementations.
     121                 : 
     122                 :     Provides the complete threading model shared by epoll, kqueue,
     123                 :     and select schedulers: signal state machine, inline completion
     124                 :     budget, work counting, run/poll methods, and the do_one event
     125                 :     loop.
     126                 : 
     127                 :     Derived classes provide platform-specific hooks by overriding:
     128                 :     - `run_task(lock, ctx)` to run the reactor poll
     129                 :     - `interrupt_reactor()` to wake a blocked reactor
     130                 : 
     131                 :     De-templated from the original CRTP design to eliminate
     132                 :     duplicate instantiations when multiple backends are compiled
     133                 :     into the same binary. Virtual dispatch for run_task (called
     134                 :     once per reactor cycle, before a blocking syscall) has
     135                 :     negligible overhead.
     136                 : 
     137                 :     @par Thread Safety
     138                 :     All public member functions are thread-safe.
     139                 : */
     140                 : class reactor_scheduler
     141                 :     : public scheduler
     142                 :     , public capy::execution_context::service
     143                 : {
     144                 : public:
     145                 :     using key_type     = scheduler;
     146                 :     using context_type = reactor_scheduler_context;
     147                 :     using mutex_type = conditionally_enabled_mutex;
     148                 :     using lock_type = mutex_type::scoped_lock;
     149                 :     using event_type = conditionally_enabled_event;
     150                 : 
     151                 :     /// Post a coroutine for deferred execution.
     152                 :     void post(std::coroutine_handle<> h) const override;
     153                 : 
     154                 :     /// Post a scheduler operation for deferred execution.
     155                 :     void post(scheduler_op* h) const override;
     156                 : 
     157                 :     /// Return true if called from a thread running this scheduler.
     158                 :     bool running_in_this_thread() const noexcept override;
     159                 : 
     160                 :     /// Request the scheduler to stop dispatching handlers.
     161                 :     void stop() override;
     162                 : 
     163                 :     /// Return true if the scheduler has been stopped.
     164                 :     bool stopped() const noexcept override;
     165                 : 
     166                 :     /// Reset the stopped state so `run()` can resume.
     167                 :     void restart() override;
     168                 : 
     169                 :     /// Run the event loop until no work remains.
     170                 :     std::size_t run() override;
     171                 : 
     172                 :     /// Run until one handler completes or no work remains.
     173                 :     std::size_t run_one() override;
     174                 : 
     175                 :     /// Run until one handler completes or @a usec elapses.
     176                 :     std::size_t wait_one(long usec) override;
     177                 : 
     178                 :     /// Run ready handlers without blocking.
     179                 :     std::size_t poll() override;
     180                 : 
     181                 :     /// Run at most one ready handler without blocking.
     182                 :     std::size_t poll_one() override;
     183                 : 
     184                 :     /// Increment the outstanding work count.
     185                 :     void work_started() noexcept override;
     186                 : 
     187                 :     /// Decrement the outstanding work count, stopping on zero.
     188                 :     void work_finished() noexcept override;
     189                 : 
     190                 :     /** Reset the thread's inline completion budget.
     191                 : 
     192                 :         Called at the start of each posted completion handler to
     193                 :         grant a fresh budget for speculative inline completions.
     194                 :     */
     195                 :     void reset_inline_budget() const noexcept;
     196                 : 
     197                 :     /** Consume one unit of inline budget if available.
     198                 : 
     199                 :         @return True if budget was available and consumed.
     200                 :     */
     201                 :     bool try_consume_inline_budget() const noexcept;
     202                 : 
     203                 :     /** Offset a forthcoming work_finished from work_cleanup.
     204                 : 
     205                 :         Called by descriptor_state when all I/O returned EAGAIN and
     206                 :         no handler will be executed. Must be called from a scheduler
     207                 :         thread.
     208                 :     */
     209                 :     void compensating_work_started() const noexcept;
     210                 : 
     211                 :     /** Drain work from thread context's private queue to global queue.
     212                 : 
     213                 :         Flushes private work count to the global counter, then
     214                 :         transfers the queue under mutex protection.
     215                 : 
     216                 :         @param queue The private queue to drain.
     217                 :         @param count Private work count to flush before draining.
     218                 :     */
     219                 :     void drain_thread_queue(op_queue& queue, std::int64_t count) const;
     220                 : 
     221                 :     /** Post completed operations for deferred invocation.
     222                 : 
     223                 :         If called from a thread running this scheduler, operations
     224                 :         go to the thread's private queue (fast path). Otherwise,
     225                 :         operations are added to the global queue under mutex and a
     226                 :         waiter is signaled.
     227                 : 
     228                 :         @par Preconditions
     229                 :         work_started() must have been called for each operation.
     230                 : 
     231                 :         @param ops Queue of operations to post.
     232                 :     */
     233                 :     void post_deferred_completions(op_queue& ops) const;
     234                 : 
     235                 :     /** Apply runtime configuration to the scheduler.
     236                 : 
     237                 :         Called by `io_context` after construction. Values that do
     238                 :         not apply to this backend are silently ignored.
     239                 : 
     240                 :         @param max_events  Event buffer size for epoll/kqueue.
     241                 :         @param budget_init Starting inline completion budget.
     242                 :         @param budget_max  Hard ceiling on adaptive budget ramp-up.
     243                 :         @param unassisted  Budget when single-threaded.
     244                 :     */
     245                 :     virtual void configure_reactor(
     246                 :         unsigned max_events,
     247                 :         unsigned budget_init,
     248                 :         unsigned budget_max,
     249                 :         unsigned unassisted);
     250                 : 
     251                 :     /// Return the configured initial inline budget.
     252 HIT         860 :     unsigned inline_budget_initial() const noexcept
     253                 :     {
     254             860 :         return inline_budget_initial_;
     255                 :     }
     256                 : 
     257                 :     /// Return true if single-threaded (lockless) mode is active.
     258             159 :     bool is_single_threaded() const noexcept override
     259                 :     {
     260             159 :         return single_threaded_;
     261                 :     }
     262                 : 
     263                 :     /** Enable or disable single-threaded (lockless) mode.
     264                 : 
     265                 :         When enabled, all scheduler mutex and condition variable
     266                 :         operations become no-ops. Cross-thread post() is
     267                 :         undefined behavior.
     268                 :     */
     269              10 :     void configure_single_threaded(bool v) noexcept override
     270                 :     {
     271              10 :         single_threaded_ = v;
     272              10 :         mutex_.set_enabled(!v);
     273              10 :         cond_.set_enabled(!v);
     274              10 :     }
     275                 : 
     276                 : protected:
     277                 :     timer_service* timer_svc_ = nullptr;
     278                 :     bool single_threaded_ = false;
     279                 : 
     280            1244 :     reactor_scheduler() = default;
     281                 : 
     282                 :     /** Drain completed_ops during shutdown.
     283                 : 
     284                 :         Pops all operations from the global queue and destroys them,
     285                 :         skipping the task sentinel. Signals all waiting threads.
     286                 :         Derived classes call this from their shutdown() override
     287                 :         before performing platform-specific cleanup.
     288                 :     */
     289                 :     void shutdown_drain();
     290                 : 
     291                 :     /// RAII guard that re-inserts the task sentinel after `run_task`.
     292                 :     struct task_cleanup
     293                 :     {
     294                 :         reactor_scheduler const* sched;
     295                 :         lock_type* lock;
     296                 :         context_type* ctx;
     297                 :         ~task_cleanup();
     298                 :     };
     299                 : 
     300                 :     mutable mutex_type mutex_{true};
     301                 :     mutable event_type cond_{true};
     302                 :     mutable op_queue completed_ops_;
     303                 :     mutable std::atomic<std::int64_t> outstanding_work_{0};
     304                 :     std::atomic<bool> stopped_{false};
     305                 :     mutable std::atomic<bool> task_running_{false};
     306                 :     mutable bool task_interrupted_ = false;
     307                 : 
     308                 :     // Runtime-configurable reactor tuning parameters.
     309                 :     // Defaults match the library's built-in values.
     310                 :     unsigned max_events_per_poll_   = 128;
     311                 :     unsigned inline_budget_initial_ = 2;
     312                 :     unsigned inline_budget_max_     = 16;
     313                 :     unsigned unassisted_budget_     = 4;
     314                 : 
     315                 :     /// Bit 0 of `state_`: set when the condvar should be signaled.
     316                 :     static constexpr std::size_t signaled_bit = 1;
     317                 : 
     318                 :     /// Increment per waiting thread in `state_`.
     319                 :     static constexpr std::size_t waiter_increment = 2;
     320                 :     mutable std::size_t state_                    = 0;
     321                 : 
     322                 :     /// Sentinel op that triggers a reactor poll when dequeued.
     323                 :     struct task_op final : scheduler_op
     324                 :     {
     325 MIS           0 :         void operator()() override {}
     326               0 :         void destroy() override {}
     327                 :     };
     328                 :     task_op task_op_;
     329                 : 
     330                 :     /// Run the platform-specific reactor poll.
     331                 :     virtual void
     332                 :     run_task(lock_type& lock, context_type* ctx,
     333                 :         long timeout_us) = 0;
     334                 : 
     335                 :     /// Wake a blocked reactor (e.g. write to eventfd or pipe).
     336                 :     virtual void interrupt_reactor() const = 0;
     337                 : 
     338                 : private:
     339                 :     struct work_cleanup
     340                 :     {
     341                 :         reactor_scheduler* sched;
     342                 :         lock_type* lock;
     343                 :         context_type* ctx;
     344                 :         ~work_cleanup();
     345                 :     };
     346                 : 
     347                 :     std::size_t do_one(
     348                 :         lock_type& lock, long timeout_us, context_type* ctx);
     349                 : 
     350                 :     void signal_all(lock_type& lock) const;
     351                 :     bool maybe_unlock_and_signal_one(lock_type& lock) const;
     352                 :     bool unlock_and_signal_one(lock_type& lock) const;
     353                 :     void clear_signal() const;
     354                 :     void wait_for_signal(lock_type& lock) const;
     355                 :     void wait_for_signal_for(
     356                 :         lock_type& lock, long timeout_us) const;
     357                 :     void wake_one_thread_and_unlock(lock_type& lock) const;
     358                 : };
     359                 : 
     360                 : /** RAII guard that pushes/pops a scheduler context frame.
     361                 : 
     362                 :     On construction, pushes a new context frame onto the
     363                 :     thread-local stack. On destruction, drains any remaining
     364                 :     private queue items to the global queue and pops the frame.
     365                 : */
     366                 : struct reactor_thread_context_guard
     367                 : {
     368                 :     /// The context frame managed by this guard.
     369                 :     reactor_scheduler_context frame_;
     370                 : 
     371                 :     /// Construct the guard, pushing a frame for @a sched.
     372 HIT         860 :     explicit reactor_thread_context_guard(
     373                 :         reactor_scheduler const* sched) noexcept
     374             860 :         : frame_(sched, reactor_context_stack.get())
     375                 :     {
     376             860 :         reactor_context_stack.set(&frame_);
     377             860 :     }
     378                 : 
     379                 :     /// Destroy the guard, draining private work and popping the frame.
     380             860 :     ~reactor_thread_context_guard() noexcept
     381                 :     {
     382             860 :         if (!frame_.private_queue.empty())
     383 MIS           0 :             frame_.key->drain_thread_queue(
     384               0 :                 frame_.private_queue, frame_.private_outstanding_work);
     385 HIT         860 :         reactor_context_stack.set(frame_.next);
     386             860 :     }
     387                 : };
     388                 : 
     389                 : // ---- Inline implementations ------------------------------------------------
     390                 : 
     391                 : inline
     392             860 : reactor_scheduler_context::reactor_scheduler_context(
     393                 :     reactor_scheduler const* k,
     394             860 :     reactor_scheduler_context* n)
     395             860 :     : key(k)
     396             860 :     , next(n)
     397             860 :     , private_outstanding_work(0)
     398             860 :     , inline_budget(0)
     399             860 :     , inline_budget_max(
     400             860 :           static_cast<int>(k->inline_budget_initial()))
     401             860 :     , unassisted(false)
     402                 : {
     403             860 : }
     404                 : 
     405                 : inline void
     406              16 : reactor_scheduler::configure_reactor(
     407                 :     unsigned max_events,
     408                 :     unsigned budget_init,
     409                 :     unsigned budget_max,
     410                 :     unsigned unassisted)
     411                 : {
     412              30 :     if (max_events < 1 ||
     413              14 :         max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
     414                 :         throw std::out_of_range(
     415               2 :             "max_events_per_poll must be in [1, INT_MAX]");
     416              14 :     if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
     417                 :         throw std::out_of_range(
     418 MIS           0 :             "inline_budget_max must be in [0, INT_MAX]");
     419                 : 
     420                 :     // Clamp initial and unassisted to budget_max.
     421 HIT          14 :     if (budget_init > budget_max)
     422               2 :         budget_init = budget_max;
     423              14 :     if (unassisted > budget_max)
     424               2 :         unassisted = budget_max;
     425                 : 
     426              14 :     max_events_per_poll_   = max_events;
     427              14 :     inline_budget_initial_ = budget_init;
     428              14 :     inline_budget_max_     = budget_max;
     429              14 :     unassisted_budget_     = unassisted;
     430              14 : }
     431                 : 
     432                 : inline void
     433          108488 : reactor_scheduler::reset_inline_budget() const noexcept
     434                 : {
     435                 :     // When budget is disabled (max==0), all paths below would no-op
     436                 :     // (inline_budget stays 0). Skip the TLS lookup entirely.
     437          108488 :     if (inline_budget_max_ == 0)
     438 MIS           0 :         return;
     439 HIT      108488 :     if (auto* ctx = reactor_find_context(this))
     440                 :     {
     441                 :         // Cap when no other thread absorbed queued work
     442          108488 :         if (ctx->unassisted)
     443                 :         {
     444          108488 :             ctx->inline_budget_max =
     445          108488 :                 static_cast<int>(unassisted_budget_);
     446          108488 :             ctx->inline_budget =
     447          108488 :                 static_cast<int>(unassisted_budget_);
     448          108488 :             return;
     449                 :         }
     450                 :         // Ramp up when previous cycle fully consumed budget.
     451                 :         // max(1, ...) ensures the doubling escapes zero.
     452 MIS           0 :         if (ctx->inline_budget == 0)
     453               0 :             ctx->inline_budget_max = (std::min)(
     454               0 :                 (std::max)(1, ctx->inline_budget_max) * 2,
     455               0 :                 static_cast<int>(inline_budget_max_));
     456               0 :         else if (ctx->inline_budget < ctx->inline_budget_max)
     457               0 :             ctx->inline_budget_max =
     458               0 :                 static_cast<int>(inline_budget_initial_);
     459               0 :         ctx->inline_budget = ctx->inline_budget_max;
     460                 :     }
     461                 : }
     462                 : 
     463                 : inline bool
     464 HIT      454780 : reactor_scheduler::try_consume_inline_budget() const noexcept
     465                 : {
     466          454780 :     if (inline_budget_max_ == 0)
     467 MIS           0 :         return false;
     468 HIT      454780 :     if (auto* ctx = reactor_find_context(this))
     469                 :     {
     470          454780 :         if (ctx->inline_budget > 0)
     471                 :         {
     472          363784 :             --ctx->inline_budget;
     473          363784 :             return true;
     474                 :         }
     475                 :     }
     476           90996 :     return false;
     477                 : }
     478                 : 
     479                 : inline void
     480            4355 : reactor_scheduler::post(std::coroutine_handle<> h) const
     481                 : {
     482                 :     struct post_handler final : scheduler_op
     483                 :     {
     484                 :         std::coroutine_handle<> h_;
     485                 : 
     486            4355 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     487            8710 :         ~post_handler() override = default;
     488                 : 
     489            4343 :         void operator()() override
     490                 :         {
     491            4343 :             auto saved = h_;
     492            4343 :             delete this;
     493                 :             // Ensure stores from the posting thread are visible. TSan
     494                 :             // cannot instrument standalone fences; this acquire pairs
     495                 :             // with the posting thread's release and is intentional.
     496                 :             BOOST_COROSIO_GCC_WARNING_PUSH
     497                 :             BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan")
     498                 :             std::atomic_thread_fence(std::memory_order_acquire);
     499                 :             BOOST_COROSIO_GCC_WARNING_POP
     500            4343 :             saved.resume();
     501            4343 :         }
     502                 : 
     503              12 :         void destroy() override
     504                 :         {
     505              12 :             auto saved = h_;
     506              12 :             delete this;
     507              12 :             saved.destroy();
     508              12 :         }
     509                 :     };
     510                 : 
     511            4355 :     auto ph = std::make_unique<post_handler>(h);
     512                 : 
     513            4355 :     if (auto* ctx = reactor_find_context(this))
     514                 :     {
     515              24 :         ++ctx->private_outstanding_work;
     516              24 :         ctx->private_queue.push(ph.release());
     517              24 :         return;
     518                 :     }
     519                 : 
     520            4331 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     521                 : 
     522            4331 :     lock_type lock(mutex_);
     523            4331 :     completed_ops_.push(ph.release());
     524            4331 :     wake_one_thread_and_unlock(lock);
     525            4355 : }
     526                 : 
     527                 : inline void
     528          109515 : reactor_scheduler::post(scheduler_op* h) const
     529                 : {
     530          109515 :     if (auto* ctx = reactor_find_context(this))
     531                 :     {
     532          109122 :         ++ctx->private_outstanding_work;
     533          109122 :         ctx->private_queue.push(h);
     534          109122 :         return;
     535                 :     }
     536                 : 
     537             393 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     538                 : 
     539             393 :     lock_type lock(mutex_);
     540             393 :     completed_ops_.push(h);
     541             393 :     wake_one_thread_and_unlock(lock);
     542             393 : }
     543                 : 
     544                 : inline bool
     545            2152 : reactor_scheduler::running_in_this_thread() const noexcept
     546                 : {
     547            2152 :     return reactor_find_context(this) != nullptr;
     548                 : }
     549                 : 
     550                 : inline void
     551             892 : reactor_scheduler::stop()
     552                 : {
     553             892 :     lock_type lock(mutex_);
     554             892 :     if (!stopped_.load(std::memory_order_acquire))
     555                 :     {
     556             821 :         stopped_.store(true, std::memory_order_release);
     557             821 :         signal_all(lock);
     558             821 :         interrupt_reactor();
     559                 :     }
     560             892 : }
     561                 : 
     562                 : inline bool
     563              88 : reactor_scheduler::stopped() const noexcept
     564                 : {
     565              88 :     return stopped_.load(std::memory_order_acquire);
     566                 : }
     567                 : 
     568                 : inline void
     569             177 : reactor_scheduler::restart()
     570                 : {
     571             177 :     stopped_.store(false, std::memory_order_release);
     572             177 : }
     573                 : 
     574                 : inline std::size_t
     575             835 : reactor_scheduler::run()
     576                 : {
     577            1670 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     578                 :     {
     579              59 :         stop();
     580              59 :         return 0;
     581                 :     }
     582                 : 
     583             776 :     reactor_thread_context_guard ctx(this);
     584             776 :     lock_type lock(mutex_);
     585                 : 
     586             776 :     std::size_t n = 0;
     587                 :     for (;;)
     588                 :     {
     589          391141 :         if (!do_one(lock, -1, &ctx.frame_))
     590             776 :             break;
     591          390365 :         if (n != (std::numeric_limits<std::size_t>::max)())
     592          390365 :             ++n;
     593          390365 :         if (!lock.owns_lock())
     594          289902 :             lock.lock();
     595                 :     }
     596             776 :     return n;
     597             776 : }
     598                 : 
     599                 : inline std::size_t
     600              16 : reactor_scheduler::run_one()
     601                 : {
     602              32 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     603                 :     {
     604 MIS           0 :         stop();
     605               0 :         return 0;
     606                 :     }
     607                 : 
     608 HIT          16 :     reactor_thread_context_guard ctx(this);
     609              16 :     lock_type lock(mutex_);
     610              16 :     return do_one(lock, -1, &ctx.frame_);
     611              16 : }
     612                 : 
     613                 : inline std::size_t
     614              62 : reactor_scheduler::wait_one(long usec)
     615                 : {
     616             124 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     617                 :     {
     618              20 :         stop();
     619              20 :         return 0;
     620                 :     }
     621                 : 
     622              42 :     reactor_thread_context_guard ctx(this);
     623              42 :     lock_type lock(mutex_);
     624              42 :     return do_one(lock, usec, &ctx.frame_);
     625              42 : }
     626                 : 
     627                 : inline std::size_t
     628              28 : reactor_scheduler::poll()
     629                 : {
     630              56 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     631                 :     {
     632               6 :         stop();
     633               6 :         return 0;
     634                 :     }
     635                 : 
     636              22 :     reactor_thread_context_guard ctx(this);
     637              22 :     lock_type lock(mutex_);
     638                 : 
     639              22 :     std::size_t n = 0;
     640                 :     for (;;)
     641                 :     {
     642              64 :         if (!do_one(lock, 0, &ctx.frame_))
     643              22 :             break;
     644              42 :         if (n != (std::numeric_limits<std::size_t>::max)())
     645              42 :             ++n;
     646              42 :         if (!lock.owns_lock())
     647              34 :             lock.lock();
     648                 :     }
     649              22 :     return n;
     650              22 : }
     651                 : 
     652                 : inline std::size_t
     653               8 : reactor_scheduler::poll_one()
     654                 : {
     655              16 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     656                 :     {
     657               4 :         stop();
     658               4 :         return 0;
     659                 :     }
     660                 : 
     661               4 :     reactor_thread_context_guard ctx(this);
     662               4 :     lock_type lock(mutex_);
     663               4 :     return do_one(lock, 0, &ctx.frame_);
     664               4 : }
     665                 : 
     666                 : inline void
     667           28681 : reactor_scheduler::work_started() noexcept
     668                 : {
     669           28681 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     670           28681 : }
     671                 : 
     672                 : inline void
     673           42032 : reactor_scheduler::work_finished() noexcept
     674                 : {
     675           84064 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     676             795 :         stop();
     677           42032 : }
     678                 : 
     679                 : inline void
     680          259549 : reactor_scheduler::compensating_work_started() const noexcept
     681                 : {
     682          259549 :     auto* ctx = reactor_find_context(this);
     683          259549 :     if (ctx)
     684          259549 :         ++ctx->private_outstanding_work;
     685          259549 : }
     686                 : 
     687                 : inline void
     688 MIS           0 : reactor_scheduler::drain_thread_queue(
     689                 :     op_queue& queue, std::int64_t count) const
     690                 : {
     691               0 :     if (count > 0)
     692               0 :         outstanding_work_.fetch_add(count, std::memory_order_relaxed);
     693                 : 
     694               0 :     lock_type lock(mutex_);
     695               0 :     completed_ops_.splice(queue);
     696               0 :     if (count > 0)
     697               0 :         maybe_unlock_and_signal_one(lock);
     698               0 : }
     699                 : 
     700                 : inline void
     701 HIT       17068 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
     702                 : {
     703           17068 :     if (ops.empty())
     704           17068 :         return;
     705                 : 
     706 MIS           0 :     if (auto* ctx = reactor_find_context(this))
     707                 :     {
     708               0 :         ctx->private_queue.splice(ops);
     709               0 :         return;
     710                 :     }
     711                 : 
     712               0 :     lock_type lock(mutex_);
     713               0 :     completed_ops_.splice(ops);
     714               0 :     wake_one_thread_and_unlock(lock);
     715               0 : }
     716                 : 
     717                 : inline void
     718 HIT        1244 : reactor_scheduler::shutdown_drain()
     719                 : {
     720            1244 :     lock_type lock(mutex_);
     721                 : 
     722            2677 :     while (auto* h = completed_ops_.pop())
     723                 :     {
     724            1433 :         if (h == &task_op_)
     725            1244 :             continue;
     726             189 :         lock.unlock();
     727             189 :         h->destroy();
     728             189 :         lock.lock();
     729            1433 :     }
     730                 : 
     731            1244 :     signal_all(lock);
     732            1244 : }
     733                 : 
     734                 : inline void
     735            2065 : reactor_scheduler::signal_all(lock_type&) const
     736                 : {
     737            2065 :     state_ |= signaled_bit;
     738            2065 :     cond_.notify_all();
     739            2065 : }
     740                 : 
     741                 : inline bool
     742            4724 : reactor_scheduler::maybe_unlock_and_signal_one(
     743                 :     lock_type& lock) const
     744                 : {
     745            4724 :     state_ |= signaled_bit;
     746            4724 :     if (state_ > signaled_bit)
     747                 :     {
     748 MIS           0 :         lock.unlock();
     749               0 :         cond_.notify_one();
     750               0 :         return true;
     751                 :     }
     752 HIT        4724 :     return false;
     753                 : }
     754                 : 
     755                 : inline bool
     756          443526 : reactor_scheduler::unlock_and_signal_one(
     757                 :     lock_type& lock) const
     758                 : {
     759          443526 :     state_ |= signaled_bit;
     760          443526 :     bool have_waiters = state_ > signaled_bit;
     761          443526 :     lock.unlock();
     762          443526 :     if (have_waiters)
     763 MIS           0 :         cond_.notify_one();
     764 HIT      443526 :     return have_waiters;
     765                 : }
     766                 : 
     767                 : inline void
     768               1 : reactor_scheduler::clear_signal() const
     769                 : {
     770               1 :     state_ &= ~signaled_bit;
     771               1 : }
     772                 : 
     773                 : inline void
     774               1 : reactor_scheduler::wait_for_signal(
     775                 :     lock_type& lock) const
     776                 : {
     777               2 :     while ((state_ & signaled_bit) == 0)
     778                 :     {
     779               1 :         state_ += waiter_increment;
     780               1 :         cond_.wait(lock);
     781               1 :         state_ -= waiter_increment;
     782                 :     }
     783               1 : }
     784                 : 
     785                 : inline void
     786 MIS           0 : reactor_scheduler::wait_for_signal_for(
     787                 :     lock_type& lock, long timeout_us) const
     788                 : {
     789               0 :     if ((state_ & signaled_bit) == 0)
     790                 :     {
     791               0 :         state_ += waiter_increment;
     792               0 :         cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
     793               0 :         state_ -= waiter_increment;
     794                 :     }
     795               0 : }
     796                 : 
     797                 : inline void
     798 HIT        4724 : reactor_scheduler::wake_one_thread_and_unlock(
     799                 :     lock_type& lock) const
     800                 : {
     801            4724 :     if (maybe_unlock_and_signal_one(lock))
     802 MIS           0 :         return;
     803                 : 
     804 HIT        4724 :     if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
     805                 :     {
     806             105 :         task_interrupted_ = true;
     807             105 :         lock.unlock();
     808             105 :         interrupt_reactor();
     809                 :     }
     810                 :     else
     811                 :     {
     812            4619 :         lock.unlock();
     813                 :     }
     814                 : }
     815                 : 
     816          390451 : inline reactor_scheduler::work_cleanup::~work_cleanup()
     817                 : {
     818          390451 :     if (ctx)
     819                 :     {
     820          390451 :         std::int64_t produced = ctx->private_outstanding_work;
     821          390451 :         if (produced > 1)
     822              14 :             sched->outstanding_work_.fetch_add(
     823                 :                 produced - 1, std::memory_order_relaxed);
     824          390437 :         else if (produced < 1)
     825           30431 :             sched->work_finished();
     826          390451 :         ctx->private_outstanding_work = 0;
     827                 : 
     828          390451 :         if (!ctx->private_queue.empty())
     829                 :         {
     830          100471 :             lock->lock();
     831          100471 :             sched->completed_ops_.splice(ctx->private_queue);
     832                 :         }
     833                 :     }
     834                 :     else
     835                 :     {
     836 MIS           0 :         sched->work_finished();
     837                 :     }
     838 HIT      390451 : }
     839                 : 
     840          587890 : inline reactor_scheduler::task_cleanup::~task_cleanup()
     841                 : {
     842          293945 :     if (!ctx)
     843 MIS           0 :         return;
     844                 : 
     845 HIT      293945 :     if (ctx->private_outstanding_work > 0)
     846                 :     {
     847            8649 :         sched->outstanding_work_.fetch_add(
     848            8649 :             ctx->private_outstanding_work, std::memory_order_relaxed);
     849            8649 :         ctx->private_outstanding_work = 0;
     850                 :     }
     851                 : 
     852          293945 :     if (!ctx->private_queue.empty())
     853                 :     {
     854            8649 :         if (!lock->owns_lock())
     855 MIS           0 :             lock->lock();
     856 HIT        8649 :         sched->completed_ops_.splice(ctx->private_queue);
     857                 :     }
     858          293945 : }
     859                 : 
     860                 : inline std::size_t
     861          391267 : reactor_scheduler::do_one(
     862                 :     lock_type& lock, long timeout_us, context_type* ctx)
     863                 : {
     864                 :     for (;;)
     865                 :     {
     866          685195 :         if (stopped_.load(std::memory_order_acquire))
     867             782 :             return 0;
     868                 : 
     869          684413 :         scheduler_op* op = completed_ops_.pop();
     870                 : 
     871                 :         // Handle reactor sentinel — time to poll for I/O
     872          684413 :         if (op == &task_op_)
     873                 :         {
     874                 :             bool more_handlers =
     875          293961 :                 !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
     876                 : 
     877          534847 :             if (!more_handlers &&
     878          481772 :                 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
     879                 :                  timeout_us == 0))
     880                 :             {
     881              16 :                 completed_ops_.push(&task_op_);
     882              16 :                 return 0;
     883                 :             }
     884                 : 
     885          293945 :             long task_timeout_us = more_handlers ? 0 : timeout_us;
     886          293945 :             task_interrupted_ = task_timeout_us == 0;
     887          293945 :             task_running_.store(true, std::memory_order_release);
     888                 : 
     889          293945 :             if (more_handlers)
     890           53075 :                 unlock_and_signal_one(lock);
     891                 : 
     892                 :             try
     893                 :             {
     894          293945 :                 run_task(lock, ctx, task_timeout_us);
     895                 :             }
     896 MIS           0 :             catch (...)
     897                 :             {
     898               0 :                 task_running_.store(false, std::memory_order_relaxed);
     899               0 :                 throw;
     900               0 :             }
     901                 : 
     902 HIT      293945 :             task_running_.store(false, std::memory_order_relaxed);
     903          293945 :             completed_ops_.push(&task_op_);
     904          293945 :             if (timeout_us > 0)
     905              18 :                 return 0;
     906          293927 :             continue;
     907          293927 :         }
     908                 : 
     909                 :         // Handle operation
     910          390452 :         if (op != nullptr)
     911                 :         {
     912          390451 :             bool more = !completed_ops_.empty();
     913                 : 
     914          390451 :             if (more)
     915          390451 :                 ctx->unassisted = !unlock_and_signal_one(lock);
     916                 :             else
     917                 :             {
     918 MIS           0 :                 ctx->unassisted = false;
     919               0 :                 lock.unlock();
     920                 :             }
     921                 : 
     922 HIT      390451 :             work_cleanup on_exit{this, &lock, ctx};
     923                 :             (void)on_exit;
     924                 : 
     925          390451 :             (*op)();
     926          390451 :             return 1;
     927          390451 :         }
     928                 : 
     929                 :         // Try private queue before blocking
     930               1 :         if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
     931 MIS           0 :             continue;
     932                 : 
     933 HIT           2 :         if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
     934                 :             timeout_us == 0)
     935 MIS           0 :             return 0;
     936                 : 
     937 HIT           1 :         clear_signal();
     938               1 :         if (timeout_us < 0)
     939               1 :             wait_for_signal(lock);
     940                 :         else
     941 MIS           0 :             wait_for_signal_for(lock, timeout_us);
     942 HIT      293928 :     }
     943                 : }
     944                 : 
     945                 : } // namespace boost::corosio::detail
     946                 : 
     947                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
        

Generated by: LCOV version 2.3