include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

81.6% Lines (279/342) 89.1% List of functions (41/46)
reactor_scheduler.hpp
f(x) Functions (46)
Function Calls Lines Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 938839x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :90 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 1x 50.0% 64.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :252 860x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :258 159x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :269 10x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :280 1244x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :325 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :326 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :372 860x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :380 860x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :392 860x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :406 16x 93.3% 74.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :433 108488x 50.0% 43.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :464 454780x 87.5% 88.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const :480 4355x 100.0% 84.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :486 4355x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :487 8710x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :489 4343x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :503 12x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :528 109515x 100.0% 87.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :545 2152x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :551 892x 100.0% 82.0% boost::corosio::detail::reactor_scheduler::stopped() const :563 88x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :569 177x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :575 835x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::run_one() :600 16x 75.0% 64.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :614 62x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::poll() :628 28x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::poll_one() :653 8x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::work_started() :667 28681x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :673 42032x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :680 259549x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :688 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :701 17068x 30.0% 35.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :718 1244x 100.0% 88.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :735 2065x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :742 4724x 57.1% 50.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :756 443526x 85.7% 80.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :768 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :774 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :786 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :798 4724x 87.5% 92.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :816 390451x 92.3% 92.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :840 293945x 83.3% 86.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :861 391267x 80.0% 75.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
38 /** Per-thread state for a reactor scheduler.
39
40 Each thread running a scheduler's event loop has one of these
41 on a thread-local stack. It holds a private work queue and
42 inline completion budget for speculative I/O fast paths.
43 */
44 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
45 {
46 /// Scheduler this context belongs to.
47 reactor_scheduler const* key;
48
49 /// Next context frame on this thread's stack.
50 reactor_scheduler_context* next;
51
52 /// Private work queue for reduced contention.
53 op_queue private_queue;
54
55 /// Unflushed work count for the private queue.
56 std::int64_t private_outstanding_work;
57
58 /// Remaining inline completions allowed this cycle.
59 int inline_budget;
60
61 /// Maximum inline budget (adaptive, 2-16).
62 int inline_budget_max;
63
64 /// True if no other thread absorbed queued work last cycle.
65 bool unassisted;
66
67 /// Construct a context frame linked to @a n.
68 reactor_scheduler_context(
69 reactor_scheduler const* k,
70 reactor_scheduler_context* n);
71 };
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 938839x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80 938839x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82 932640x if (c->key == self)
83 932640x return c;
84 }
85 6199x return nullptr;
86 }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 1x reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112 1x if (!ctx || ctx->private_queue.empty())
113 1x return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 }
119
120 /** Non-template base for reactor-backed scheduler implementations.
121
122 Provides the complete threading model shared by epoll, kqueue,
123 and select schedulers: signal state machine, inline completion
124 budget, work counting, run/poll methods, and the do_one event
125 loop.
126
127 Derived classes provide platform-specific hooks by overriding:
128 - `run_task(lock, ctx)` to run the reactor poll
129 - `interrupt_reactor()` to wake a blocked reactor
130
131 De-templated from the original CRTP design to eliminate
132 duplicate instantiations when multiple backends are compiled
133 into the same binary. Virtual dispatch for run_task (called
134 once per reactor cycle, before a blocking syscall) has
135 negligible overhead.
136
137 @par Thread Safety
138 All public member functions are thread-safe.
139 */
140 class reactor_scheduler
141 : public scheduler
142 , public capy::execution_context::service
143 {
144 public:
145 using key_type = scheduler;
146 using context_type = reactor_scheduler_context;
147 using mutex_type = conditionally_enabled_mutex;
148 using lock_type = mutex_type::scoped_lock;
149 using event_type = conditionally_enabled_event;
150
151 /// Post a coroutine for deferred execution.
152 void post(std::coroutine_handle<> h) const override;
153
154 /// Post a scheduler operation for deferred execution.
155 void post(scheduler_op* h) const override;
156
157 /// Return true if called from a thread running this scheduler.
158 bool running_in_this_thread() const noexcept override;
159
160 /// Request the scheduler to stop dispatching handlers.
161 void stop() override;
162
163 /// Return true if the scheduler has been stopped.
164 bool stopped() const noexcept override;
165
166 /// Reset the stopped state so `run()` can resume.
167 void restart() override;
168
169 /// Run the event loop until no work remains.
170 std::size_t run() override;
171
172 /// Run until one handler completes or no work remains.
173 std::size_t run_one() override;
174
175 /// Run until one handler completes or @a usec elapses.
176 std::size_t wait_one(long usec) override;
177
178 /// Run ready handlers without blocking.
179 std::size_t poll() override;
180
181 /// Run at most one ready handler without blocking.
182 std::size_t poll_one() override;
183
184 /// Increment the outstanding work count.
185 void work_started() noexcept override;
186
187 /// Decrement the outstanding work count, stopping on zero.
188 void work_finished() noexcept override;
189
190 /** Reset the thread's inline completion budget.
191
192 Called at the start of each posted completion handler to
193 grant a fresh budget for speculative inline completions.
194 */
195 void reset_inline_budget() const noexcept;
196
197 /** Consume one unit of inline budget if available.
198
199 @return True if budget was available and consumed.
200 */
201 bool try_consume_inline_budget() const noexcept;
202
203 /** Offset a forthcoming work_finished from work_cleanup.
204
205 Called by descriptor_state when all I/O returned EAGAIN and
206 no handler will be executed. Must be called from a scheduler
207 thread.
208 */
209 void compensating_work_started() const noexcept;
210
211 /** Drain work from thread context's private queue to global queue.
212
213 Flushes private work count to the global counter, then
214 transfers the queue under mutex protection.
215
216 @param queue The private queue to drain.
217 @param count Private work count to flush before draining.
218 */
219 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
220
221 /** Post completed operations for deferred invocation.
222
223 If called from a thread running this scheduler, operations
224 go to the thread's private queue (fast path). Otherwise,
225 operations are added to the global queue under mutex and a
226 waiter is signaled.
227
228 @par Preconditions
229 work_started() must have been called for each operation.
230
231 @param ops Queue of operations to post.
232 */
233 void post_deferred_completions(op_queue& ops) const;
234
235 /** Apply runtime configuration to the scheduler.
236
237 Called by `io_context` after construction. Values that do
238 not apply to this backend are silently ignored.
239
240 @param max_events Event buffer size for epoll/kqueue.
241 @param budget_init Starting inline completion budget.
242 @param budget_max Hard ceiling on adaptive budget ramp-up.
243 @param unassisted Budget when single-threaded.
244 */
245 virtual void configure_reactor(
246 unsigned max_events,
247 unsigned budget_init,
248 unsigned budget_max,
249 unsigned unassisted);
250
251 /// Return the configured initial inline budget.
252 860x unsigned inline_budget_initial() const noexcept
253 {
254 860x return inline_budget_initial_;
255 }
256
257 /// Return true if single-threaded (lockless) mode is active.
258 159x bool is_single_threaded() const noexcept override
259 {
260 159x return single_threaded_;
261 }
262
263 /** Enable or disable single-threaded (lockless) mode.
264
265 When enabled, all scheduler mutex and condition variable
266 operations become no-ops. Cross-thread post() is
267 undefined behavior.
268 */
269 10x void configure_single_threaded(bool v) noexcept override
270 {
271 10x single_threaded_ = v;
272 10x mutex_.set_enabled(!v);
273 10x cond_.set_enabled(!v);
274 10x }
275
276 protected:
277 timer_service* timer_svc_ = nullptr;
278 bool single_threaded_ = false;
279
280 1244x reactor_scheduler() = default;
281
282 /** Drain completed_ops during shutdown.
283
284 Pops all operations from the global queue and destroys them,
285 skipping the task sentinel. Signals all waiting threads.
286 Derived classes call this from their shutdown() override
287 before performing platform-specific cleanup.
288 */
289 void shutdown_drain();
290
291 /// RAII guard that re-inserts the task sentinel after `run_task`.
292 struct task_cleanup
293 {
294 reactor_scheduler const* sched;
295 lock_type* lock;
296 context_type* ctx;
297 ~task_cleanup();
298 };
299
300 mutable mutex_type mutex_{true};
301 mutable event_type cond_{true};
302 mutable op_queue completed_ops_;
303 mutable std::atomic<std::int64_t> outstanding_work_{0};
304 std::atomic<bool> stopped_{false};
305 mutable std::atomic<bool> task_running_{false};
306 mutable bool task_interrupted_ = false;
307
308 // Runtime-configurable reactor tuning parameters.
309 // Defaults match the library's built-in values.
310 unsigned max_events_per_poll_ = 128;
311 unsigned inline_budget_initial_ = 2;
312 unsigned inline_budget_max_ = 16;
313 unsigned unassisted_budget_ = 4;
314
315 /// Bit 0 of `state_`: set when the condvar should be signaled.
316 static constexpr std::size_t signaled_bit = 1;
317
318 /// Increment per waiting thread in `state_`.
319 static constexpr std::size_t waiter_increment = 2;
320 mutable std::size_t state_ = 0;
321
322 /// Sentinel op that triggers a reactor poll when dequeued.
323 struct task_op final : scheduler_op
324 {
325 void operator()() override {}
326 void destroy() override {}
327 };
328 task_op task_op_;
329
330 /// Run the platform-specific reactor poll.
331 virtual void
332 run_task(lock_type& lock, context_type* ctx,
333 long timeout_us) = 0;
334
335 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
336 virtual void interrupt_reactor() const = 0;
337
338 private:
339 struct work_cleanup
340 {
341 reactor_scheduler* sched;
342 lock_type* lock;
343 context_type* ctx;
344 ~work_cleanup();
345 };
346
347 std::size_t do_one(
348 lock_type& lock, long timeout_us, context_type* ctx);
349
350 void signal_all(lock_type& lock) const;
351 bool maybe_unlock_and_signal_one(lock_type& lock) const;
352 bool unlock_and_signal_one(lock_type& lock) const;
353 void clear_signal() const;
354 void wait_for_signal(lock_type& lock) const;
355 void wait_for_signal_for(
356 lock_type& lock, long timeout_us) const;
357 void wake_one_thread_and_unlock(lock_type& lock) const;
358 };
359
360 /** RAII guard that pushes/pops a scheduler context frame.
361
362 On construction, pushes a new context frame onto the
363 thread-local stack. On destruction, drains any remaining
364 private queue items to the global queue and pops the frame.
365 */
366 struct reactor_thread_context_guard
367 {
368 /// The context frame managed by this guard.
369 reactor_scheduler_context frame_;
370
371 /// Construct the guard, pushing a frame for @a sched.
372 860x explicit reactor_thread_context_guard(
373 reactor_scheduler const* sched) noexcept
374 860x : frame_(sched, reactor_context_stack.get())
375 {
376 860x reactor_context_stack.set(&frame_);
377 860x }
378
379 /// Destroy the guard, draining private work and popping the frame.
380 860x ~reactor_thread_context_guard() noexcept
381 {
382 860x if (!frame_.private_queue.empty())
383 frame_.key->drain_thread_queue(
384 frame_.private_queue, frame_.private_outstanding_work);
385 860x reactor_context_stack.set(frame_.next);
386 860x }
387 };
388
389 // ---- Inline implementations ------------------------------------------------
390
391 inline
392 860x reactor_scheduler_context::reactor_scheduler_context(
393 reactor_scheduler const* k,
394 860x reactor_scheduler_context* n)
395 860x : key(k)
396 860x , next(n)
397 860x , private_outstanding_work(0)
398 860x , inline_budget(0)
399 860x , inline_budget_max(
400 860x static_cast<int>(k->inline_budget_initial()))
401 860x , unassisted(false)
402 {
403 860x }
404
405 inline void
406 16x reactor_scheduler::configure_reactor(
407 unsigned max_events,
408 unsigned budget_init,
409 unsigned budget_max,
410 unsigned unassisted)
411 {
412 30x if (max_events < 1 ||
413 14x max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 throw std::out_of_range(
415 2x "max_events_per_poll must be in [1, INT_MAX]");
416 14x if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 throw std::out_of_range(
418 "inline_budget_max must be in [0, INT_MAX]");
419
420 // Clamp initial and unassisted to budget_max.
421 14x if (budget_init > budget_max)
422 2x budget_init = budget_max;
423 14x if (unassisted > budget_max)
424 2x unassisted = budget_max;
425
426 14x max_events_per_poll_ = max_events;
427 14x inline_budget_initial_ = budget_init;
428 14x inline_budget_max_ = budget_max;
429 14x unassisted_budget_ = unassisted;
430 14x }
431
432 inline void
433 108488x reactor_scheduler::reset_inline_budget() const noexcept
434 {
435 // When budget is disabled (max==0), all paths below would no-op
436 // (inline_budget stays 0). Skip the TLS lookup entirely.
437 108488x if (inline_budget_max_ == 0)
438 return;
439 108488x if (auto* ctx = reactor_find_context(this))
440 {
441 // Cap when no other thread absorbed queued work
442 108488x if (ctx->unassisted)
443 {
444 108488x ctx->inline_budget_max =
445 108488x static_cast<int>(unassisted_budget_);
446 108488x ctx->inline_budget =
447 108488x static_cast<int>(unassisted_budget_);
448 108488x return;
449 }
450 // Ramp up when previous cycle fully consumed budget.
451 // max(1, ...) ensures the doubling escapes zero.
452 if (ctx->inline_budget == 0)
453 ctx->inline_budget_max = (std::min)(
454 (std::max)(1, ctx->inline_budget_max) * 2,
455 static_cast<int>(inline_budget_max_));
456 else if (ctx->inline_budget < ctx->inline_budget_max)
457 ctx->inline_budget_max =
458 static_cast<int>(inline_budget_initial_);
459 ctx->inline_budget = ctx->inline_budget_max;
460 }
461 }
462
463 inline bool
464 454780x reactor_scheduler::try_consume_inline_budget() const noexcept
465 {
466 454780x if (inline_budget_max_ == 0)
467 return false;
468 454780x if (auto* ctx = reactor_find_context(this))
469 {
470 454780x if (ctx->inline_budget > 0)
471 {
472 363784x --ctx->inline_budget;
473 363784x return true;
474 }
475 }
476 90996x return false;
477 }
478
479 inline void
480 4355x reactor_scheduler::post(std::coroutine_handle<> h) const
481 {
482 struct post_handler final : scheduler_op
483 {
484 std::coroutine_handle<> h_;
485
486 4355x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
487 8710x ~post_handler() override = default;
488
489 4343x void operator()() override
490 {
491 4343x auto saved = h_;
492 4343x delete this;
493 // Ensure stores from the posting thread are visible. TSan
494 // cannot instrument standalone fences; this acquire pairs
495 // with the posting thread's release and is intentional.
496 BOOST_COROSIO_GCC_WARNING_PUSH
497 BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan")
498 std::atomic_thread_fence(std::memory_order_acquire);
499 BOOST_COROSIO_GCC_WARNING_POP
500 4343x saved.resume();
501 4343x }
502
503 12x void destroy() override
504 {
505 12x auto saved = h_;
506 12x delete this;
507 12x saved.destroy();
508 12x }
509 };
510
511 4355x auto ph = std::make_unique<post_handler>(h);
512
513 4355x if (auto* ctx = reactor_find_context(this))
514 {
515 24x ++ctx->private_outstanding_work;
516 24x ctx->private_queue.push(ph.release());
517 24x return;
518 }
519
520 4331x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
521
522 4331x lock_type lock(mutex_);
523 4331x completed_ops_.push(ph.release());
524 4331x wake_one_thread_and_unlock(lock);
525 4355x }
526
527 inline void
528 109515x reactor_scheduler::post(scheduler_op* h) const
529 {
530 109515x if (auto* ctx = reactor_find_context(this))
531 {
532 109122x ++ctx->private_outstanding_work;
533 109122x ctx->private_queue.push(h);
534 109122x return;
535 }
536
537 393x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
538
539 393x lock_type lock(mutex_);
540 393x completed_ops_.push(h);
541 393x wake_one_thread_and_unlock(lock);
542 393x }
543
544 inline bool
545 2152x reactor_scheduler::running_in_this_thread() const noexcept
546 {
547 2152x return reactor_find_context(this) != nullptr;
548 }
549
550 inline void
551 892x reactor_scheduler::stop()
552 {
553 892x lock_type lock(mutex_);
554 892x if (!stopped_.load(std::memory_order_acquire))
555 {
556 821x stopped_.store(true, std::memory_order_release);
557 821x signal_all(lock);
558 821x interrupt_reactor();
559 }
560 892x }
561
562 inline bool
563 88x reactor_scheduler::stopped() const noexcept
564 {
565 88x return stopped_.load(std::memory_order_acquire);
566 }
567
568 inline void
569 177x reactor_scheduler::restart()
570 {
571 177x stopped_.store(false, std::memory_order_release);
572 177x }
573
574 inline std::size_t
575 835x reactor_scheduler::run()
576 {
577 1670x if (outstanding_work_.load(std::memory_order_acquire) == 0)
578 {
579 59x stop();
580 59x return 0;
581 }
582
583 776x reactor_thread_context_guard ctx(this);
584 776x lock_type lock(mutex_);
585
586 776x std::size_t n = 0;
587 for (;;)
588 {
589 391141x if (!do_one(lock, -1, &ctx.frame_))
590 776x break;
591 390365x if (n != (std::numeric_limits<std::size_t>::max)())
592 390365x ++n;
593 390365x if (!lock.owns_lock())
594 289902x lock.lock();
595 }
596 776x return n;
597 776x }
598
599 inline std::size_t
600 16x reactor_scheduler::run_one()
601 {
602 32x if (outstanding_work_.load(std::memory_order_acquire) == 0)
603 {
604 stop();
605 return 0;
606 }
607
608 16x reactor_thread_context_guard ctx(this);
609 16x lock_type lock(mutex_);
610 16x return do_one(lock, -1, &ctx.frame_);
611 16x }
612
613 inline std::size_t
614 62x reactor_scheduler::wait_one(long usec)
615 {
616 124x if (outstanding_work_.load(std::memory_order_acquire) == 0)
617 {
618 20x stop();
619 20x return 0;
620 }
621
622 42x reactor_thread_context_guard ctx(this);
623 42x lock_type lock(mutex_);
624 42x return do_one(lock, usec, &ctx.frame_);
625 42x }
626
627 inline std::size_t
628 28x reactor_scheduler::poll()
629 {
630 56x if (outstanding_work_.load(std::memory_order_acquire) == 0)
631 {
632 6x stop();
633 6x return 0;
634 }
635
636 22x reactor_thread_context_guard ctx(this);
637 22x lock_type lock(mutex_);
638
639 22x std::size_t n = 0;
640 for (;;)
641 {
642 64x if (!do_one(lock, 0, &ctx.frame_))
643 22x break;
644 42x if (n != (std::numeric_limits<std::size_t>::max)())
645 42x ++n;
646 42x if (!lock.owns_lock())
647 34x lock.lock();
648 }
649 22x return n;
650 22x }
651
652 inline std::size_t
653 8x reactor_scheduler::poll_one()
654 {
655 16x if (outstanding_work_.load(std::memory_order_acquire) == 0)
656 {
657 4x stop();
658 4x return 0;
659 }
660
661 4x reactor_thread_context_guard ctx(this);
662 4x lock_type lock(mutex_);
663 4x return do_one(lock, 0, &ctx.frame_);
664 4x }
665
666 inline void
667 28681x reactor_scheduler::work_started() noexcept
668 {
669 28681x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
670 28681x }
671
672 inline void
673 42032x reactor_scheduler::work_finished() noexcept
674 {
675 84064x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
676 795x stop();
677 42032x }
678
679 inline void
680 259549x reactor_scheduler::compensating_work_started() const noexcept
681 {
682 259549x auto* ctx = reactor_find_context(this);
683 259549x if (ctx)
684 259549x ++ctx->private_outstanding_work;
685 259549x }
686
687 inline void
688 reactor_scheduler::drain_thread_queue(
689 op_queue& queue, std::int64_t count) const
690 {
691 if (count > 0)
692 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
693
694 lock_type lock(mutex_);
695 completed_ops_.splice(queue);
696 if (count > 0)
697 maybe_unlock_and_signal_one(lock);
698 }
699
700 inline void
701 17068x reactor_scheduler::post_deferred_completions(op_queue& ops) const
702 {
703 17068x if (ops.empty())
704 17068x return;
705
706 if (auto* ctx = reactor_find_context(this))
707 {
708 ctx->private_queue.splice(ops);
709 return;
710 }
711
712 lock_type lock(mutex_);
713 completed_ops_.splice(ops);
714 wake_one_thread_and_unlock(lock);
715 }
716
717 inline void
718 1244x reactor_scheduler::shutdown_drain()
719 {
720 1244x lock_type lock(mutex_);
721
722 2677x while (auto* h = completed_ops_.pop())
723 {
724 1433x if (h == &task_op_)
725 1244x continue;
726 189x lock.unlock();
727 189x h->destroy();
728 189x lock.lock();
729 1433x }
730
731 1244x signal_all(lock);
732 1244x }
733
734 inline void
735 2065x reactor_scheduler::signal_all(lock_type&) const
736 {
737 2065x state_ |= signaled_bit;
738 2065x cond_.notify_all();
739 2065x }
740
741 inline bool
742 4724x reactor_scheduler::maybe_unlock_and_signal_one(
743 lock_type& lock) const
744 {
745 4724x state_ |= signaled_bit;
746 4724x if (state_ > signaled_bit)
747 {
748 lock.unlock();
749 cond_.notify_one();
750 return true;
751 }
752 4724x return false;
753 }
754
755 inline bool
756 443526x reactor_scheduler::unlock_and_signal_one(
757 lock_type& lock) const
758 {
759 443526x state_ |= signaled_bit;
760 443526x bool have_waiters = state_ > signaled_bit;
761 443526x lock.unlock();
762 443526x if (have_waiters)
763 cond_.notify_one();
764 443526x return have_waiters;
765 }
766
767 inline void
768 1x reactor_scheduler::clear_signal() const
769 {
770 1x state_ &= ~signaled_bit;
771 1x }
772
773 inline void
774 1x reactor_scheduler::wait_for_signal(
775 lock_type& lock) const
776 {
777 2x while ((state_ & signaled_bit) == 0)
778 {
779 1x state_ += waiter_increment;
780 1x cond_.wait(lock);
781 1x state_ -= waiter_increment;
782 }
783 1x }
784
785 inline void
786 reactor_scheduler::wait_for_signal_for(
787 lock_type& lock, long timeout_us) const
788 {
789 if ((state_ & signaled_bit) == 0)
790 {
791 state_ += waiter_increment;
792 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
793 state_ -= waiter_increment;
794 }
795 }
796
797 inline void
798 4724x reactor_scheduler::wake_one_thread_and_unlock(
799 lock_type& lock) const
800 {
801 4724x if (maybe_unlock_and_signal_one(lock))
802 return;
803
804 4724x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
805 {
806 105x task_interrupted_ = true;
807 105x lock.unlock();
808 105x interrupt_reactor();
809 }
810 else
811 {
812 4619x lock.unlock();
813 }
814 }
815
816 390451x inline reactor_scheduler::work_cleanup::~work_cleanup()
817 {
818 390451x if (ctx)
819 {
820 390451x std::int64_t produced = ctx->private_outstanding_work;
821 390451x if (produced > 1)
822 14x sched->outstanding_work_.fetch_add(
823 produced - 1, std::memory_order_relaxed);
824 390437x else if (produced < 1)
825 30431x sched->work_finished();
826 390451x ctx->private_outstanding_work = 0;
827
828 390451x if (!ctx->private_queue.empty())
829 {
830 100471x lock->lock();
831 100471x sched->completed_ops_.splice(ctx->private_queue);
832 }
833 }
834 else
835 {
836 sched->work_finished();
837 }
838 390451x }
839
840 587890x inline reactor_scheduler::task_cleanup::~task_cleanup()
841 {
842 293945x if (!ctx)
843 return;
844
845 293945x if (ctx->private_outstanding_work > 0)
846 {
847 8649x sched->outstanding_work_.fetch_add(
848 8649x ctx->private_outstanding_work, std::memory_order_relaxed);
849 8649x ctx->private_outstanding_work = 0;
850 }
851
852 293945x if (!ctx->private_queue.empty())
853 {
854 8649x if (!lock->owns_lock())
855 lock->lock();
856 8649x sched->completed_ops_.splice(ctx->private_queue);
857 }
858 293945x }
859
860 inline std::size_t
861 391267x reactor_scheduler::do_one(
862 lock_type& lock, long timeout_us, context_type* ctx)
863 {
864 for (;;)
865 {
866 685195x if (stopped_.load(std::memory_order_acquire))
867 782x return 0;
868
869 684413x scheduler_op* op = completed_ops_.pop();
870
871 // Handle reactor sentinel — time to poll for I/O
872 684413x if (op == &task_op_)
873 {
874 bool more_handlers =
875 293961x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
876
877 534847x if (!more_handlers &&
878 481772x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
879 timeout_us == 0))
880 {
881 16x completed_ops_.push(&task_op_);
882 16x return 0;
883 }
884
885 293945x long task_timeout_us = more_handlers ? 0 : timeout_us;
886 293945x task_interrupted_ = task_timeout_us == 0;
887 293945x task_running_.store(true, std::memory_order_release);
888
889 293945x if (more_handlers)
890 53075x unlock_and_signal_one(lock);
891
892 try
893 {
894 293945x run_task(lock, ctx, task_timeout_us);
895 }
896 catch (...)
897 {
898 task_running_.store(false, std::memory_order_relaxed);
899 throw;
900 }
901
902 293945x task_running_.store(false, std::memory_order_relaxed);
903 293945x completed_ops_.push(&task_op_);
904 293945x if (timeout_us > 0)
905 18x return 0;
906 293927x continue;
907 293927x }
908
909 // Handle operation
910 390452x if (op != nullptr)
911 {
912 390451x bool more = !completed_ops_.empty();
913
914 390451x if (more)
915 390451x ctx->unassisted = !unlock_and_signal_one(lock);
916 else
917 {
918 ctx->unassisted = false;
919 lock.unlock();
920 }
921
922 390451x work_cleanup on_exit{this, &lock, ctx};
923 (void)on_exit;
924
925 390451x (*op)();
926 390451x return 1;
927 390451x }
928
929 // Try private queue before blocking
930 1x if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
931 continue;
932
933 2x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
934 timeout_us == 0)
935 return 0;
936
937 1x clear_signal();
938 1x if (timeout_us < 0)
939 1x wait_for_signal(lock);
940 else
941 wait_for_signal_for(lock, timeout_us);
942 293928x }
943 }
944
945 } // namespace boost::corosio::detail
946
947 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
948