include/boost/corosio/tcp_server.hpp

78.2% Lines (111/142) 91.2% List of functions (31/34)
tcp_server.hpp
f(x) Functions (34)
Function Calls Lines Blocks
boost::corosio::tcp_server::idle_push(boost::corosio::tcp_server::worker_base*) :182 45x 100.0% 100.0% boost::corosio::tcp_server::idle_pop() :188 9x 100.0% 100.0% boost::corosio::tcp_server::idle_empty() const :196 9x 100.0% 100.0% boost::corosio::tcp_server::active_push(boost::corosio::tcp_server::worker_base*) :202 3x 87.5% 75.0% boost::corosio::tcp_server::active_remove(boost::corosio::tcp_server::worker_base*) :213 9x 80.0% 64.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::promise_type<boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&>(boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&) :252 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::get_return_object() :260 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::initial_suspend() :265 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::final_suspend() :269 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::return_void() :273 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::unhandled_exception() :274 0 0.0% 0.0% auto boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::capy::task<void> >(boost::capy::task<void>&&) :281 3x 100.0% 100.0% auto 
boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::corosio::tcp_server::push_awaitable>(boost::corosio::tcp_server::push_awaitable&&) :281 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::launch_wrapper(std::__n4861::coroutine_handle<boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type>) :309 3x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::~launch_wrapper() :314 3x 75.0% 75.0% boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>::operator()(boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*, boost::capy::task<void>, boost::corosio::tcp_server::worker_base*) :334 3x 100.0% 46.0% boost::corosio::tcp_server::push_awaitable::push_awaitable(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :354 9x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_ready() const :360 9x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :366 9x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_resume() :373 9x 50.0% 80.0% boost::corosio::tcp_server::pop_awaitable::pop_awaitable(boost::corosio::tcp_server&) :399 9x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_ready() const :401 9x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :407 0 0.0% 0.0% boost::corosio::tcp_server::pop_awaitable::await_resume() :417 9x 75.0% 75.0% boost::corosio::tcp_server::push(boost::corosio::tcp_server::worker_base&) :426 9x 100.0% 100.0% boost::corosio::tcp_server::push_sync(boost::corosio::tcp_server::worker_base&) :433 0 0.0% 0.0% boost::corosio::tcp_server::pop() :450 9x 100.0% 
100.0% boost::corosio::tcp_server::launcher::launcher(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :515 3x 100.0% 100.0% boost::corosio::tcp_server::launcher::~launcher() :521 3x 75.0% 67.0% void boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>) :548 3x 80.0% 55.0% boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>)::guard_t::~guard_t() :563 3x 91.7% 67.0% boost::corosio::tcp_server::tcp_server<boost::corosio::io_context, boost::corosio::io_context::executor_type>(boost::corosio::io_context&, boost::corosio::io_context::executor_type) :600 9x 100.0% 100.0% void boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&) :664 9x 100.0% 100.0% boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, 
std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&)::{lambda(void*)#1}::operator()(void*) const :676 9x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 detail::continuation_op cont_op;
163 worker_base* w;
164 };
165
166 struct impl;
167
168 static impl* make_impl(capy::execution_context& ctx);
169
170 impl* impl_;
171 capy::any_executor ex_;
172 waiter* waiters_ = nullptr;
173 worker_base* idle_head_ = nullptr; // Forward list: available workers
174 worker_base* active_head_ =
175 nullptr; // Doubly linked: workers handling connections
176 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
177 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
178 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
179 bool running_ = false;
180
181 // Idle list (forward/singly linked) - push front, pop front
182 45x void idle_push(worker_base* w) noexcept
183 {
184 45x w->next_ = idle_head_;
185 45x idle_head_ = w;
186 45x }
187
188 9x worker_base* idle_pop() noexcept
189 {
190 9x auto* w = idle_head_;
191 9x if (w)
192 9x idle_head_ = w->next_;
193 9x return w;
194 }
195
196 9x bool idle_empty() const noexcept
197 {
198 9x return idle_head_ == nullptr;
199 }
200
201 // Active list (doubly linked) - push back, remove anywhere
202 3x void active_push(worker_base* w) noexcept
203 {
204 3x w->next_ = nullptr;
205 3x w->prev_ = active_tail_;
206 3x if (active_tail_)
207 active_tail_->next_ = w;
208 else
209 3x active_head_ = w;
210 3x active_tail_ = w;
211 3x }
212
213 9x void active_remove(worker_base* w) noexcept
214 {
215 // Skip if not in active list (e.g., after failed accept)
216 9x if (w != active_head_ && w->prev_ == nullptr)
217 6x return;
218 3x if (w->prev_)
219 w->prev_->next_ = w->next_;
220 else
221 3x active_head_ = w->next_;
222 3x if (w->next_)
223 w->next_->prev_ = w->prev_;
224 else
225 3x active_tail_ = w->prev_;
226 3x w->prev_ = nullptr; // Mark as not in active list
227 }
228
    // Coroutine wrapper used to launch a worker task. The promise stores
    // the executor by value in the coroutine frame plus an io_env, so that
    // awaitables inside the coroutine receive a valid environment that
    // outlives any child tasks.
    template<capy::Executor Ex>
    struct launch_wrapper
    {
        struct promise_type
        {
            Ex ex; // Executor stored directly in frame (outlives child tasks)
            capy::io_env env_;

            // For regular coroutines: first arg is executor, second is stop token
            template<class E, class S, class... Args>
            requires capy::Executor<std::decay_t<E>>
            promise_type(E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{
                    capy::executor_ref(ex), std::move(s),
                    capy::get_current_frame_allocator()}
            {
            }

            // For lambda coroutines: first arg is closure, second is executor, third is stop token
            template<class Closure, class E, class S, class... Args>
            requires(!capy::Executor<std::decay_t<Closure>> &&
                capy::Executor<std::decay_t<E>>)
            promise_type(Closure&&, E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{
                    capy::executor_ref(ex), std::move(s),
                    capy::get_current_frame_allocator()}
            {
            }

            launch_wrapper get_return_object() noexcept
            {
                return {
                    std::coroutine_handle<promise_type>::from_promise(*this)};
            }
            // Suspend at start: the coroutine is resumed explicitly via
            // ex.post() by the caller (see launcher::operator()).
            std::suspend_always initial_suspend() noexcept
            {
                return {};
            }
            // suspend_never: the frame self-destroys on completion, so the
            // owner must release its handle before resuming the coroutine.
            std::suspend_never final_suspend() noexcept
            {
                return {};
            }
            void return_void() noexcept {}
            // Exceptions must not escape the launch coroutine.
            void unhandled_exception()
            {
                std::terminate();
            }

            // Inject io_env for IoAwaitable: wraps the awaitable so its
            // await_suspend receives the environment pointer as a second arg.
            template<capy::IoAwaitable Awaitable>
            auto await_transform(Awaitable&& a)
            {
                using AwaitableT = std::decay_t<Awaitable>;
                struct adapter
                {
                    AwaitableT aw;
                    capy::io_env const* env;

                    bool await_ready()
                    {
                        return aw.await_ready();
                    }
                    decltype(auto) await_resume()
                    {
                        return aw.await_resume();
                    }

                    auto await_suspend(std::coroutine_handle<promise_type> h)
                    {
                        return aw.await_suspend(h, env);
                    }
                };
                return adapter{std::forward<Awaitable>(a), &env_};
            }
        };

        std::coroutine_handle<promise_type> h;

        launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
            : h(handle)
        {
        }

        // Destroys the (still-suspended) frame unless the handle was
        // released via std::exchange before posting it to an executor.
        ~launch_wrapper()
        {
            if (h)
                h.destroy();
        }

        launch_wrapper(launch_wrapper&& o) noexcept
            : h(std::exchange(o.h, nullptr))
        {
        }

        launch_wrapper(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper&&) = delete;
    };
329
330 // Named functor to avoid incomplete lambda type in coroutine promise
331 template<class Executor>
332 struct launch_coro
333 {
334 3x launch_wrapper<Executor> operator()(
335 Executor,
336 std::stop_token,
337 tcp_server* self,
338 capy::task<void> t,
339 worker_base* wp)
340 {
341 // Executor and stop token stored in promise via constructor
342 co_await std::move(t);
343 co_await self->push(*wp); // worker goes back to idle list
344 6x }
345 };
346
347 class push_awaitable
348 {
349 tcp_server& self_;
350 worker_base& w_;
351 detail::continuation_op cont_op_;
352
353 public:
354 9x push_awaitable(tcp_server& self, worker_base& w) noexcept
355 9x : self_(self)
356 9x , w_(w)
357 {
358 9x }
359
360 9x bool await_ready() const noexcept
361 {
362 9x return false;
363 }
364
365 std::coroutine_handle<>
366 9x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
367 {
368 // Symmetric transfer to server's executor
369 9x cont_op_.cont.h = h;
370 9x return self_.ex_.dispatch(cont_op_.cont);
371 }
372
373 9x void await_resume() noexcept
374 {
375 // Running on server executor - safe to modify lists
376 // Remove from active (if present), then wake waiter or add to idle
377 9x self_.active_remove(&w_);
378 9x if (self_.waiters_)
379 {
380 auto* wait = self_.waiters_;
381 self_.waiters_ = wait->next;
382 wait->w = &w_;
383 wait->cont_op.cont.h = wait->h;
384 self_.ex_.post(wait->cont_op.cont);
385 }
386 else
387 {
388 9x self_.idle_push(&w_);
389 }
390 9x }
391 };
392
393 class pop_awaitable
394 {
395 tcp_server& self_;
396 waiter wait_;
397
398 public:
399 9x pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
400
401 9x bool await_ready() const noexcept
402 {
403 9x return !self_.idle_empty();
404 }
405
406 bool
407 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
408 {
409 // Running on server executor (do_accept runs there)
410 wait_.h = h;
411 wait_.w = nullptr;
412 wait_.next = self_.waiters_;
413 self_.waiters_ = &wait_;
414 return true;
415 }
416
417 9x worker_base& await_resume() noexcept
418 {
419 // Running on server executor
420 9x if (wait_.w)
421 return *wait_.w; // Woken by push_awaitable
422 9x return *self_.idle_pop();
423 }
424 };
425
426 9x push_awaitable push(worker_base& w)
427 {
428 9x return push_awaitable{*this, w};
429 }
430
431 // Synchronous version for destructor/guard paths
432 // Must be called from server executor context
433 void push_sync(worker_base& w) noexcept
434 {
435 active_remove(&w);
436 if (waiters_)
437 {
438 auto* wait = waiters_;
439 waiters_ = wait->next;
440 wait->w = &w;
441 wait->cont_op.cont.h = wait->h;
442 ex_.post(wait->cont_op.cont);
443 }
444 else
445 {
446 idle_push(&w);
447 }
448 }
449
450 9x pop_awaitable pop()
451 {
452 9x return pop_awaitable{*this};
453 }
454
455 capy::task<void> do_accept(tcp_acceptor& acc);
456
457 public:
458 /** Abstract base class for connection handlers.
459
460 Derive from this class to implement custom connection handling.
461 Each worker owns a socket and is reused across multiple
462 connections to avoid per-connection allocation.
463
464 @see tcp_server, launcher
465 */
    class BOOST_COROSIO_DECL worker_base
    {
        // Ordered largest to smallest for optimal packing
        std::stop_source stop_; // ~16 bytes; reset by launcher per connection
        worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
        worker_base* prev_ = nullptr; // 8 bytes - used only by active list;
                                      // nullptr doubles as the "not in active
                                      // list" sentinel (see active_remove)

        // The server manages the intrusive list links and stop source.
        friend class tcp_server;

    public:
        /// Construct a worker.
        worker_base();

        /// Destroy the worker.
        virtual ~worker_base();

        /** Handle an accepted connection.

            Called when this worker is dispatched to handle a new
            connection. The implementation must invoke the launcher
            exactly once to start the handling coroutine.

            @param launch Handle to launch the connection coroutine.
        */
        virtual void run(launcher launch) = 0;

        /// Return the socket used for connections.
        virtual corosio::tcp_socket& socket() = 0;
    };
495
496 /** Move-only handle to launch a worker coroutine.
497
498 Passed to @ref worker_base::run to start the connection-handling
499 coroutine. The launcher ensures the worker returns to the idle
500 pool when the coroutine completes or if launching fails.
501
502 The launcher must be invoked exactly once via `operator()`.
503 If destroyed without invoking, the worker is returned to the
504 idle pool automatically.
505
506 @see worker_base::run
507 */
    class BOOST_COROSIO_DECL launcher
    {
        tcp_server* srv_;
        worker_base* w_; // nullptr once invoked or moved-from

        friend class tcp_server;

        // Only the server creates launchers (one per dispatched worker).
        launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
        {
        }

    public:
        /// Return the worker to the pool if not launched.
        ~launcher()
        {
            if (w_)
                srv_->push_sync(*w_);
        }

        launcher(launcher&& o) noexcept
            : srv_(o.srv_)
            , w_(std::exchange(o.w_, nullptr))
        {
        }
        launcher(launcher const&) = delete;
        launcher& operator=(launcher const&) = delete;
        launcher& operator=(launcher&&) = delete;

        /** Launch the connection-handling coroutine.

            Starts the given coroutine on the specified executor. When
            the coroutine completes, the worker is automatically returned
            to the idle pool.

            @param ex The executor to run the coroutine on.
            @param task The coroutine to execute.

            @throws std::logic_error If this launcher was already invoked.
        */
        template<class Executor>
        void operator()(Executor const& ex, capy::task<void> task)
        {
            if (!w_)
                detail::throw_logic_error(); // launcher already invoked

            // Take ownership of the worker; w_ == nullptr disarms the dtor.
            auto* w = std::exchange(w_, nullptr);

            // Worker is being dispatched - add to active list
            srv_->active_push(w);

            // Return worker to pool if coroutine setup throws
            struct guard_t
            {
                tcp_server* srv;
                worker_base* w;
                ~guard_t()
                {
                    if (w)
                        srv->push_sync(*w);
                }
            } guard{srv_, w};

            // Reset worker's stop source for this connection
            w->stop_ = {};
            auto st = w->stop_.get_token();

            // Creates a suspended launch_wrapper coroutine; the executor
            // and stop token are stored in its promise via the constructor.
            auto wrapper =
                launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);

            // Executor and stop token stored in promise via constructor.
            // The handle must leave the wrapper before posting: the frame
            // self-destroys at final_suspend (suspend_never), and the
            // wrapper dtor would otherwise double-destroy it.
            ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
            guard.w = nullptr; // Success - dismiss guard
        }
    };
582
583 /** Construct a TCP server.
584
585 @tparam Ctx Execution context type satisfying ExecutionContext.
586 @tparam Ex Executor type satisfying Executor.
587
588 @param ctx The execution context for socket operations.
589 @param ex The executor for dispatching coroutines.
590
591 @par Example
592 @code
593 tcp_server srv(ctx, ctx.get_executor());
594 srv.set_workers(make_workers(ctx, 100));
595 srv.bind(endpoint{...});
596 srv.start();
597 @endcode
598 */
    template<capy::ExecutionContext Ctx, capy::Executor Ex>
    // Creates the type-erased impl from the context and stores the
    // executor used for dispatching coroutines and waking waiters.
    tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
        , ex_(std::move(ex))
    {
    }
604
605 public:
606 /// Destroy the server, stopping all accept loops.
607 ~tcp_server();
608
609 tcp_server(tcp_server const&) = delete;
610 tcp_server& operator=(tcp_server const&) = delete;
611
612 /** Move construct from another server.
613
614 @param o The source server. After the move, @p o is
615 in a valid but unspecified state.
616 */
617 tcp_server(tcp_server&& o) noexcept;
618
619 /** Move assign from another server.
620
621 @param o The source server. After the move, @p o is
622 in a valid but unspecified state.
623
624 @return `*this`.
625 */
626 tcp_server& operator=(tcp_server&& o) noexcept;
627
628 /** Bind to a local endpoint.
629
630 Creates an acceptor listening on the specified endpoint.
631 Multiple endpoints can be bound by calling this method
632 multiple times before @ref start.
633
634 @param ep The local endpoint to bind to.
635
636 @return The error code if binding fails.
637 */
638 std::error_code bind(endpoint ep);
639
640 /** Set the worker pool.
641
642 Replaces any existing workers with the given range. Any
643 previous workers are released and the idle/active lists
644 are cleared before populating with new workers.
645
646 @tparam Range Forward range of pointer-like objects to worker_base.
647
648 @param workers Range of workers to manage. Each element must
649 support `std::to_address()` yielding `worker_base*`.
650
651 @par Example
652 @code
653 std::vector<std::unique_ptr<my_worker>> workers;
654 for(int i = 0; i < 100; ++i)
655 workers.push_back(std::make_unique<my_worker>(ctx));
656 srv.set_workers(std::move(workers));
657 @endcode
658 */
659 template<std::ranges::forward_range Range>
660 requires std::convertible_to<
661 decltype(std::to_address(
662 std::declval<std::ranges::range_value_t<Range>&>())),
663 worker_base*>
664 9x void set_workers(Range&& workers)
665 {
666 // Clear existing state
667 9x storage_.reset();
668 9x idle_head_ = nullptr;
669 9x active_head_ = nullptr;
670 9x active_tail_ = nullptr;
671
672 // Take ownership and populate idle list
673 using StorageType = std::decay_t<Range>;
674 9x auto* p = new StorageType(std::forward<Range>(workers));
675 9x storage_ = std::shared_ptr<void>(
676 9x p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
677 45x for (auto&& elem : *static_cast<StorageType*>(p))
678 36x idle_push(std::to_address(elem));
679 9x }
680
681 /** Start accepting connections.
682
683 Launches accept loops for all bound endpoints. Incoming
684 connections are dispatched to idle workers from the pool.
685
686 Calling `start()` on an already-running server has no effect.
687
688 @par Preconditions
689 - At least one endpoint bound via @ref bind.
690 - Workers provided to the constructor.
691 - If restarting, @ref join must have completed first.
692
693 @par Effects
694 Creates one accept coroutine per bound endpoint. Each coroutine
695 runs on the server's executor, waiting for connections and
696 dispatching them to idle workers.
697
698 @par Restart Sequence
699 To restart after stopping, complete the full shutdown cycle:
700 @code
701 srv.start();
702 ioc.run_for( 1s );
703 srv.stop(); // 1. Signal shutdown
704 ioc.run(); // 2. Drain remaining completions
705 srv.join(); // 3. Wait for accept loops
706
707 // Now safe to restart
708 srv.start();
709 ioc.run();
710 @endcode
711
712 @par Thread Safety
713 Not thread safe.
714
715 @throws std::logic_error If a previous session has not been
716 joined (accept loops still active).
717 */
718 void start();
719
720 /** Return the local endpoint for the i-th bound port.
721
722 @param index Zero-based index into the list of bound ports.
723
724 @return The local endpoint, or a default-constructed endpoint
725 if @p index is out of range or the acceptor is not open.
726 */
727 endpoint local_endpoint(std::size_t index = 0) const noexcept;
728
729 /** Stop accepting connections.
730
731 Signals all listening ports to stop accepting new connections
732 and requests cancellation of active workers via their stop tokens.
733
734 This function returns immediately; it does not wait for workers
735 to finish. Pending I/O operations complete asynchronously.
736
737 Calling `stop()` on a non-running server has no effect.
738
739 @par Effects
740 - Closes all acceptors (pending accepts complete with error).
741 - Requests stop on each active worker's stop token.
742 - Workers observing their stop token should exit promptly.
743
744 @par Postconditions
745 No new connections will be accepted. Active workers continue
746 until they observe their stop token or complete naturally.
747
748 @par What Happens Next
749 After calling `stop()`:
750 1. Let `ioc.run()` return (drains pending completions).
751 2. Call @ref join to wait for accept loops to finish.
752 3. Only then is it safe to restart or destroy the server.
753
754 @par Thread Safety
755 Not thread safe.
756
757 @see join, start
758 */
759 void stop();
760
761 /** Block until all accept loops complete.
762
763 Blocks the calling thread until all accept coroutines launched
764 by @ref start have finished executing. This synchronizes the
765 shutdown sequence, ensuring the server is fully stopped before
766 restarting or destroying it.
767
768 @par Preconditions
769 @ref stop has been called and `ioc.run()` has returned.
770
771 @par Postconditions
772 All accept loops have completed. The server is in the stopped
773 state and may be restarted via @ref start.
774
775 @par Example (Correct Usage)
776 @code
777 // main thread
778 srv.start();
779 ioc.run(); // Blocks until work completes
780 srv.join(); // Safe: called after ioc.run() returns
781 @endcode
782
783 @par WARNING: Deadlock Scenarios
784 Calling `join()` from the wrong context causes deadlock:
785
786 @code
787 // WRONG: calling join() from inside a worker coroutine
788 void run( launcher launch ) override
789 {
790 launch( ex, [this]() -> capy::task<>
791 {
792 srv_.join(); // DEADLOCK: blocks the executor
793 co_return;
794 }());
795 }
796
797 // WRONG: calling join() while ioc.run() is still active
798 std::thread t( [&]{ ioc.run(); } );
799 srv.stop();
800 srv.join(); // DEADLOCK: ioc.run() still running in thread t
801 @endcode
802
803 @par Thread Safety
804 May be called from any thread, but will deadlock if called
805 from within the io_context event loop or from a worker coroutine.
806
807 @see stop, start
808 */
809 void join();
810
811 private:
812 capy::task<> do_stop();
813 };
814
815 #ifdef _MSC_VER
816 #pragma warning(pop)
817 #endif
818
819 } // namespace boost::corosio
820
821 #endif
822