TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <netinet/in.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /// State for epoll acceptor service.
41 : using epoll_tcp_acceptor_state =
42 : reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43 :
44 : /** epoll acceptor service implementation.
45 :
46 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 : Uses key_type = tcp_acceptor_service for service lookup.
48 : */
49 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : : public tcp_acceptor_service
51 : {
52 : public:
53 : explicit epoll_tcp_acceptor_service(capy::execution_context& ctx);
54 : ~epoll_tcp_acceptor_service() override;
55 :
56 : epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
57 : epoll_tcp_acceptor_service&
58 : operator=(epoll_tcp_acceptor_service const&) = delete;
59 :
60 : void shutdown() override;
61 :
62 : io_object::implementation* construct() override;
63 : void destroy(io_object::implementation*) override;
64 : void close(io_object::handle&) override;
65 : std::error_code open_acceptor_socket(
66 : tcp_acceptor::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 : std::error_code
71 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 : std::error_code
73 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74 :
75 HIT 152 : epoll_scheduler& scheduler() const noexcept
76 : {
77 152 : return state_->sched_;
78 : }
79 : void post(scheduler_op* op);
80 : void work_started() noexcept;
81 : void work_finished() noexcept;
82 :
83 : /** Get the TCP service for creating peer sockets during accept. */
84 : epoll_tcp_service* tcp_service() const noexcept;
85 :
86 : private:
87 : capy::execution_context& ctx_;
88 : std::unique_ptr<epoll_tcp_acceptor_state> state_;
89 : };
90 :
91 : inline void
92 6 : epoll_accept_op::cancel() noexcept
93 : {
94 6 : if (acceptor_impl_)
95 6 : acceptor_impl_->cancel_single_op(*this);
96 : else
97 MIS 0 : request_cancel();
98 HIT 6 : }
99 :
100 : inline void
101 4381 : epoll_accept_op::operator()()
102 : {
103 4381 : complete_accept_op<epoll_tcp_socket>(*this);
104 4381 : }
105 :
106 80 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
107 80 : epoll_tcp_acceptor_service& svc) noexcept
108 80 : : reactor_acceptor(svc)
109 : {
110 80 : }
111 :
112 : inline std::coroutine_handle<>
113 4381 : epoll_tcp_acceptor::accept(
114 : std::coroutine_handle<> h,
115 : capy::executor_ref ex,
116 : std::stop_token token,
117 : std::error_code* ec,
118 : io_object::implementation** impl_out)
119 : {
120 4381 : auto& op = acc_;
121 4381 : op.reset();
122 4381 : op.h = h;
123 4381 : op.ex = ex;
124 4381 : op.ec_out = ec;
125 4381 : op.impl_out = impl_out;
126 4381 : op.fd = fd_;
127 4381 : op.start(token, this);
128 :
129 4381 : sockaddr_storage peer_storage{};
130 4381 : socklen_t addrlen = sizeof(peer_storage);
131 : int accepted;
132 : do
133 : {
134 4381 : accepted = ::accept4(
135 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
136 : SOCK_NONBLOCK | SOCK_CLOEXEC);
137 : }
138 4381 : while (accepted < 0 && errno == EINTR);
139 :
140 4381 : if (accepted >= 0)
141 : {
142 : {
143 2 : std::lock_guard lock(desc_state_.mutex);
144 2 : desc_state_.read_ready = false;
145 2 : }
146 :
147 2 : if (svc_.scheduler().try_consume_inline_budget())
148 : {
149 MIS 0 : auto* socket_svc = svc_.tcp_service();
150 0 : if (socket_svc)
151 : {
152 : auto& impl =
153 0 : static_cast<epoll_tcp_socket&>(*socket_svc->construct());
154 0 : impl.set_socket(accepted);
155 :
156 0 : impl.desc_state_.fd = accepted;
157 : {
158 0 : std::lock_guard lock(impl.desc_state_.mutex);
159 0 : impl.desc_state_.read_op = nullptr;
160 0 : impl.desc_state_.write_op = nullptr;
161 0 : impl.desc_state_.connect_op = nullptr;
162 0 : }
163 0 : socket_svc->scheduler().register_descriptor(
164 : accepted, &impl.desc_state_);
165 :
166 0 : impl.set_endpoints(
167 : local_endpoint_, from_sockaddr(peer_storage));
168 :
169 0 : *ec = {};
170 0 : if (impl_out)
171 0 : *impl_out = &impl;
172 : }
173 : else
174 : {
175 0 : ::close(accepted);
176 0 : *ec = make_err(ENOENT);
177 0 : if (impl_out)
178 0 : *impl_out = nullptr;
179 : }
180 0 : op.cont_op.cont.h = h;
181 0 : return dispatch_coro(ex, op.cont_op.cont);
182 : }
183 :
184 HIT 2 : op.accepted_fd = accepted;
185 2 : op.peer_storage = peer_storage;
186 2 : op.complete(0, 0);
187 2 : op.impl_ptr = shared_from_this();
188 2 : svc_.post(&op);
189 2 : return std::noop_coroutine();
190 : }
191 :
192 4379 : if (errno == EAGAIN || errno == EWOULDBLOCK)
193 : {
194 4379 : op.impl_ptr = shared_from_this();
195 4379 : svc_.work_started();
196 :
197 4379 : std::lock_guard lock(desc_state_.mutex);
198 4379 : bool io_done = false;
199 4379 : if (desc_state_.read_ready)
200 : {
201 MIS 0 : desc_state_.read_ready = false;
202 0 : op.perform_io();
203 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
204 0 : if (!io_done)
205 0 : op.errn = 0;
206 : }
207 :
208 HIT 4379 : if (io_done || op.cancelled.load(std::memory_order_acquire))
209 : {
210 MIS 0 : svc_.post(&op);
211 0 : svc_.work_finished();
212 : }
213 : else
214 : {
215 HIT 4379 : desc_state_.read_op = &op;
216 : }
217 4379 : return std::noop_coroutine();
218 4379 : }
219 :
220 MIS 0 : op.complete(errno, 0);
221 0 : op.impl_ptr = shared_from_this();
222 0 : svc_.post(&op);
223 : // completion is always posted to scheduler queue, never inline.
224 0 : return std::noop_coroutine();
225 : }
226 :
227 : inline void
228 HIT 2 : epoll_tcp_acceptor::cancel() noexcept
229 : {
230 2 : do_cancel();
231 2 : }
232 :
233 : inline void
234 318 : epoll_tcp_acceptor::close_socket() noexcept
235 : {
236 318 : do_close_socket();
237 318 : }
238 :
239 320 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
240 320 : capy::execution_context& ctx)
241 320 : : ctx_(ctx)
242 320 : , state_(
243 : std::make_unique<epoll_tcp_acceptor_state>(
244 320 : ctx.use_service<epoll_scheduler>()))
245 : {
246 320 : }
247 :
248 640 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
249 :
250 : inline void
251 320 : epoll_tcp_acceptor_service::shutdown()
252 : {
253 320 : std::lock_guard lock(state_->mutex_);
254 :
255 320 : while (auto* impl = state_->impl_list_.pop_front())
256 MIS 0 : impl->close_socket();
257 :
258 : // Don't clear impl_ptrs_ here — same rationale as
259 : // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
260 : // after scheduler shutdown has drained all queued ops.
261 HIT 320 : }
262 :
263 : inline io_object::implementation*
264 80 : epoll_tcp_acceptor_service::construct()
265 : {
266 80 : auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
267 80 : auto* raw = impl.get();
268 :
269 80 : std::lock_guard lock(state_->mutex_);
270 80 : state_->impl_ptrs_.emplace(raw, std::move(impl));
271 80 : state_->impl_list_.push_back(raw);
272 :
273 80 : return raw;
274 80 : }
275 :
276 : inline void
277 80 : epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
278 : {
279 80 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
280 80 : epoll_impl->close_socket();
281 80 : std::lock_guard lock(state_->mutex_);
282 80 : state_->impl_list_.remove(epoll_impl);
283 80 : state_->impl_ptrs_.erase(epoll_impl);
284 80 : }
285 :
286 : inline void
287 159 : epoll_tcp_acceptor_service::close(io_object::handle& h)
288 : {
289 159 : static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
290 159 : }
291 :
292 : inline std::error_code
293 79 : epoll_tcp_acceptor_service::open_acceptor_socket(
294 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
295 : {
296 79 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
297 79 : epoll_impl->close_socket();
298 :
299 79 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
300 79 : if (fd < 0)
301 MIS 0 : return make_err(errno);
302 :
303 HIT 79 : if (family == AF_INET6)
304 : {
305 8 : int val = 0; // dual-stack default
306 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
307 : }
308 :
309 79 : epoll_impl->fd_ = fd;
310 :
311 : // Set up descriptor state but do NOT register with epoll yet
312 79 : epoll_impl->desc_state_.fd = fd;
313 : {
314 79 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
315 79 : epoll_impl->desc_state_.read_op = nullptr;
316 79 : }
317 :
318 79 : return {};
319 : }
320 :
321 : inline std::error_code
322 78 : epoll_tcp_acceptor_service::bind_acceptor(
323 : tcp_acceptor::implementation& impl, endpoint ep)
324 : {
325 78 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
326 : }
327 :
328 : inline std::error_code
329 75 : epoll_tcp_acceptor_service::listen_acceptor(
330 : tcp_acceptor::implementation& impl, int backlog)
331 : {
332 75 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
333 : }
334 :
335 : inline void
336 11 : epoll_tcp_acceptor_service::post(scheduler_op* op)
337 : {
338 11 : state_->sched_.post(op);
339 11 : }
340 :
341 : inline void
342 4379 : epoll_tcp_acceptor_service::work_started() noexcept
343 : {
344 4379 : state_->sched_.work_started();
345 4379 : }
346 :
347 : inline void
348 9 : epoll_tcp_acceptor_service::work_finished() noexcept
349 : {
350 9 : state_->sched_.work_finished();
351 9 : }
352 :
353 : inline epoll_tcp_service*
354 4372 : epoll_tcp_acceptor_service::tcp_service() const noexcept
355 : {
356 4372 : auto* svc = ctx_.find_service<detail::tcp_service>();
357 4372 : return svc ? dynamic_cast<epoll_tcp_service*>(svc) : nullptr;
358 : }
359 :
360 : } // namespace boost::corosio::detail
361 :
362 : #endif // BOOST_COROSIO_HAS_EPOLL
363 :
364 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
|