TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <sys/select.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /// State for select acceptor service.
42 : using select_tcp_acceptor_state =
43 : reactor_service_state<select_scheduler, select_tcp_acceptor>;
44 :
45 : /** select acceptor service implementation.
46 :
47 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 : Uses key_type = tcp_acceptor_service for service lookup.
49 : */
50 : class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : : public tcp_acceptor_service
52 : {
53 : public:
54 : explicit select_tcp_acceptor_service(capy::execution_context& ctx);
55 : ~select_tcp_acceptor_service() override;
56 :
57 : select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
58 : select_tcp_acceptor_service&
59 : operator=(select_tcp_acceptor_service const&) = delete;
60 :
61 : void shutdown() override;
62 :
63 : io_object::implementation* construct() override;
64 : void destroy(io_object::implementation*) override;
65 : void close(io_object::handle&) override;
66 : std::error_code open_acceptor_socket(
67 : tcp_acceptor::implementation& impl,
68 : int family,
69 : int type,
70 : int protocol) override;
71 : std::error_code
72 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 : std::error_code
74 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75 :
76 HIT 116 : select_scheduler& scheduler() const noexcept
77 : {
78 116 : return state_->sched_;
79 : }
80 : void post(scheduler_op* op);
81 : void work_started() noexcept;
82 : void work_finished() noexcept;
83 :
84 : /** Get the TCP service for creating peer sockets during accept. */
85 : select_tcp_service* tcp_service() const noexcept;
86 :
87 : private:
88 : capy::execution_context& ctx_;
89 : std::unique_ptr<select_tcp_acceptor_state> state_;
90 : };
91 :
92 : inline void
93 MIS 0 : select_accept_op::cancel() noexcept
94 : {
95 0 : if (acceptor_impl_)
96 0 : acceptor_impl_->cancel_single_op(*this);
97 : else
98 0 : request_cancel();
99 0 : }
100 :
101 : inline void
102 HIT 3547 : select_accept_op::operator()()
103 : {
104 3547 : complete_accept_op<select_tcp_socket>(*this);
105 3547 : }
106 :
107 61 : inline select_tcp_acceptor::select_tcp_acceptor(
108 61 : select_tcp_acceptor_service& svc) noexcept
109 61 : : reactor_acceptor(svc)
110 : {
111 61 : }
112 :
113 : inline std::coroutine_handle<>
114 3547 : select_tcp_acceptor::accept(
115 : std::coroutine_handle<> h,
116 : capy::executor_ref ex,
117 : std::stop_token token,
118 : std::error_code* ec,
119 : io_object::implementation** impl_out)
120 : {
121 3547 : auto& op = acc_;
122 3547 : op.reset();
123 3547 : op.h = h;
124 3547 : op.ex = ex;
125 3547 : op.ec_out = ec;
126 3547 : op.impl_out = impl_out;
127 3547 : op.fd = fd_;
128 3547 : op.start(token, this);
129 :
130 3547 : sockaddr_storage peer_storage{};
131 3547 : socklen_t addrlen = sizeof(peer_storage);
132 : int accepted;
133 : do
134 : {
135 : accepted =
136 3547 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
137 : }
138 3547 : while (accepted < 0 && errno == EINTR);
139 :
140 3547 : if (accepted >= 0)
141 : {
142 2 : if (accepted >= FD_SETSIZE)
143 : {
144 MIS 0 : ::close(accepted);
145 0 : op.complete(EINVAL, 0);
146 0 : op.impl_ptr = shared_from_this();
147 0 : svc_.post(&op);
148 0 : return std::noop_coroutine();
149 : }
150 :
151 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
152 2 : if (flags == -1)
153 : {
154 MIS 0 : int err = errno;
155 0 : ::close(accepted);
156 0 : op.complete(err, 0);
157 0 : op.impl_ptr = shared_from_this();
158 0 : svc_.post(&op);
159 0 : return std::noop_coroutine();
160 : }
161 :
162 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
163 : {
164 MIS 0 : int err = errno;
165 0 : ::close(accepted);
166 0 : op.complete(err, 0);
167 0 : op.impl_ptr = shared_from_this();
168 0 : svc_.post(&op);
169 0 : return std::noop_coroutine();
170 : }
171 :
172 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
173 : {
174 MIS 0 : int err = errno;
175 0 : ::close(accepted);
176 0 : op.complete(err, 0);
177 0 : op.impl_ptr = shared_from_this();
178 0 : svc_.post(&op);
179 0 : return std::noop_coroutine();
180 : }
181 :
182 : {
183 HIT 2 : std::lock_guard lock(desc_state_.mutex);
184 2 : desc_state_.read_ready = false;
185 2 : }
186 :
187 2 : if (svc_.scheduler().try_consume_inline_budget())
188 : {
189 MIS 0 : auto* socket_svc = svc_.tcp_service();
190 0 : if (socket_svc)
191 : {
192 : auto& impl =
193 0 : static_cast<select_tcp_socket&>(*socket_svc->construct());
194 0 : impl.set_socket(accepted);
195 :
196 0 : impl.desc_state_.fd = accepted;
197 : {
198 0 : std::lock_guard lock(impl.desc_state_.mutex);
199 0 : impl.desc_state_.read_op = nullptr;
200 0 : impl.desc_state_.write_op = nullptr;
201 0 : impl.desc_state_.connect_op = nullptr;
202 0 : }
203 0 : socket_svc->scheduler().register_descriptor(
204 : accepted, &impl.desc_state_);
205 :
206 0 : impl.set_endpoints(
207 : local_endpoint_, from_sockaddr(peer_storage));
208 :
209 0 : *ec = {};
210 0 : if (impl_out)
211 0 : *impl_out = &impl;
212 : }
213 : else
214 : {
215 0 : ::close(accepted);
216 0 : *ec = make_err(ENOENT);
217 0 : if (impl_out)
218 0 : *impl_out = nullptr;
219 : }
220 0 : op.cont_op.cont.h = h;
221 0 : return dispatch_coro(ex, op.cont_op.cont);
222 : }
223 :
224 HIT 2 : op.accepted_fd = accepted;
225 2 : op.peer_storage = peer_storage;
226 2 : op.complete(0, 0);
227 2 : op.impl_ptr = shared_from_this();
228 2 : svc_.post(&op);
229 2 : return std::noop_coroutine();
230 : }
231 :
232 3545 : if (errno == EAGAIN || errno == EWOULDBLOCK)
233 : {
234 3545 : op.impl_ptr = shared_from_this();
235 3545 : svc_.work_started();
236 :
237 3545 : std::lock_guard lock(desc_state_.mutex);
238 3545 : bool io_done = false;
239 3545 : if (desc_state_.read_ready)
240 : {
241 MIS 0 : desc_state_.read_ready = false;
242 0 : op.perform_io();
243 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
244 0 : if (!io_done)
245 0 : op.errn = 0;
246 : }
247 :
248 HIT 3545 : if (io_done || op.cancelled.load(std::memory_order_acquire))
249 : {
250 MIS 0 : svc_.post(&op);
251 0 : svc_.work_finished();
252 : }
253 : else
254 : {
255 HIT 3545 : desc_state_.read_op = &op;
256 : }
257 3545 : return std::noop_coroutine();
258 3545 : }
259 :
260 MIS 0 : op.complete(errno, 0);
261 0 : op.impl_ptr = shared_from_this();
262 0 : svc_.post(&op);
263 0 : return std::noop_coroutine();
264 : }
265 :
266 : inline void
267 HIT 2 : select_tcp_acceptor::cancel() noexcept
268 : {
269 2 : do_cancel();
270 2 : }
271 :
272 : inline void
273 240 : select_tcp_acceptor::close_socket() noexcept
274 : {
275 240 : do_close_socket();
276 240 : }
277 :
278 195 : inline select_tcp_acceptor_service::select_tcp_acceptor_service(
279 195 : capy::execution_context& ctx)
280 195 : : ctx_(ctx)
281 195 : , state_(
282 : std::make_unique<select_tcp_acceptor_state>(
283 195 : ctx.use_service<select_scheduler>()))
284 : {
285 195 : }
286 :
287 390 : inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
288 :
289 : inline void
290 195 : select_tcp_acceptor_service::shutdown()
291 : {
292 195 : std::lock_guard lock(state_->mutex_);
293 :
294 195 : while (auto* impl = state_->impl_list_.pop_front())
295 MIS 0 : impl->close_socket();
296 :
297 : // Don't clear impl_ptrs_ here — same rationale as
298 : // select_tcp_service::shutdown(). Let ~state_ release ptrs
299 : // after scheduler shutdown has drained all queued ops.
300 HIT 195 : }
301 :
302 : inline io_object::implementation*
303 61 : select_tcp_acceptor_service::construct()
304 : {
305 61 : auto impl = std::make_shared<select_tcp_acceptor>(*this);
306 61 : auto* raw = impl.get();
307 :
308 61 : std::lock_guard lock(state_->mutex_);
309 61 : state_->impl_ptrs_.emplace(raw, std::move(impl));
310 61 : state_->impl_list_.push_back(raw);
311 :
312 61 : return raw;
313 61 : }
314 :
315 : inline void
316 61 : select_tcp_acceptor_service::destroy(io_object::implementation* impl)
317 : {
318 61 : auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
319 61 : select_impl->close_socket();
320 61 : std::lock_guard lock(state_->mutex_);
321 61 : state_->impl_list_.remove(select_impl);
322 61 : state_->impl_ptrs_.erase(select_impl);
323 61 : }
324 :
325 : inline void
326 120 : select_tcp_acceptor_service::close(io_object::handle& h)
327 : {
328 120 : static_cast<select_tcp_acceptor*>(h.get())->close_socket();
329 120 : }
330 :
331 : inline std::error_code
332 59 : select_tcp_acceptor_service::open_acceptor_socket(
333 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
334 : {
335 59 : auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
336 59 : select_impl->close_socket();
337 :
338 59 : int fd = ::socket(family, type, protocol);
339 59 : if (fd < 0)
340 MIS 0 : return make_err(errno);
341 :
342 HIT 59 : int flags = ::fcntl(fd, F_GETFL, 0);
343 59 : if (flags == -1)
344 : {
345 MIS 0 : int errn = errno;
346 0 : ::close(fd);
347 0 : return make_err(errn);
348 : }
349 HIT 59 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
350 : {
351 MIS 0 : int errn = errno;
352 0 : ::close(fd);
353 0 : return make_err(errn);
354 : }
355 HIT 59 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
356 : {
357 MIS 0 : int errn = errno;
358 0 : ::close(fd);
359 0 : return make_err(errn);
360 : }
361 :
362 HIT 59 : if (fd >= FD_SETSIZE)
363 : {
364 MIS 0 : ::close(fd);
365 0 : return make_err(EMFILE);
366 : }
367 :
368 HIT 59 : if (family == AF_INET6)
369 : {
370 8 : int val = 0; // dual-stack default
371 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
372 : }
373 :
374 : #ifdef SO_NOSIGPIPE
375 : {
376 : int nosig = 1;
377 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
378 : }
379 : #endif
380 :
381 59 : select_impl->fd_ = fd;
382 :
383 : // Set up descriptor state but do NOT register with reactor yet
384 : // (registration happens in do_listen via reactor_acceptor base)
385 59 : select_impl->desc_state_.fd = fd;
386 : {
387 59 : std::lock_guard lock(select_impl->desc_state_.mutex);
388 59 : select_impl->desc_state_.read_op = nullptr;
389 59 : }
390 :
391 59 : return {};
392 : }
393 :
394 : inline std::error_code
395 58 : select_tcp_acceptor_service::bind_acceptor(
396 : tcp_acceptor::implementation& impl, endpoint ep)
397 : {
398 58 : return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
399 : }
400 :
401 : inline std::error_code
402 57 : select_tcp_acceptor_service::listen_acceptor(
403 : tcp_acceptor::implementation& impl, int backlog)
404 : {
405 57 : return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
406 : }
407 :
408 : inline void
409 5 : select_tcp_acceptor_service::post(scheduler_op* op)
410 : {
411 5 : state_->sched_.post(op);
412 5 : }
413 :
414 : inline void
415 3545 : select_tcp_acceptor_service::work_started() noexcept
416 : {
417 3545 : state_->sched_.work_started();
418 3545 : }
419 :
420 : inline void
421 3 : select_tcp_acceptor_service::work_finished() noexcept
422 : {
423 3 : state_->sched_.work_finished();
424 3 : }
425 :
426 : inline select_tcp_service*
427 3544 : select_tcp_acceptor_service::tcp_service() const noexcept
428 : {
429 3544 : auto* svc = ctx_.find_service<detail::tcp_service>();
430 3544 : return svc ? dynamic_cast<select_tcp_service*>(svc) : nullptr;
431 : }
432 :
433 : } // namespace boost::corosio::detail
434 :
435 : #endif // BOOST_COROSIO_HAS_SELECT
436 :
437 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
|