TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : // Copyright (c) 2026 Michael Vandeberg
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/corosio
10 : //
11 :
12 : #ifndef BOOST_COROSIO_IO_CONTEXT_HPP
13 : #define BOOST_COROSIO_IO_CONTEXT_HPP
14 :
15 : #include <boost/corosio/detail/config.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/corosio/detail/platform.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/continuation.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 :
22 : #include <chrono>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <limits>
26 : #include <thread>
27 :
28 : namespace boost::corosio {
29 :
/** Runtime tuning knobs for @ref io_context.

    Every member carries a default equal to the library's built-in
    value, so a default-constructed `io_context_options` configures
    a context exactly like one built without options.

    Members that only make sense for one backend family are
    silently ignored when a different backend is active.

    @par Example
    @code
    io_context_options opts;
    opts.max_events_per_poll = 256; // larger batch per syscall
    opts.inline_budget_max   = 32;  // more speculative completions
    opts.thread_pool_size    = 4;   // more file-I/O workers

    io_context ioc(opts);
    @endcode

    @see io_context, native_io_context
*/
struct io_context_options
{
    /** Upper bound on events retrieved by one reactor poll.

        Sizes the buffer handed to `epoll_wait()` or `kevent()`.
        A larger value means fewer syscalls under heavy load; a
        smaller value is fairer across connections. Ignored on
        the IOCP and select backends.
    */
    unsigned max_events_per_poll{128};

    /** Initial inline completion budget for a handler chain.

        Once a posted handler has run, the reactor allows this
        many speculative inline completions before it forces a
        re-queue. Reactor backends only.

        @note When an `io_context` is constructed with
        `concurrency_hint > 1` and all three budget members are
        left at their defaults, the defaults are overridden to
        disable inline completion (post-everything mode), because
        multi-threaded workloads benefit from cross-thread
        work-stealing. Changing any budget member away from its
        default suppresses that override.
    */
    unsigned inline_budget_initial{2};

    /** Ceiling for the adaptive inline budget.

        The budget doubles every cycle in which it is used up
        completely, but never exceeds this value. Reactor
        backends only.
    */
    unsigned inline_budget_max{16};

    /** Inline budget while no other thread helps the reactor.

        With a single thread driving the event loop, the inline
        budget is capped at this value to keep scheduling fair.
        Reactor backends only.
    */
    unsigned unassisted_budget{4};

    /** Worker count of the blocking-I/O thread pool.

        Number of threads in the shared pool that serves POSIX
        file services and DNS resolution. Must be at least 1.
        POSIX backends only; ignored on IOCP, where file I/O is
        native overlapped I/O.
    */
    unsigned thread_pool_size{1};

    /** Run the scheduler without locking (single-threaded mode).

        When set, the scheduler omits every mutex lock/unlock and
        condition-variable operation on the hot path, removing
        synchronization cost when exactly one thread calls
        `run()`.

        @par Restrictions
        - Only one thread may call `run()` (or any run variant).
        - Posting work from another thread is undefined behavior.
        - DNS resolution returns `operation_not_supported`.
        - POSIX file I/O returns `operation_not_supported`.
        - Signal sets should not be shared across contexts.

        @note An `io_context` constructed with
        `concurrency_hint == 1` enables single-threaded mode
        automatically, whatever this member says (the asio
        convention). Pass `concurrency_hint > 1` to opt out.
    */
    bool single_threaded{false};

    /** Request IORING_SETUP_SQPOLL on the io_uring backend.

        Under SQPOLL the kernel runs a thread that busy-polls the
        submission ring, so submitting becomes a plain userspace
        memory store and the `io_uring_enter` syscall disappears
        from the submit path. Most valuable for sustained
        traffic. The polling thread parks after
        `sq_thread_idle_ms` without activity.

        Independent of `single_threaded`. Default: off.

        Ignored on non-io_uring backends.
    */
    bool enable_sqpoll{false};

    /** Idle timeout of the SQ-poll thread, in milliseconds.

        When no submission arrives for this long, the kernel
        polling thread goes to sleep; the next submit wakes it
        via SQ_WAKEUP. 0 selects the kernel default (1ms).
        For bursty workloads 100-1000ms is recommended, which
        avoids park/unpark thrash.

        NOTE(review): Linux substitutes one second (HZ jiffies),
        not 1ms, when `sq_thread_idle` is 0 - confirm which
        default actually applies here and correct the figure.

        Ignored unless `enable_sqpoll` is true. Ignored on
        non-io_uring backends.
    */
    unsigned sq_thread_idle_ms{0};

    /** CPU to pin the SQ-poll kernel thread to.

        -1 leaves the thread unpinned (the kernel scheduler
        decides). On latency-sensitive deployments, pinning to a
        core other than the dispatch core is recommended to avoid
        cache contention.

        Ignored unless `enable_sqpoll` is true. Ignored on
        non-io_uring backends.
    */
    int sq_thread_cpu{-1};
};
159 :
// Forward declaration only; timer_service is defined outside this header.
namespace detail { class timer_service; }
163 :
164 : /** An I/O context for running asynchronous operations.
165 :
166 : The io_context provides an execution environment for async
167 : operations. It maintains a queue of pending work items and
168 : processes them when `run()` is called.
169 :
170 : The default and unsigned constructors select the platform's
171 : native backend:
172 : - Windows: IOCP
173 : - Linux: epoll
174 : - BSD/macOS: kqueue
175 : - Other POSIX: select
176 :
177 : The template constructor accepts a backend tag value to
178 : choose a specific backend at compile time:
179 :
180 : @par Example
181 : @code
182 : io_context ioc; // platform default
183 : io_context ioc2(corosio::epoll); // explicit backend
184 : @endcode
185 :
186 : @par Thread Safety
187 : Distinct objects: Safe.@n
188 : Shared objects: Safe, if using a concurrency hint greater
189 : than 1.
190 :
191 : @see epoll_t, select_t, kqueue_t, iocp_t
192 : */
193 : class BOOST_COROSIO_DECL io_context : public capy::execution_context
194 : {
195 : /// Pre-create services that depend on options (before construct).
196 : void apply_options_pre_(io_context_options const& opts);
197 :
198 : /// Apply runtime tuning to the scheduler (after construct).
199 : void apply_options_post_(
200 : io_context_options const& opts,
201 : unsigned concurrency_hint);
202 :
203 : /// Switch the scheduler to single-threaded (lockless) mode.
204 : void configure_single_threaded_();
205 :
206 : protected:
207 : detail::scheduler* sched_;
208 :
209 : public:
210 : /** The executor type for this context. */
211 : class executor_type;
212 :
213 : /** Construct with default concurrency and platform backend.
214 :
215 : Uses `std::thread::hardware_concurrency()` clamped to a minimum
216 : of 2 as the concurrency hint, so the default constructor never
217 : silently engages single-threaded mode (see
218 : @ref io_context_options::single_threaded). Pass an explicit
219 : `concurrency_hint == 1` to opt into single-threaded mode.
220 : */
221 : io_context();
222 :
223 : /** Construct with a concurrency hint and platform backend.
224 :
225 : @param concurrency_hint Hint for the number of threads
226 : that will call `run()`.
227 : */
228 : explicit io_context(unsigned concurrency_hint);
229 :
230 : /** Construct with runtime tuning options and platform backend.
231 :
232 : @param opts Runtime options controlling scheduler and
233 : service behavior.
234 : @param concurrency_hint Hint for the number of threads
235 : that will call `run()`.
236 : */
237 : explicit io_context(
238 : io_context_options const& opts,
239 : unsigned concurrency_hint = std::thread::hardware_concurrency());
240 :
241 : /** Construct with an explicit backend tag.
242 :
243 : @param backend The backend tag value selecting the I/O
244 : multiplexer (e.g. `corosio::epoll`).
245 : @param concurrency_hint Hint for the number of threads
246 : that will call `run()`.
247 : */
248 : template<class Backend>
249 : requires requires { Backend::construct; }
250 HIT 1176 : explicit io_context(
251 : Backend backend,
252 : unsigned concurrency_hint = std::thread::hardware_concurrency())
253 : : capy::execution_context(this)
254 1176 : , sched_(nullptr)
255 : {
256 : (void)backend;
257 1176 : sched_ = &Backend::construct(*this, concurrency_hint);
258 1176 : if (concurrency_hint == 1)
259 4 : configure_single_threaded_();
260 1176 : }
261 :
262 : /** Construct with an explicit backend tag and runtime options.
263 :
264 : @param backend The backend tag value selecting the I/O
265 : multiplexer (e.g. `corosio::epoll`).
266 : @param opts Runtime options controlling scheduler and
267 : service behavior.
268 : @param concurrency_hint Hint for the number of threads
269 : that will call `run()`.
270 : */
271 : template<class Backend>
272 : requires requires { Backend::construct; }
273 12 : explicit io_context(
274 : Backend backend,
275 : io_context_options const& opts,
276 : unsigned concurrency_hint = std::thread::hardware_concurrency())
277 : : capy::execution_context(this)
278 12 : , sched_(nullptr)
279 : {
280 : (void)backend;
281 12 : apply_options_pre_(opts);
282 12 : sched_ = &Backend::construct(*this, concurrency_hint);
283 12 : apply_options_post_(opts, concurrency_hint);
284 12 : }
285 :
286 : ~io_context();
287 :
288 : io_context(io_context const&) = delete;
289 : io_context& operator=(io_context const&) = delete;
290 :
291 : /** Return an executor for this context.
292 :
293 : The returned executor can be used to dispatch coroutines
294 : and post work items to this context.
295 :
296 : @return An executor associated with this context.
297 : */
298 : executor_type get_executor() const noexcept;
299 :
300 : /** Signal the context to stop processing.
301 :
302 : This causes `run()` to return as soon as possible. Any pending
303 : work items remain queued.
304 : */
305 6 : void stop()
306 : {
307 6 : sched_->stop();
308 6 : }
309 :
310 : /** Return whether the context has been stopped.
311 :
312 : @return `true` if `stop()` has been called and `restart()`
313 : has not been called since.
314 : */
315 66 : bool stopped() const noexcept
316 : {
317 66 : return sched_->stopped();
318 : }
319 :
320 : /** Restart the context after being stopped.
321 :
322 : This function must be called before `run()` can be called
323 : again after `stop()` has been called.
324 : */
325 175 : void restart()
326 : {
327 175 : sched_->restart();
328 175 : }
329 :
330 : /** Process all pending work items.
331 :
332 : This function blocks until all pending work items have been
333 : executed or `stop()` is called. The context is stopped
334 : when there is no more outstanding work.
335 :
336 : @note The context must be restarted with `restart()` before
337 : calling this function again after it returns.
338 :
339 : @return The number of handlers executed.
340 : */
341 833 : std::size_t run()
342 : {
343 833 : return sched_->run();
344 : }
345 :
346 : /** Process at most one pending work item.
347 :
348 : This function blocks until one work item has been executed
349 : or `stop()` is called. The context is stopped when there
350 : is no more outstanding work.
351 :
352 : @note The context must be restarted with `restart()` before
353 : calling this function again after it returns.
354 :
355 : @return The number of handlers executed (0 or 1).
356 : */
357 16 : std::size_t run_one()
358 : {
359 16 : return sched_->run_one();
360 : }
361 :
362 : /** Process work items for the specified duration.
363 :
364 : This function blocks until work items have been executed for
365 : the specified duration, or `stop()` is called. The context
366 : is stopped when there is no more outstanding work.
367 :
368 : @note The context must be restarted with `restart()` before
369 : calling this function again after it returns.
370 :
371 : @param rel_time The duration for which to process work.
372 :
373 : @return The number of handlers executed.
374 : */
375 : template<class Rep, class Period>
376 10 : std::size_t run_for(std::chrono::duration<Rep, Period> const& rel_time)
377 : {
378 10 : return run_until(std::chrono::steady_clock::now() + rel_time);
379 : }
380 :
381 : /** Process work items until the specified time.
382 :
383 : This function blocks until the specified time is reached
384 : or `stop()` is called. The context is stopped when there
385 : is no more outstanding work.
386 :
387 : @note The context must be restarted with `restart()` before
388 : calling this function again after it returns.
389 :
390 : @param abs_time The time point until which to process work.
391 :
392 : @return The number of handlers executed.
393 : */
394 : template<class Clock, class Duration>
395 : std::size_t
396 10 : run_until(std::chrono::time_point<Clock, Duration> const& abs_time)
397 : {
398 10 : std::size_t n = 0;
399 28 : while (run_one_until(abs_time))
400 18 : if (n != (std::numeric_limits<std::size_t>::max)())
401 18 : ++n;
402 10 : return n;
403 : }
404 :
405 : /** Process at most one work item for the specified duration.
406 :
407 : This function blocks until one work item has been executed,
408 : the specified duration has elapsed, or `stop()` is called.
409 : The context is stopped when there is no more outstanding work.
410 :
411 : @note The context must be restarted with `restart()` before
412 : calling this function again after it returns.
413 :
414 : @param rel_time The duration for which the call may block.
415 :
416 : @return The number of handlers executed (0 or 1).
417 : */
418 : template<class Rep, class Period>
419 6 : std::size_t run_one_for(std::chrono::duration<Rep, Period> const& rel_time)
420 : {
421 6 : return run_one_until(std::chrono::steady_clock::now() + rel_time);
422 : }
423 :
424 : /** Process at most one work item until the specified time.
425 :
426 : This function blocks until one work item has been executed,
427 : the specified time is reached, or `stop()` is called.
428 : The context is stopped when there is no more outstanding work.
429 :
430 : @note The context must be restarted with `restart()` before
431 : calling this function again after it returns.
432 :
433 : @param abs_time The time point until which the call may block.
434 :
435 : @return The number of handlers executed (0 or 1).
436 : */
437 : template<class Clock, class Duration>
438 : std::size_t
439 42 : run_one_until(std::chrono::time_point<Clock, Duration> const& abs_time)
440 : {
441 42 : typename Clock::time_point now = Clock::now();
442 8 : for (;;)
443 : {
444 50 : auto rel_time = abs_time - now;
445 : using rel_type = decltype(rel_time);
446 50 : if (rel_time < rel_type::zero())
447 4 : rel_time = rel_type::zero();
448 46 : else if (rel_time > std::chrono::seconds(1))
449 22 : rel_time = std::chrono::seconds(1);
450 :
451 50 : std::size_t s = sched_->wait_one(
452 : static_cast<long>(
453 50 : std::chrono::duration_cast<std::chrono::microseconds>(
454 : rel_time)
455 50 : .count()));
456 :
457 50 : if (s || stopped())
458 42 : return s;
459 :
460 12 : now = Clock::now();
461 12 : if (now >= abs_time)
462 4 : return 0;
463 : }
464 : }
465 :
466 : /** Process all ready work items without blocking.
467 :
468 : This function executes all work items that are ready to run
469 : without blocking for more work. The context is stopped
470 : when there is no more outstanding work.
471 :
472 : @note The context must be restarted with `restart()` before
473 : calling this function again after it returns.
474 :
475 : @return The number of handlers executed.
476 : */
477 26 : std::size_t poll()
478 : {
479 26 : return sched_->poll();
480 : }
481 :
482 : /** Process at most one ready work item without blocking.
483 :
484 : This function executes at most one work item that is ready
485 : to run without blocking for more work. The context is
486 : stopped when there is no more outstanding work.
487 :
488 : @note The context must be restarted with `restart()` before
489 : calling this function again after it returns.
490 :
491 : @return The number of handlers executed (0 or 1).
492 : */
493 8 : std::size_t poll_one()
494 : {
495 8 : return sched_->poll_one();
496 : }
497 : };
498 :
499 : /** An executor for dispatching work to an I/O context.
500 :
501 : The executor provides the interface for posting work items and
502 : dispatching coroutines to the associated context. It satisfies
503 : the `capy::Executor` concept.
504 :
505 : Executors are lightweight handles that can be copied and compared
506 : for equality. Two executors compare equal if they refer to the
507 : same context.
508 :
509 : @par Thread Safety
510 : Distinct objects: Safe.@n
511 : Shared objects: Safe.
512 : */
513 : class io_context::executor_type
514 : {
515 : io_context* ctx_ = nullptr;
516 :
517 : public:
518 : /** Default constructor.
519 :
520 : Constructs an executor not associated with any context.
521 : */
522 : executor_type() = default;
523 :
524 : /** Construct an executor from a context.
525 :
526 : @param ctx The context to associate with this executor.
527 : */
528 1248 : explicit executor_type(io_context& ctx) noexcept : ctx_(&ctx) {}
529 :
530 : /** Return a reference to the associated execution context.
531 :
532 : @return Reference to the context.
533 : */
534 2097 : io_context& context() const noexcept
535 : {
536 2097 : return *ctx_;
537 : }
538 :
539 : /** Check if the current thread is running this executor's context.
540 :
541 : @return `true` if `run()` is being called on this thread.
542 : */
543 2152 : bool running_in_this_thread() const noexcept
544 : {
545 2152 : return ctx_->sched_->running_in_this_thread();
546 : }
547 :
548 : /** Informs the executor that work is beginning.
549 :
550 : Must be paired with `on_work_finished()`.
551 : */
552 2433 : void on_work_started() const noexcept
553 : {
554 2433 : ctx_->sched_->work_started();
555 2433 : }
556 :
557 : /** Informs the executor that work has completed.
558 :
559 : @par Preconditions
560 : A preceding call to `on_work_started()` on an equal executor.
561 : */
562 2388 : void on_work_finished() const noexcept
563 : {
564 2388 : ctx_->sched_->work_finished();
565 2388 : }
566 :
567 : /** Dispatch a continuation.
568 :
569 : Returns a handle for symmetric transfer. If called from
570 : within `run()`, returns `c.h`. Otherwise posts the
571 : enclosing continuation_op as a scheduler_op for later
572 : execution and returns `std::noop_coroutine()`.
573 :
574 : @param c The continuation to dispatch. Must be the `cont`
575 : member of a `detail::continuation_op`.
576 :
577 : @return A handle for symmetric transfer or `std::noop_coroutine()`.
578 : */
579 2148 : std::coroutine_handle<> dispatch(capy::continuation& c) const
580 : {
581 2148 : if (running_in_this_thread())
582 675 : return c.h;
583 1473 : post(c);
584 1473 : return std::noop_coroutine();
585 : }
586 :
587 : /** Post a continuation for deferred execution.
588 :
589 : If the continuation is backed by a continuation_op
590 : (tagged), posts it directly as a scheduler_op — zero
591 : heap allocation. Otherwise falls back to the
592 : heap-allocating post(coroutine_handle<>) path.
593 : */
594 10443 : void post(capy::continuation& c) const
595 : {
596 10443 : auto* op = detail::continuation_op::try_from_continuation(c);
597 10443 : if (op)
598 8964 : ctx_->sched_->post(op);
599 : else
600 1479 : ctx_->sched_->post(c.h);
601 10443 : }
602 :
603 : /** Post a bare coroutine handle for deferred execution.
604 :
605 : Heap-allocates a scheduler_op to wrap the handle. Prefer
606 : posting through a continuation_op-backed continuation when
607 : the continuation has suitable lifetime.
608 :
609 : @param h The coroutine handle to post.
610 : */
611 2876 : void post(std::coroutine_handle<> h) const
612 : {
613 2876 : ctx_->sched_->post(h);
614 2876 : }
615 :
616 : /** Compare two executors for equality.
617 :
618 : @return `true` if both executors refer to the same context.
619 : */
620 2 : bool operator==(executor_type const& other) const noexcept
621 : {
622 2 : return ctx_ == other.ctx_;
623 : }
624 :
625 : /** Compare two executors for inequality.
626 :
627 : @return `true` if the executors refer to different contexts.
628 : */
629 : bool operator!=(executor_type const& other) const noexcept
630 : {
631 : return ctx_ != other.ctx_;
632 : }
633 : };
634 :
635 : inline io_context::executor_type
636 1248 : io_context::get_executor() const noexcept
637 : {
638 1248 : return executor_type(const_cast<io_context&>(*this));
639 : }
640 :
641 : } // namespace boost::corosio
642 :
643 : #endif // BOOST_COROSIO_IO_CONTEXT_HPP
|