TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/detail/scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
20 : #include <atomic>
21 : #include <chrono>
22 : #include <coroutine>
23 : #include <cstddef>
24 : #include <cstdint>
25 : #include <limits>
26 : #include <memory>
27 : #include <stdexcept>
28 :
29 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declarations
35 : class reactor_scheduler;
36 : class timer_service;
37 :
38 : /** Per-thread state for a reactor scheduler.
39 :
40 : Each thread running a scheduler's event loop has one of these
41 : on a thread-local stack. It holds a private work queue and
42 : inline completion budget for speculative I/O fast paths.
43 : */
44 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
45 : {
46 : /// Scheduler this context belongs to.
47 : reactor_scheduler const* key;
48 :
49 : /// Next context frame on this thread's stack.
50 : reactor_scheduler_context* next;
51 :
52 : /// Private work queue for reduced contention.
53 : op_queue private_queue;
54 :
55 : /// Unflushed work count for the private queue.
56 : std::int64_t private_outstanding_work;
57 :
58 : /// Remaining inline completions allowed this cycle.
59 : int inline_budget;
60 :
61 : /// Maximum inline budget (adaptive, 2-16).
62 : int inline_budget_max;
63 :
64 : /// True if no other thread absorbed queued work last cycle.
65 : bool unassisted;
66 :
67 : /// Construct a context frame linked to @a n.
68 : reactor_scheduler_context(
69 : reactor_scheduler const* k,
70 : reactor_scheduler_context* n);
71 : };
72 :
73 : /// Thread-local context stack for reactor schedulers.
74 : inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75 :
76 : /// Find the context frame for a scheduler on this thread.
77 : inline reactor_scheduler_context*
78 HIT 938839 : reactor_find_context(reactor_scheduler const* self) noexcept
79 : {
80 938839 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 : {
82 932640 : if (c->key == self)
83 932640 : return c;
84 : }
85 6199 : return nullptr;
86 : }
87 :
88 : /// Flush private work count to global counter.
89 : inline void
90 MIS 0 : reactor_flush_private_work(
91 : reactor_scheduler_context* ctx,
92 : std::atomic<std::int64_t>& outstanding_work) noexcept
93 : {
94 0 : if (ctx && ctx->private_outstanding_work > 0)
95 : {
96 0 : outstanding_work.fetch_add(
97 : ctx->private_outstanding_work, std::memory_order_relaxed);
98 0 : ctx->private_outstanding_work = 0;
99 : }
100 0 : }
101 :
102 : /** Drain private queue to global queue, flushing work count first.
103 :
104 : @return True if any ops were drained.
105 : */
106 : inline bool
107 HIT 1 : reactor_drain_private_queue(
108 : reactor_scheduler_context* ctx,
109 : std::atomic<std::int64_t>& outstanding_work,
110 : op_queue& completed_ops) noexcept
111 : {
112 1 : if (!ctx || ctx->private_queue.empty())
113 1 : return false;
114 :
115 MIS 0 : reactor_flush_private_work(ctx, outstanding_work);
116 0 : completed_ops.splice(ctx->private_queue);
117 0 : return true;
118 : }
119 :
120 : /** Non-template base for reactor-backed scheduler implementations.
121 :
122 : Provides the complete threading model shared by epoll, kqueue,
123 : and select schedulers: signal state machine, inline completion
124 : budget, work counting, run/poll methods, and the do_one event
125 : loop.
126 :
127 : Derived classes provide platform-specific hooks by overriding:
128 : - `run_task(lock, ctx)` to run the reactor poll
129 : - `interrupt_reactor()` to wake a blocked reactor
130 :
131 : De-templated from the original CRTP design to eliminate
132 : duplicate instantiations when multiple backends are compiled
133 : into the same binary. Virtual dispatch for run_task (called
134 : once per reactor cycle, before a blocking syscall) has
135 : negligible overhead.
136 :
137 : @par Thread Safety
138 : All public member functions are thread-safe.
139 : */
140 : class reactor_scheduler
141 : : public scheduler
142 : , public capy::execution_context::service
143 : {
144 : public:
145 : using key_type = scheduler;
146 : using context_type = reactor_scheduler_context;
147 : using mutex_type = conditionally_enabled_mutex;
148 : using lock_type = mutex_type::scoped_lock;
149 : using event_type = conditionally_enabled_event;
150 :
151 : /// Post a coroutine for deferred execution.
152 : void post(std::coroutine_handle<> h) const override;
153 :
154 : /// Post a scheduler operation for deferred execution.
155 : void post(scheduler_op* h) const override;
156 :
157 : /// Return true if called from a thread running this scheduler.
158 : bool running_in_this_thread() const noexcept override;
159 :
160 : /// Request the scheduler to stop dispatching handlers.
161 : void stop() override;
162 :
163 : /// Return true if the scheduler has been stopped.
164 : bool stopped() const noexcept override;
165 :
166 : /// Reset the stopped state so `run()` can resume.
167 : void restart() override;
168 :
169 : /// Run the event loop until no work remains.
170 : std::size_t run() override;
171 :
172 : /// Run until one handler completes or no work remains.
173 : std::size_t run_one() override;
174 :
175 : /// Run until one handler completes or @a usec elapses.
176 : std::size_t wait_one(long usec) override;
177 :
178 : /// Run ready handlers without blocking.
179 : std::size_t poll() override;
180 :
181 : /// Run at most one ready handler without blocking.
182 : std::size_t poll_one() override;
183 :
184 : /// Increment the outstanding work count.
185 : void work_started() noexcept override;
186 :
187 : /// Decrement the outstanding work count, stopping on zero.
188 : void work_finished() noexcept override;
189 :
190 : /** Reset the thread's inline completion budget.
191 :
192 : Called at the start of each posted completion handler to
193 : grant a fresh budget for speculative inline completions.
194 : */
195 : void reset_inline_budget() const noexcept;
196 :
197 : /** Consume one unit of inline budget if available.
198 :
199 : @return True if budget was available and consumed.
200 : */
201 : bool try_consume_inline_budget() const noexcept;
202 :
203 : /** Offset a forthcoming work_finished from work_cleanup.
204 :
205 : Called by descriptor_state when all I/O returned EAGAIN and
206 : no handler will be executed. Must be called from a scheduler
207 : thread.
208 : */
209 : void compensating_work_started() const noexcept;
210 :
211 : /** Drain work from thread context's private queue to global queue.
212 :
213 : Flushes private work count to the global counter, then
214 : transfers the queue under mutex protection.
215 :
216 : @param queue The private queue to drain.
217 : @param count Private work count to flush before draining.
218 : */
219 : void drain_thread_queue(op_queue& queue, std::int64_t count) const;
220 :
221 : /** Post completed operations for deferred invocation.
222 :
223 : If called from a thread running this scheduler, operations
224 : go to the thread's private queue (fast path). Otherwise,
225 : operations are added to the global queue under mutex and a
226 : waiter is signaled.
227 :
228 : @par Preconditions
229 : work_started() must have been called for each operation.
230 :
231 : @param ops Queue of operations to post.
232 : */
233 : void post_deferred_completions(op_queue& ops) const;
234 :
235 : /** Apply runtime configuration to the scheduler.
236 :
237 : Called by `io_context` after construction. Values that do
238 : not apply to this backend are silently ignored.
239 :
240 : @param max_events Event buffer size for epoll/kqueue.
241 : @param budget_init Starting inline completion budget.
242 : @param budget_max Hard ceiling on adaptive budget ramp-up.
243 : @param unassisted Budget when single-threaded.
244 : */
245 : virtual void configure_reactor(
246 : unsigned max_events,
247 : unsigned budget_init,
248 : unsigned budget_max,
249 : unsigned unassisted);
250 :
251 : /// Return the configured initial inline budget.
252 HIT 860 : unsigned inline_budget_initial() const noexcept
253 : {
254 860 : return inline_budget_initial_;
255 : }
256 :
257 : /// Return true if single-threaded (lockless) mode is active.
258 159 : bool is_single_threaded() const noexcept override
259 : {
260 159 : return single_threaded_;
261 : }
262 :
263 : /** Enable or disable single-threaded (lockless) mode.
264 :
265 : When enabled, all scheduler mutex and condition variable
266 : operations become no-ops. Cross-thread post() is
267 : undefined behavior.
268 : */
269 10 : void configure_single_threaded(bool v) noexcept override
270 : {
271 10 : single_threaded_ = v;
272 10 : mutex_.set_enabled(!v);
273 10 : cond_.set_enabled(!v);
274 10 : }
275 :
276 : protected:
277 : timer_service* timer_svc_ = nullptr;
278 : bool single_threaded_ = false;
279 :
280 1244 : reactor_scheduler() = default;
281 :
282 : /** Drain completed_ops during shutdown.
283 :
284 : Pops all operations from the global queue and destroys them,
285 : skipping the task sentinel. Signals all waiting threads.
286 : Derived classes call this from their shutdown() override
287 : before performing platform-specific cleanup.
288 : */
289 : void shutdown_drain();
290 :
291 : /// RAII guard that re-inserts the task sentinel after `run_task`.
292 : struct task_cleanup
293 : {
294 : reactor_scheduler const* sched;
295 : lock_type* lock;
296 : context_type* ctx;
297 : ~task_cleanup();
298 : };
299 :
300 : mutable mutex_type mutex_{true};
301 : mutable event_type cond_{true};
302 : mutable op_queue completed_ops_;
303 : mutable std::atomic<std::int64_t> outstanding_work_{0};
304 : std::atomic<bool> stopped_{false};
305 : mutable std::atomic<bool> task_running_{false};
306 : mutable bool task_interrupted_ = false;
307 :
308 : // Runtime-configurable reactor tuning parameters.
309 : // Defaults match the library's built-in values.
310 : unsigned max_events_per_poll_ = 128;
311 : unsigned inline_budget_initial_ = 2;
312 : unsigned inline_budget_max_ = 16;
313 : unsigned unassisted_budget_ = 4;
314 :
315 : /// Bit 0 of `state_`: set when the condvar should be signaled.
316 : static constexpr std::size_t signaled_bit = 1;
317 :
318 : /// Increment per waiting thread in `state_`.
319 : static constexpr std::size_t waiter_increment = 2;
320 : mutable std::size_t state_ = 0;
321 :
322 : /// Sentinel op that triggers a reactor poll when dequeued.
323 : struct task_op final : scheduler_op
324 : {
325 MIS 0 : void operator()() override {}
326 0 : void destroy() override {}
327 : };
328 : task_op task_op_;
329 :
330 : /// Run the platform-specific reactor poll.
331 : virtual void
332 : run_task(lock_type& lock, context_type* ctx,
333 : long timeout_us) = 0;
334 :
335 : /// Wake a blocked reactor (e.g. write to eventfd or pipe).
336 : virtual void interrupt_reactor() const = 0;
337 :
338 : private:
339 : struct work_cleanup
340 : {
341 : reactor_scheduler* sched;
342 : lock_type* lock;
343 : context_type* ctx;
344 : ~work_cleanup();
345 : };
346 :
347 : std::size_t do_one(
348 : lock_type& lock, long timeout_us, context_type* ctx);
349 :
350 : void signal_all(lock_type& lock) const;
351 : bool maybe_unlock_and_signal_one(lock_type& lock) const;
352 : bool unlock_and_signal_one(lock_type& lock) const;
353 : void clear_signal() const;
354 : void wait_for_signal(lock_type& lock) const;
355 : void wait_for_signal_for(
356 : lock_type& lock, long timeout_us) const;
357 : void wake_one_thread_and_unlock(lock_type& lock) const;
358 : };
359 :
360 : /** RAII guard that pushes/pops a scheduler context frame.
361 :
362 : On construction, pushes a new context frame onto the
363 : thread-local stack. On destruction, drains any remaining
364 : private queue items to the global queue and pops the frame.
365 : */
366 : struct reactor_thread_context_guard
367 : {
368 : /// The context frame managed by this guard.
369 : reactor_scheduler_context frame_;
370 :
371 : /// Construct the guard, pushing a frame for @a sched.
372 HIT 860 : explicit reactor_thread_context_guard(
373 : reactor_scheduler const* sched) noexcept
374 860 : : frame_(sched, reactor_context_stack.get())
375 : {
376 860 : reactor_context_stack.set(&frame_);
377 860 : }
378 :
379 : /// Destroy the guard, draining private work and popping the frame.
380 860 : ~reactor_thread_context_guard() noexcept
381 : {
382 860 : if (!frame_.private_queue.empty())
383 MIS 0 : frame_.key->drain_thread_queue(
384 0 : frame_.private_queue, frame_.private_outstanding_work);
385 HIT 860 : reactor_context_stack.set(frame_.next);
386 860 : }
387 : };
388 :
389 : // ---- Inline implementations ------------------------------------------------
390 :
391 : inline
392 860 : reactor_scheduler_context::reactor_scheduler_context(
393 : reactor_scheduler const* k,
394 860 : reactor_scheduler_context* n)
395 860 : : key(k)
396 860 : , next(n)
397 860 : , private_outstanding_work(0)
398 860 : , inline_budget(0)
399 860 : , inline_budget_max(
400 860 : static_cast<int>(k->inline_budget_initial()))
401 860 : , unassisted(false)
402 : {
403 860 : }
404 :
405 : inline void
406 16 : reactor_scheduler::configure_reactor(
407 : unsigned max_events,
408 : unsigned budget_init,
409 : unsigned budget_max,
410 : unsigned unassisted)
411 : {
412 30 : if (max_events < 1 ||
413 14 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 : throw std::out_of_range(
415 2 : "max_events_per_poll must be in [1, INT_MAX]");
416 14 : if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 : throw std::out_of_range(
418 MIS 0 : "inline_budget_max must be in [0, INT_MAX]");
419 :
420 : // Clamp initial and unassisted to budget_max.
421 HIT 14 : if (budget_init > budget_max)
422 2 : budget_init = budget_max;
423 14 : if (unassisted > budget_max)
424 2 : unassisted = budget_max;
425 :
426 14 : max_events_per_poll_ = max_events;
427 14 : inline_budget_initial_ = budget_init;
428 14 : inline_budget_max_ = budget_max;
429 14 : unassisted_budget_ = unassisted;
430 14 : }
431 :
432 : inline void
433 108488 : reactor_scheduler::reset_inline_budget() const noexcept
434 : {
435 : // When budget is disabled (max==0), all paths below would no-op
436 : // (inline_budget stays 0). Skip the TLS lookup entirely.
437 108488 : if (inline_budget_max_ == 0)
438 MIS 0 : return;
439 HIT 108488 : if (auto* ctx = reactor_find_context(this))
440 : {
441 : // Cap when no other thread absorbed queued work
442 108488 : if (ctx->unassisted)
443 : {
444 108488 : ctx->inline_budget_max =
445 108488 : static_cast<int>(unassisted_budget_);
446 108488 : ctx->inline_budget =
447 108488 : static_cast<int>(unassisted_budget_);
448 108488 : return;
449 : }
450 : // Ramp up when previous cycle fully consumed budget.
451 : // max(1, ...) ensures the doubling escapes zero.
452 MIS 0 : if (ctx->inline_budget == 0)
453 0 : ctx->inline_budget_max = (std::min)(
454 0 : (std::max)(1, ctx->inline_budget_max) * 2,
455 0 : static_cast<int>(inline_budget_max_));
456 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
457 0 : ctx->inline_budget_max =
458 0 : static_cast<int>(inline_budget_initial_);
459 0 : ctx->inline_budget = ctx->inline_budget_max;
460 : }
461 : }
462 :
463 : inline bool
464 HIT 454780 : reactor_scheduler::try_consume_inline_budget() const noexcept
465 : {
466 454780 : if (inline_budget_max_ == 0)
467 MIS 0 : return false;
468 HIT 454780 : if (auto* ctx = reactor_find_context(this))
469 : {
470 454780 : if (ctx->inline_budget > 0)
471 : {
472 363784 : --ctx->inline_budget;
473 363784 : return true;
474 : }
475 : }
476 90996 : return false;
477 : }
478 :
479 : inline void
480 4355 : reactor_scheduler::post(std::coroutine_handle<> h) const
481 : {
482 : struct post_handler final : scheduler_op
483 : {
484 : std::coroutine_handle<> h_;
485 :
486 4355 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
487 8710 : ~post_handler() override = default;
488 :
489 4343 : void operator()() override
490 : {
491 4343 : auto saved = h_;
492 4343 : delete this;
493 : // Ensure stores from the posting thread are visible. TSan
494 : // cannot instrument standalone fences; this acquire pairs
495 : // with the posting thread's release and is intentional.
496 : BOOST_COROSIO_GCC_WARNING_PUSH
497 : BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan")
498 : std::atomic_thread_fence(std::memory_order_acquire);
499 : BOOST_COROSIO_GCC_WARNING_POP
500 4343 : saved.resume();
501 4343 : }
502 :
503 12 : void destroy() override
504 : {
505 12 : auto saved = h_;
506 12 : delete this;
507 12 : saved.destroy();
508 12 : }
509 : };
510 :
511 4355 : auto ph = std::make_unique<post_handler>(h);
512 :
513 4355 : if (auto* ctx = reactor_find_context(this))
514 : {
515 24 : ++ctx->private_outstanding_work;
516 24 : ctx->private_queue.push(ph.release());
517 24 : return;
518 : }
519 :
520 4331 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
521 :
522 4331 : lock_type lock(mutex_);
523 4331 : completed_ops_.push(ph.release());
524 4331 : wake_one_thread_and_unlock(lock);
525 4355 : }
526 :
527 : inline void
528 109515 : reactor_scheduler::post(scheduler_op* h) const
529 : {
530 109515 : if (auto* ctx = reactor_find_context(this))
531 : {
532 109122 : ++ctx->private_outstanding_work;
533 109122 : ctx->private_queue.push(h);
534 109122 : return;
535 : }
536 :
537 393 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
538 :
539 393 : lock_type lock(mutex_);
540 393 : completed_ops_.push(h);
541 393 : wake_one_thread_and_unlock(lock);
542 393 : }
543 :
544 : inline bool
545 2152 : reactor_scheduler::running_in_this_thread() const noexcept
546 : {
547 2152 : return reactor_find_context(this) != nullptr;
548 : }
549 :
550 : inline void
551 892 : reactor_scheduler::stop()
552 : {
553 892 : lock_type lock(mutex_);
554 892 : if (!stopped_.load(std::memory_order_acquire))
555 : {
556 821 : stopped_.store(true, std::memory_order_release);
557 821 : signal_all(lock);
558 821 : interrupt_reactor();
559 : }
560 892 : }
561 :
562 : inline bool
563 88 : reactor_scheduler::stopped() const noexcept
564 : {
565 88 : return stopped_.load(std::memory_order_acquire);
566 : }
567 :
568 : inline void
569 177 : reactor_scheduler::restart()
570 : {
571 177 : stopped_.store(false, std::memory_order_release);
572 177 : }
573 :
574 : inline std::size_t
575 835 : reactor_scheduler::run()
576 : {
577 1670 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
578 : {
579 59 : stop();
580 59 : return 0;
581 : }
582 :
583 776 : reactor_thread_context_guard ctx(this);
584 776 : lock_type lock(mutex_);
585 :
586 776 : std::size_t n = 0;
587 : for (;;)
588 : {
589 391141 : if (!do_one(lock, -1, &ctx.frame_))
590 776 : break;
591 390365 : if (n != (std::numeric_limits<std::size_t>::max)())
592 390365 : ++n;
593 390365 : if (!lock.owns_lock())
594 289902 : lock.lock();
595 : }
596 776 : return n;
597 776 : }
598 :
599 : inline std::size_t
600 16 : reactor_scheduler::run_one()
601 : {
602 32 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
603 : {
604 MIS 0 : stop();
605 0 : return 0;
606 : }
607 :
608 HIT 16 : reactor_thread_context_guard ctx(this);
609 16 : lock_type lock(mutex_);
610 16 : return do_one(lock, -1, &ctx.frame_);
611 16 : }
612 :
613 : inline std::size_t
614 62 : reactor_scheduler::wait_one(long usec)
615 : {
616 124 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
617 : {
618 20 : stop();
619 20 : return 0;
620 : }
621 :
622 42 : reactor_thread_context_guard ctx(this);
623 42 : lock_type lock(mutex_);
624 42 : return do_one(lock, usec, &ctx.frame_);
625 42 : }
626 :
627 : inline std::size_t
628 28 : reactor_scheduler::poll()
629 : {
630 56 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
631 : {
632 6 : stop();
633 6 : return 0;
634 : }
635 :
636 22 : reactor_thread_context_guard ctx(this);
637 22 : lock_type lock(mutex_);
638 :
639 22 : std::size_t n = 0;
640 : for (;;)
641 : {
642 64 : if (!do_one(lock, 0, &ctx.frame_))
643 22 : break;
644 42 : if (n != (std::numeric_limits<std::size_t>::max)())
645 42 : ++n;
646 42 : if (!lock.owns_lock())
647 34 : lock.lock();
648 : }
649 22 : return n;
650 22 : }
651 :
652 : inline std::size_t
653 8 : reactor_scheduler::poll_one()
654 : {
655 16 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
656 : {
657 4 : stop();
658 4 : return 0;
659 : }
660 :
661 4 : reactor_thread_context_guard ctx(this);
662 4 : lock_type lock(mutex_);
663 4 : return do_one(lock, 0, &ctx.frame_);
664 4 : }
665 :
666 : inline void
667 28681 : reactor_scheduler::work_started() noexcept
668 : {
669 28681 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
670 28681 : }
671 :
672 : inline void
673 42032 : reactor_scheduler::work_finished() noexcept
674 : {
675 84064 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
676 795 : stop();
677 42032 : }
678 :
679 : inline void
680 259549 : reactor_scheduler::compensating_work_started() const noexcept
681 : {
682 259549 : auto* ctx = reactor_find_context(this);
683 259549 : if (ctx)
684 259549 : ++ctx->private_outstanding_work;
685 259549 : }
686 :
687 : inline void
688 MIS 0 : reactor_scheduler::drain_thread_queue(
689 : op_queue& queue, std::int64_t count) const
690 : {
691 0 : if (count > 0)
692 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
693 :
694 0 : lock_type lock(mutex_);
695 0 : completed_ops_.splice(queue);
696 0 : if (count > 0)
697 0 : maybe_unlock_and_signal_one(lock);
698 0 : }
699 :
700 : inline void
701 HIT 17068 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
702 : {
703 17068 : if (ops.empty())
704 17068 : return;
705 :
706 MIS 0 : if (auto* ctx = reactor_find_context(this))
707 : {
708 0 : ctx->private_queue.splice(ops);
709 0 : return;
710 : }
711 :
712 0 : lock_type lock(mutex_);
713 0 : completed_ops_.splice(ops);
714 0 : wake_one_thread_and_unlock(lock);
715 0 : }
716 :
717 : inline void
718 HIT 1244 : reactor_scheduler::shutdown_drain()
719 : {
720 1244 : lock_type lock(mutex_);
721 :
722 2677 : while (auto* h = completed_ops_.pop())
723 : {
724 1433 : if (h == &task_op_)
725 1244 : continue;
726 189 : lock.unlock();
727 189 : h->destroy();
728 189 : lock.lock();
729 1433 : }
730 :
731 1244 : signal_all(lock);
732 1244 : }
733 :
734 : inline void
735 2065 : reactor_scheduler::signal_all(lock_type&) const
736 : {
737 2065 : state_ |= signaled_bit;
738 2065 : cond_.notify_all();
739 2065 : }
740 :
741 : inline bool
742 4724 : reactor_scheduler::maybe_unlock_and_signal_one(
743 : lock_type& lock) const
744 : {
745 4724 : state_ |= signaled_bit;
746 4724 : if (state_ > signaled_bit)
747 : {
748 MIS 0 : lock.unlock();
749 0 : cond_.notify_one();
750 0 : return true;
751 : }
752 HIT 4724 : return false;
753 : }
754 :
755 : inline bool
756 443526 : reactor_scheduler::unlock_and_signal_one(
757 : lock_type& lock) const
758 : {
759 443526 : state_ |= signaled_bit;
760 443526 : bool have_waiters = state_ > signaled_bit;
761 443526 : lock.unlock();
762 443526 : if (have_waiters)
763 MIS 0 : cond_.notify_one();
764 HIT 443526 : return have_waiters;
765 : }
766 :
767 : inline void
768 1 : reactor_scheduler::clear_signal() const
769 : {
770 1 : state_ &= ~signaled_bit;
771 1 : }
772 :
773 : inline void
774 1 : reactor_scheduler::wait_for_signal(
775 : lock_type& lock) const
776 : {
777 2 : while ((state_ & signaled_bit) == 0)
778 : {
779 1 : state_ += waiter_increment;
780 1 : cond_.wait(lock);
781 1 : state_ -= waiter_increment;
782 : }
783 1 : }
784 :
785 : inline void
786 MIS 0 : reactor_scheduler::wait_for_signal_for(
787 : lock_type& lock, long timeout_us) const
788 : {
789 0 : if ((state_ & signaled_bit) == 0)
790 : {
791 0 : state_ += waiter_increment;
792 0 : cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
793 0 : state_ -= waiter_increment;
794 : }
795 0 : }
796 :
797 : inline void
798 HIT 4724 : reactor_scheduler::wake_one_thread_and_unlock(
799 : lock_type& lock) const
800 : {
801 4724 : if (maybe_unlock_and_signal_one(lock))
802 MIS 0 : return;
803 :
804 HIT 4724 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
805 : {
806 105 : task_interrupted_ = true;
807 105 : lock.unlock();
808 105 : interrupt_reactor();
809 : }
810 : else
811 : {
812 4619 : lock.unlock();
813 : }
814 : }
815 :
816 390451 : inline reactor_scheduler::work_cleanup::~work_cleanup()
817 : {
818 390451 : if (ctx)
819 : {
820 390451 : std::int64_t produced = ctx->private_outstanding_work;
821 390451 : if (produced > 1)
822 14 : sched->outstanding_work_.fetch_add(
823 : produced - 1, std::memory_order_relaxed);
824 390437 : else if (produced < 1)
825 30431 : sched->work_finished();
826 390451 : ctx->private_outstanding_work = 0;
827 :
828 390451 : if (!ctx->private_queue.empty())
829 : {
830 100471 : lock->lock();
831 100471 : sched->completed_ops_.splice(ctx->private_queue);
832 : }
833 : }
834 : else
835 : {
836 MIS 0 : sched->work_finished();
837 : }
838 HIT 390451 : }
839 :
840 587890 : inline reactor_scheduler::task_cleanup::~task_cleanup()
841 : {
842 293945 : if (!ctx)
843 MIS 0 : return;
844 :
845 HIT 293945 : if (ctx->private_outstanding_work > 0)
846 : {
847 8649 : sched->outstanding_work_.fetch_add(
848 8649 : ctx->private_outstanding_work, std::memory_order_relaxed);
849 8649 : ctx->private_outstanding_work = 0;
850 : }
851 :
852 293945 : if (!ctx->private_queue.empty())
853 : {
854 8649 : if (!lock->owns_lock())
855 MIS 0 : lock->lock();
856 HIT 8649 : sched->completed_ops_.splice(ctx->private_queue);
857 : }
858 293945 : }
859 :
860 : inline std::size_t
861 391267 : reactor_scheduler::do_one(
862 : lock_type& lock, long timeout_us, context_type* ctx)
863 : {
864 : for (;;)
865 : {
866 685195 : if (stopped_.load(std::memory_order_acquire))
867 782 : return 0;
868 :
869 684413 : scheduler_op* op = completed_ops_.pop();
870 :
871 : // Handle reactor sentinel — time to poll for I/O
872 684413 : if (op == &task_op_)
873 : {
874 : bool more_handlers =
875 293961 : !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
876 :
877 534847 : if (!more_handlers &&
878 481772 : (outstanding_work_.load(std::memory_order_acquire) == 0 ||
879 : timeout_us == 0))
880 : {
881 16 : completed_ops_.push(&task_op_);
882 16 : return 0;
883 : }
884 :
885 293945 : long task_timeout_us = more_handlers ? 0 : timeout_us;
886 293945 : task_interrupted_ = task_timeout_us == 0;
887 293945 : task_running_.store(true, std::memory_order_release);
888 :
889 293945 : if (more_handlers)
890 53075 : unlock_and_signal_one(lock);
891 :
892 : try
893 : {
894 293945 : run_task(lock, ctx, task_timeout_us);
895 : }
896 MIS 0 : catch (...)
897 : {
898 0 : task_running_.store(false, std::memory_order_relaxed);
899 0 : throw;
900 0 : }
901 :
902 HIT 293945 : task_running_.store(false, std::memory_order_relaxed);
903 293945 : completed_ops_.push(&task_op_);
904 293945 : if (timeout_us > 0)
905 18 : return 0;
906 293927 : continue;
907 293927 : }
908 :
909 : // Handle operation
910 390452 : if (op != nullptr)
911 : {
912 390451 : bool more = !completed_ops_.empty();
913 :
914 390451 : if (more)
915 390451 : ctx->unassisted = !unlock_and_signal_one(lock);
916 : else
917 : {
918 MIS 0 : ctx->unassisted = false;
919 0 : lock.unlock();
920 : }
921 :
922 HIT 390451 : work_cleanup on_exit{this, &lock, ctx};
923 : (void)on_exit;
924 :
925 390451 : (*op)();
926 390451 : return 1;
927 390451 : }
928 :
929 : // Try private queue before blocking
930 1 : if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
931 MIS 0 : continue;
932 :
933 HIT 2 : if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
934 : timeout_us == 0)
935 MIS 0 : return 0;
936 :
937 HIT 1 : clear_signal();
938 1 : if (timeout_us < 0)
939 1 : wait_for_signal(lock);
940 : else
941 MIS 0 : wait_for_signal_for(lock, timeout_us);
942 HIT 293928 : }
943 : }
944 :
945 : } // namespace boost::corosio::detail
946 :
947 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|