TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 : #define BOOST_COROSIO_TCP_SERVER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/tcp_acceptor.hpp>
16 : #include <boost/corosio/tcp_socket.hpp>
17 : #include <boost/corosio/io_context.hpp>
18 : #include <boost/corosio/endpoint.hpp>
19 : #include <boost/capy/task.hpp>
20 : #include <boost/capy/concept/execution_context.hpp>
21 : #include <boost/capy/concept/io_awaitable.hpp>
22 : #include <boost/capy/concept/executor.hpp>
23 : #include <boost/capy/ex/any_executor.hpp>
24 : #include <boost/capy/ex/frame_allocator.hpp>
25 : #include <boost/capy/ex/io_env.hpp>
26 : #include <boost/capy/ex/run_async.hpp>
27 :
28 : #include <coroutine>
29 : #include <memory>
30 : #include <ranges>
31 : #include <vector>
32 :
33 : namespace boost::corosio {
34 :
35 : #ifdef _MSC_VER
36 : #pragma warning(push)
37 : #pragma warning(disable : 4251) // class needs to have dll-interface
38 : #endif
39 :
40 : /** TCP server with pooled workers.
41 :
42 : This class manages a pool of reusable worker objects that handle
43 : incoming connections. When a connection arrives, an idle worker
44 : is dispatched to handle it. After the connection completes, the
45 : worker returns to the pool for reuse, avoiding allocation overhead
46 : per connection.
47 :
48 : Workers are set via @ref set_workers as a forward range of
49 : pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 : takes ownership of the container via type erasure.
51 :
52 : @par Thread Safety
53 : Distinct objects: Safe.
54 : Shared objects: Unsafe.
55 :
56 : @par Lifecycle
57 : The server operates in three states:
58 :
59 : - **Stopped**: Initial state, or after @ref join completes.
60 : - **Running**: After @ref start, actively accepting connections.
61 : - **Stopping**: After @ref stop, draining active work.
62 :
63 : State transitions:
64 : @code
65 : [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 : @endcode
67 :
68 : @par Running the Server
69 : @code
70 : io_context ioc;
71 : tcp_server srv(ioc, ioc.get_executor());
72 : srv.set_workers(make_workers(ioc, 100));
73 : srv.bind(endpoint{address_v4::any(), 8080});
74 : srv.start();
75 : ioc.run(); // Blocks until all work completes
76 : @endcode
77 :
78 : @par Graceful Shutdown
79 : To shut down gracefully, call @ref stop then drain the io_context:
80 : @code
81 : // From a signal handler or timer callback:
82 : srv.stop();
83 :
84 : // ioc.run() returns after pending work drains.
85 : // Then from the thread that called ioc.run():
86 : srv.join(); // Wait for accept loops to finish
87 : @endcode
88 :
89 : @par Restart After Stop
90 : The server can be restarted after a complete shutdown cycle.
91 : You must drain the io_context and call @ref join before restarting:
92 : @code
93 : srv.start();
94 : ioc.run_for( 10s ); // Run for a while
95 : srv.stop(); // Signal shutdown
96 : ioc.run(); // REQUIRED: drain pending completions
97 : srv.join(); // REQUIRED: wait for accept loops
98 :
99 : // Now safe to restart
100 : srv.start();
101 : ioc.run();
102 : @endcode
103 :
104 : @par WARNING: What NOT to Do
105 : - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 : - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 : - Do NOT call @ref start without completing @ref join after @ref stop.
108 : - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109 :
110 : @par Example
111 : @code
112 : class my_worker : public tcp_server::worker_base
113 : {
114 : corosio::tcp_socket sock_;
115 : capy::any_executor ex_;
116 : public:
117 : my_worker(io_context& ctx)
118 : : sock_(ctx)
119 : , ex_(ctx.get_executor())
120 : {
121 : }
122 :
123 : corosio::tcp_socket& socket() override { return sock_; }
124 :
125 : void run(launcher launch) override
126 : {
127 : launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 : {
129 : // handle connection using sock
130 : co_return;
131 : }(&sock_));
132 : }
133 : };
134 :
135 : auto make_workers(io_context& ctx, int n)
136 : {
137 : std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 : v.reserve(n);
139 : for(int i = 0; i < n; ++i)
140 : v.push_back(std::make_unique<my_worker>(ctx));
141 : return v;
142 : }
143 :
144 : io_context ioc;
145 : tcp_server srv(ioc, ioc.get_executor());
146 : srv.set_workers(make_workers(ioc, 100));
147 : @endcode
148 :
149 : @see worker_base, set_workers, launcher
150 : */
151 : class BOOST_COROSIO_DECL tcp_server
152 : {
153 : public:
154 : class worker_base; ///< Abstract base for connection handlers.
155 : class launcher; ///< Move-only handle to launch worker coroutines.
156 :
157 : private:
158 : struct waiter
159 : {
160 : waiter* next;
161 : std::coroutine_handle<> h;
162 : detail::continuation_op cont_op;
163 : worker_base* w;
164 : };
165 :
// Opaque implementation state; the definition is not in this header.
struct impl;

// Creates the implementation object for `ctx`. Called from the
// constructor to initialize impl_.
static impl* make_impl(capy::execution_context& ctx);

// Set from make_impl() in the constructor. How it is released is not
// visible in this header.
impl* impl_;
// Server executor: push_awaitable dispatches onto it, and suspended
// waiters are resumed by posting to it.
capy::any_executor ex_;
// Intrusive list of suspended pop_awaitable waiters. Nodes are pushed
// and popped at the front, so the most recent waiter is served first.
waiter* waiters_ = nullptr;
worker_base* idle_head_ = nullptr; // Forward list: available workers
worker_base* active_head_ =
    nullptr; // Doubly linked: workers handling connections
worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
// Not referenced in this header; presumably tracks the Running state
// described in the class documentation - confirm in the implementation.
bool running_ = false;
180 :
181 : // Idle list (forward/singly linked) - push front, pop front
182 HIT 45 : void idle_push(worker_base* w) noexcept
183 : {
184 45 : w->next_ = idle_head_;
185 45 : idle_head_ = w;
186 45 : }
187 :
188 9 : worker_base* idle_pop() noexcept
189 : {
190 9 : auto* w = idle_head_;
191 9 : if (w)
192 9 : idle_head_ = w->next_;
193 9 : return w;
194 : }
195 :
196 9 : bool idle_empty() const noexcept
197 : {
198 9 : return idle_head_ == nullptr;
199 : }
200 :
201 : // Active list (doubly linked) - push back, remove anywhere
202 3 : void active_push(worker_base* w) noexcept
203 : {
204 3 : w->next_ = nullptr;
205 3 : w->prev_ = active_tail_;
206 3 : if (active_tail_)
207 MIS 0 : active_tail_->next_ = w;
208 : else
209 HIT 3 : active_head_ = w;
210 3 : active_tail_ = w;
211 3 : }
212 :
213 9 : void active_remove(worker_base* w) noexcept
214 : {
215 : // Skip if not in active list (e.g., after failed accept)
216 9 : if (w != active_head_ && w->prev_ == nullptr)
217 6 : return;
218 3 : if (w->prev_)
219 MIS 0 : w->prev_->next_ = w->next_;
220 : else
221 HIT 3 : active_head_ = w->next_;
222 3 : if (w->next_)
223 MIS 0 : w->next_->prev_ = w->prev_;
224 : else
225 HIT 3 : active_tail_ = w->prev_;
226 3 : w->prev_ = nullptr; // Mark as not in active list
227 : }
228 :
// Return type of the coroutine that runs one worker task (see
// launch_coro). The coroutine starts suspended (initial_suspend is
// suspend_always) and does not suspend at its final suspend point
// (suspend_never). The wrapper destroys the coroutine only while it
// still holds a non-null handle; launcher::operator() clears the handle
// when it hands the coroutine to the executor.
template<capy::Executor Ex>
struct launch_wrapper
{
    struct promise_type
    {
        Ex ex; // Executor stored directly in frame (outlives child tasks)
        // Environment handed to awaited IoAwaitables: refers to `ex`
        // above, the stop token, and the current frame allocator.
        capy::io_env env_;

        // For regular coroutines: first arg is executor, second is stop token
        template<class E, class S, class... Args>
            requires capy::Executor<std::decay_t<E>>
        promise_type(E e, S s, Args&&...)
            : ex(std::move(e))
            , env_{
                  capy::executor_ref(ex), std::move(s),
                  capy::get_current_frame_allocator()}
        {
        }

        // For lambda coroutines: first arg is closure, second is executor, third is stop token
        template<class Closure, class E, class S, class... Args>
            requires(!capy::Executor<std::decay_t<Closure>> &&
                     capy::Executor<std::decay_t<E>>)
        promise_type(Closure&&, E e, S s, Args&&...)
            : ex(std::move(e))
            , env_{
                  capy::executor_ref(ex), std::move(s),
                  capy::get_current_frame_allocator()}
        {
        }

        // Wrap the handle of this coroutine in a launch_wrapper.
        launch_wrapper get_return_object() noexcept
        {
            return {
                std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        // Start suspended: nothing runs until the handle is resumed.
        std::suspend_always initial_suspend() noexcept
        {
            return {};
        }
        // Do not suspend at the end of the coroutine.
        std::suspend_never final_suspend() noexcept
        {
            return {};
        }
        void return_void() noexcept {}
        // An exception escaping the worker task terminates the program.
        void unhandled_exception()
        {
            std::terminate();
        }

        // Inject io_env for IoAwaitable
        // The adapter stores a decayed copy of the awaitable and calls
        // its two-argument await_suspend(handle, env) with &env_.
        template<capy::IoAwaitable Awaitable>
        auto await_transform(Awaitable&& a)
        {
            using AwaitableT = std::decay_t<Awaitable>;
            struct adapter
            {
                AwaitableT aw;
                capy::io_env const* env;

                bool await_ready()
                {
                    return aw.await_ready();
                }
                decltype(auto) await_resume()
                {
                    return aw.await_resume();
                }

                auto await_suspend(std::coroutine_handle<promise_type> h)
                {
                    return aw.await_suspend(h, env);
                }
            };
            return adapter{std::forward<Awaitable>(a), &env_};
        }
    };

    // Coroutine handle held by this wrapper; null once released or moved from.
    std::coroutine_handle<promise_type> h;

    launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
        : h(handle)
    {
    }

    // Destroy the coroutine only if the handle was never released.
    ~launch_wrapper()
    {
        if (h)
            h.destroy();
    }

    // Moving transfers the handle and leaves the source empty.
    launch_wrapper(launch_wrapper&& o) noexcept
        : h(std::exchange(o.h, nullptr))
    {
    }

    launch_wrapper(launch_wrapper const&) = delete;
    launch_wrapper& operator=(launch_wrapper const&) = delete;
    launch_wrapper& operator=(launch_wrapper&&) = delete;
};
329 :
// Named functor to avoid incomplete lambda type in coroutine promise.
// Its operator() is the coroutine that runs one connection: it awaits
// the worker's task, then awaits push() to hand the worker back to the
// server. The arguments are also passed to the promise_type
// constructor, so their order must not change.
template<class Executor>
struct launch_coro
{
    launch_wrapper<Executor> operator()(
        Executor,
        std::stop_token,
        tcp_server* self,
        capy::task<void> t,
        worker_base* wp)
    {
        // Executor and stop token stored in promise via constructor
        co_await std::move(t);
        co_await self->push(*wp); // worker goes back to idle list
    }
};
346 :
347 : class push_awaitable
348 : {
349 : tcp_server& self_;
350 : worker_base& w_;
351 : detail::continuation_op cont_op_;
352 :
353 : public:
354 9 : push_awaitable(tcp_server& self, worker_base& w) noexcept
355 9 : : self_(self)
356 9 : , w_(w)
357 : {
358 9 : }
359 :
360 9 : bool await_ready() const noexcept
361 : {
362 9 : return false;
363 : }
364 :
365 : std::coroutine_handle<>
366 9 : await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
367 : {
368 : // Symmetric transfer to server's executor
369 9 : cont_op_.cont.h = h;
370 9 : return self_.ex_.dispatch(cont_op_.cont);
371 : }
372 :
373 9 : void await_resume() noexcept
374 : {
375 : // Running on server executor - safe to modify lists
376 : // Remove from active (if present), then wake waiter or add to idle
377 9 : self_.active_remove(&w_);
378 9 : if (self_.waiters_)
379 : {
380 MIS 0 : auto* wait = self_.waiters_;
381 0 : self_.waiters_ = wait->next;
382 0 : wait->w = &w_;
383 0 : wait->cont_op.cont.h = wait->h;
384 0 : self_.ex_.post(wait->cont_op.cont);
385 : }
386 : else
387 : {
388 HIT 9 : self_.idle_push(&w_);
389 : }
390 9 : }
391 : };
392 :
393 : class pop_awaitable
394 : {
395 : tcp_server& self_;
396 : waiter wait_;
397 :
398 : public:
399 9 : pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
400 :
401 9 : bool await_ready() const noexcept
402 : {
403 9 : return !self_.idle_empty();
404 : }
405 :
406 : bool
407 MIS 0 : await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
408 : {
409 : // Running on server executor (do_accept runs there)
410 0 : wait_.h = h;
411 0 : wait_.w = nullptr;
412 0 : wait_.next = self_.waiters_;
413 0 : self_.waiters_ = &wait_;
414 0 : return true;
415 : }
416 :
417 HIT 9 : worker_base& await_resume() noexcept
418 : {
419 : // Running on server executor
420 9 : if (wait_.w)
421 MIS 0 : return *wait_.w; // Woken by push_awaitable
422 HIT 9 : return *self_.idle_pop();
423 : }
424 : };
425 :
426 9 : push_awaitable push(worker_base& w)
427 : {
428 9 : return push_awaitable{*this, w};
429 : }
430 :
431 : // Synchronous version for destructor/guard paths
432 : // Must be called from server executor context
433 MIS 0 : void push_sync(worker_base& w) noexcept
434 : {
435 0 : active_remove(&w);
436 0 : if (waiters_)
437 : {
438 0 : auto* wait = waiters_;
439 0 : waiters_ = wait->next;
440 0 : wait->w = &w;
441 0 : wait->cont_op.cont.h = wait->h;
442 0 : ex_.post(wait->cont_op.cont);
443 : }
444 : else
445 : {
446 0 : idle_push(&w);
447 : }
448 0 : }
449 :
450 HIT 9 : pop_awaitable pop()
451 : {
452 9 : return pop_awaitable{*this};
453 : }
454 :
// Accept coroutine for one acceptor; defined outside this header.
// Per the comments on pop_awaitable it runs on the server executor.
capy::task<void> do_accept(tcp_acceptor& acc);
456 :
457 : public:
458 : /** Abstract base class for connection handlers.
459 :
460 : Derive from this class to implement custom connection handling.
461 : Each worker owns a socket and is reused across multiple
462 : connections to avoid per-connection allocation.
463 :
464 : @see tcp_server, launcher
465 : */
class BOOST_COROSIO_DECL worker_base
{
    // Per-connection stop source. launcher::operator() replaces it with
    // a fresh one and passes its token to the launched coroutine.
    // (sizeof(std::stop_source) is implementation-defined.)
    std::stop_source stop_;
    // Intrusive link used by both the idle list and the active list.
    worker_base* next_ = nullptr;
    // Intrusive back link used only by the active list. A null value on
    // a worker that is not the active head marks it as "not in the
    // active list" (see tcp_server::active_remove).
    worker_base* prev_ = nullptr;

    friend class tcp_server;

public:
    /// Construct a worker.
    worker_base();

    /// Destroy the worker.
    virtual ~worker_base();

    /** Handle an accepted connection.

        Called when this worker is dispatched to handle a new
        connection. The implementation must invoke the launcher
        exactly once to start the handling coroutine.

        @param launch Handle to launch the connection coroutine.
    */
    virtual void run(launcher launch) = 0;

    /// Return the socket used for connections.
    virtual corosio::tcp_socket& socket() = 0;
};
495 :
496 : /** Move-only handle to launch a worker coroutine.
497 :
498 : Passed to @ref worker_base::run to start the connection-handling
499 : coroutine. The launcher ensures the worker returns to the idle
500 : pool when the coroutine completes or if launching fails.
501 :
502 : The launcher must be invoked exactly once via `operator()`.
503 : If destroyed without invoking, the worker is returned to the
504 : idle pool automatically.
505 :
506 : @see worker_base::run
507 : */
508 : class BOOST_COROSIO_DECL launcher
509 : {
510 : tcp_server* srv_;
511 : worker_base* w_;
512 :
513 : friend class tcp_server;
514 :
515 3 : launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
516 : {
517 3 : }
518 :
519 : public:
520 : /// Return the worker to the pool if not launched.
521 3 : ~launcher()
522 : {
523 3 : if (w_)
524 MIS 0 : srv_->push_sync(*w_);
525 HIT 3 : }
526 :
527 : launcher(launcher&& o) noexcept
528 : : srv_(o.srv_)
529 : , w_(std::exchange(o.w_, nullptr))
530 : {
531 : }
532 : launcher(launcher const&) = delete;
533 : launcher& operator=(launcher const&) = delete;
534 : launcher& operator=(launcher&&) = delete;
535 :
536 : /** Launch the connection-handling coroutine.
537 :
538 : Starts the given coroutine on the specified executor. When
539 : the coroutine completes, the worker is automatically returned
540 : to the idle pool.
541 :
542 : @param ex The executor to run the coroutine on.
543 : @param task The coroutine to execute.
544 :
545 : @throws std::logic_error If this launcher was already invoked.
546 : */
547 : template<class Executor>
548 3 : void operator()(Executor const& ex, capy::task<void> task)
549 : {
550 3 : if (!w_)
551 MIS 0 : detail::throw_logic_error(); // launcher already invoked
552 :
553 HIT 3 : auto* w = std::exchange(w_, nullptr);
554 :
555 : // Worker is being dispatched - add to active list
556 3 : srv_->active_push(w);
557 :
558 : // Return worker to pool if coroutine setup throws
559 : struct guard_t
560 : {
561 : tcp_server* srv;
562 : worker_base* w;
563 3 : ~guard_t()
564 : {
565 3 : if (w)
566 MIS 0 : srv->push_sync(*w);
567 HIT 3 : }
568 3 : } guard{srv_, w};
569 :
570 : // Reset worker's stop source for this connection
571 3 : w->stop_ = {};
572 3 : auto st = w->stop_.get_token();
573 :
574 3 : auto wrapper =
575 3 : launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
576 :
577 : // Executor and stop token stored in promise via constructor
578 3 : ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
579 3 : guard.w = nullptr; // Success - dismiss guard
580 3 : }
581 : };
582 :
583 : /** Construct a TCP server.
584 :
585 : @tparam Ctx Execution context type satisfying ExecutionContext.
586 : @tparam Ex Executor type satisfying Executor.
587 :
588 : @param ctx The execution context for socket operations.
589 : @param ex The executor for dispatching coroutines.
590 :
591 : @par Example
592 : @code
593 : tcp_server srv(ctx, ctx.get_executor());
594 : srv.set_workers(make_workers(ctx, 100));
595 : srv.bind(endpoint{...});
596 : srv.start();
597 : @endcode
598 : */
599 : template<capy::ExecutionContext Ctx, capy::Executor Ex>
600 9 : tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
601 9 : , ex_(std::move(ex))
602 : {
603 9 : }
604 :
605 : public:
606 : /// Destroy the server, stopping all accept loops.
607 : ~tcp_server();
608 :
609 : tcp_server(tcp_server const&) = delete;
610 : tcp_server& operator=(tcp_server const&) = delete;
611 :
612 : /** Move construct from another server.
613 :
614 : @param o The source server. After the move, @p o is
615 : in a valid but unspecified state.
616 : */
617 : tcp_server(tcp_server&& o) noexcept;
618 :
619 : /** Move assign from another server.
620 :
621 : @param o The source server. After the move, @p o is
622 : in a valid but unspecified state.
623 :
624 : @return `*this`.
625 : */
626 : tcp_server& operator=(tcp_server&& o) noexcept;
627 :
628 : /** Bind to a local endpoint.
629 :
630 : Creates an acceptor listening on the specified endpoint.
631 : Multiple endpoints can be bound by calling this method
632 : multiple times before @ref start.
633 :
634 : @param ep The local endpoint to bind to.
635 :
636 : @return The error code if binding fails.
637 : */
638 : std::error_code bind(endpoint ep);
639 :
640 : /** Set the worker pool.
641 :
642 : Replaces any existing workers with the given range. Any
643 : previous workers are released and the idle/active lists
644 : are cleared before populating with new workers.
645 :
646 : @tparam Range Forward range of pointer-like objects to worker_base.
647 :
648 : @param workers Range of workers to manage. Each element must
649 : support `std::to_address()` yielding `worker_base*`.
650 :
651 : @par Example
652 : @code
653 : std::vector<std::unique_ptr<my_worker>> workers;
654 : for(int i = 0; i < 100; ++i)
655 : workers.push_back(std::make_unique<my_worker>(ctx));
656 : srv.set_workers(std::move(workers));
657 : @endcode
658 : */
659 : template<std::ranges::forward_range Range>
660 : requires std::convertible_to<
661 : decltype(std::to_address(
662 : std::declval<std::ranges::range_value_t<Range>&>())),
663 : worker_base*>
664 9 : void set_workers(Range&& workers)
665 : {
666 : // Clear existing state
667 9 : storage_.reset();
668 9 : idle_head_ = nullptr;
669 9 : active_head_ = nullptr;
670 9 : active_tail_ = nullptr;
671 :
672 : // Take ownership and populate idle list
673 : using StorageType = std::decay_t<Range>;
674 9 : auto* p = new StorageType(std::forward<Range>(workers));
675 9 : storage_ = std::shared_ptr<void>(
676 9 : p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
677 45 : for (auto&& elem : *static_cast<StorageType*>(p))
678 36 : idle_push(std::to_address(elem));
679 9 : }
680 :
681 : /** Start accepting connections.
682 :
683 : Launches accept loops for all bound endpoints. Incoming
684 : connections are dispatched to idle workers from the pool.
685 :
686 : Calling `start()` on an already-running server has no effect.
687 :
688 : @par Preconditions
689 : - At least one endpoint bound via @ref bind.
690 : - Workers provided via @ref set_workers.
691 : - If restarting, @ref join must have completed first.
692 :
693 : @par Effects
694 : Creates one accept coroutine per bound endpoint. Each coroutine
695 : runs on the server's executor, waiting for connections and
696 : dispatching them to idle workers.
697 :
698 : @par Restart Sequence
699 : To restart after stopping, complete the full shutdown cycle:
700 : @code
701 : srv.start();
702 : ioc.run_for( 1s );
703 : srv.stop(); // 1. Signal shutdown
704 : ioc.run(); // 2. Drain remaining completions
705 : srv.join(); // 3. Wait for accept loops
706 :
707 : // Now safe to restart
708 : srv.start();
709 : ioc.run();
710 : @endcode
711 :
712 : @par Thread Safety
713 : Not thread safe.
714 :
715 : @throws std::logic_error If a previous session has not been
716 : joined (accept loops still active).
717 : */
718 : void start();
719 :
720 : /** Return the local endpoint for the i-th bound port.
721 :
722 : @param index Zero-based index into the list of bound ports.
723 :
724 : @return The local endpoint, or a default-constructed endpoint
725 : if @p index is out of range or the acceptor is not open.
726 : */
727 : endpoint local_endpoint(std::size_t index = 0) const noexcept;
728 :
729 : /** Stop accepting connections.
730 :
731 : Signals all listening ports to stop accepting new connections
732 : and requests cancellation of active workers via their stop tokens.
733 :
734 : This function returns immediately; it does not wait for workers
735 : to finish. Pending I/O operations complete asynchronously.
736 :
737 : Calling `stop()` on a non-running server has no effect.
738 :
739 : @par Effects
740 : - Closes all acceptors (pending accepts complete with error).
741 : - Requests stop on each active worker's stop token.
742 : - Workers observing their stop token should exit promptly.
743 :
744 : @par Postconditions
745 : No new connections will be accepted. Active workers continue
746 : until they observe their stop token or complete naturally.
747 :
748 : @par What Happens Next
749 : After calling `stop()`:
750 : 1. Let `ioc.run()` return (drains pending completions).
751 : 2. Call @ref join to wait for accept loops to finish.
752 : 3. Only then is it safe to restart or destroy the server.
753 :
754 : @par Thread Safety
755 : Not thread safe.
756 :
757 : @see join, start
758 : */
759 : void stop();
760 :
761 : /** Block until all accept loops complete.
762 :
763 : Blocks the calling thread until all accept coroutines launched
764 : by @ref start have finished executing. This synchronizes the
765 : shutdown sequence, ensuring the server is fully stopped before
766 : restarting or destroying it.
767 :
768 : @par Preconditions
769 : @ref stop has been called and `ioc.run()` has returned.
770 :
771 : @par Postconditions
772 : All accept loops have completed. The server is in the stopped
773 : state and may be restarted via @ref start.
774 :
775 : @par Example (Correct Usage)
776 : @code
777 : // main thread
778 : srv.start();
779 : ioc.run(); // Blocks until work completes
780 : srv.join(); // Safe: called after ioc.run() returns
781 : @endcode
782 :
783 : @par WARNING: Deadlock Scenarios
784 : Calling `join()` from the wrong context causes deadlock:
785 :
786 : @code
787 : // WRONG: calling join() from inside a worker coroutine
788 : void run( launcher launch ) override
789 : {
790 : launch( ex, [this]() -> capy::task<>
791 : {
792 : srv_.join(); // DEADLOCK: blocks the executor
793 : co_return;
794 : }());
795 : }
796 :
797 : // WRONG: calling join() while ioc.run() is still active
798 : std::thread t( [&]{ ioc.run(); } );
799 : srv.stop();
800 : srv.join(); // DEADLOCK: ioc.run() still running in thread t
801 : @endcode
802 :
803 : @par Thread Safety
804 : May be called from any thread, but will deadlock if called
805 : from within the io_context event loop or from a worker coroutine.
806 :
807 : @see stop, start
808 : */
809 : void join();
810 :
811 : private:
// Coroutine defined outside this header; presumably the asynchronous
// part of stop() - confirm against the implementation.
capy::task<> do_stop();
813 : };
814 :
815 : #ifdef _MSC_VER
816 : #pragma warning(pop)
817 : #endif
818 :
819 : } // namespace boost::corosio
820 :
821 : #endif
|