TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : /// Stop-token callback that invokes cancel() on the target op.
54 : struct canceller
55 : {
56 : reactor_op* op;
57 HIT 200 : void operator()() const noexcept
58 : {
59 200 : op->cancel();
60 200 : }
61 : };
62 :
63 : /// Caller's coroutine handle to resume on completion.
64 : std::coroutine_handle<> h;
65 :
66 : /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 : detail::continuation_op cont_op;
68 :
69 : /// Executor for dispatching the completion.
70 : capy::executor_ref ex;
71 :
72 : /// Output pointer for the error code.
73 : std::error_code* ec_out = nullptr;
74 :
75 : /// Output pointer for bytes transferred.
76 : std::size_t* bytes_out = nullptr;
77 :
78 : /// File descriptor this operation targets.
79 : int fd = -1;
80 :
81 : /// Stop-token callback registration.
82 : std::optional<std::stop_callback<canceller>> stop_cb;
83 :
84 : /// Owning socket impl (for stop_token cancellation).
85 : Socket* socket_impl_ = nullptr;
86 :
87 : /// Owning acceptor impl (for stop_token cancellation).
88 : Acceptor* acceptor_impl_ = nullptr;
89 :
90 72045 : reactor_op() = default;
91 :
92 : /// Reset operation state for reuse.
93 429659 : void reset() noexcept
94 : {
95 429659 : fd = -1;
96 429659 : errn = 0;
97 429659 : bytes_transferred = 0;
98 429659 : cancelled.store(false, std::memory_order_relaxed);
99 429659 : impl_ptr.reset();
100 429659 : socket_impl_ = nullptr;
101 429659 : acceptor_impl_ = nullptr;
102 429659 : }
103 :
104 : /// Return true if this is a read-direction operation.
105 41336 : virtual bool is_read_operation() const noexcept
106 : {
107 41336 : return false;
108 : }
109 :
110 : /// Cancel this operation via the owning impl.
111 : virtual void cancel() noexcept = 0;
112 :
113 : /// Destroy without invoking.
114 MIS 0 : void destroy() override
115 : {
116 0 : stop_cb.reset();
117 0 : reactor_op_base::destroy();
118 0 : }
119 :
120 : /// Arm the stop-token callback for a socket operation.
121 HIT 90974 : void start(std::stop_token const& token, Socket* impl)
122 : {
123 90974 : cancelled.store(false, std::memory_order_release);
124 90974 : stop_cb.reset();
125 90974 : socket_impl_ = impl;
126 90974 : acceptor_impl_ = nullptr;
127 :
128 90974 : if (token.stop_possible())
129 198 : stop_cb.emplace(token, canceller{this});
130 90974 : }
131 :
132 : /// Arm the stop-token callback for an acceptor operation.
133 7928 : void start(std::stop_token const& token, Acceptor* impl)
134 : {
135 7928 : cancelled.store(false, std::memory_order_release);
136 7928 : stop_cb.reset();
137 7928 : socket_impl_ = nullptr;
138 7928 : acceptor_impl_ = impl;
139 :
140 7928 : if (token.stop_possible())
141 9 : stop_cb.emplace(token, canceller{this});
142 7928 : }
143 : };
144 :
145 : /** Shared connect operation.
146 :
147 : Checks SO_ERROR for connect completion status. The operator()()
148 : and cancel() are provided by the concrete backend type.
149 :
150 : @tparam Base The backend's base op type.
151 : */
152 : template<class Base>
153 : struct reactor_connect_op : Base
154 : {
155 : /// Endpoint to connect to.
156 : endpoint target_endpoint;
157 :
158 : /// Reset operation state for reuse.
159 7931 : void reset() noexcept
160 : {
161 7931 : Base::reset();
162 7931 : target_endpoint = endpoint{};
163 7931 : }
164 :
165 7920 : void perform_io() noexcept override
166 : {
167 7920 : int err = 0;
168 7920 : socklen_t len = sizeof(err);
169 7920 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
170 MIS 0 : err = errno;
171 HIT 7920 : this->complete(err, 0);
172 7920 : }
173 : };
174 :
/** Scatter-read operation shared by the reactor backends.

    perform_io() calls readv() and retries while the call is
    interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to readv().
    iovec iovecs[max_buffers];

    /// Number of entries of iovecs that are in use.
    int iovec_count = 0;

    /// Set for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Return true unless this is an empty-buffer read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        empty_buffer_read = false;
        iovec_count       = 0;
    }

    /** Perform the read and complete the operation.

        Completes with (0, bytes read) on success, or (errno, 0)
        when readv() fails with anything other than EINTR.
    */
    void perform_io() noexcept override
    {
        ssize_t got = -1;
        for (;;)
        {
            got = ::readv(this->fd, iovecs, iovec_count);
            // Only an interrupted call is retried.
            if (got >= 0 || errno != EINTR)
                break;
        }

        if (got < 0)
        {
            this->complete(errno, 0);
            return;
        }

        this->complete(0, static_cast<std::size_t>(got));
    }
};
224 :
/** Gather-write operation shared by the reactor backends.

    The syscall itself is supplied by the policy:
    WritePolicy::write(fd, iovecs, count) returns the number of
    bytes written, or -1 with errno set.

    @tparam Base The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The write syscall policy type.
    using write_policy = WritePolicy;

    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to the write policy.
    iovec iovecs[max_buffers];

    /// Number of entries of iovecs that are in use.
    int iovec_count = 0;

    /// Return the operation to its idle state so it can be reused.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Perform the write and complete the operation.

        Completes with (0, bytes written) on success, or (errno, 0)
        when the policy reports failure. No retry is done here.
    */
    void perform_io() noexcept override
    {
        ssize_t const written =
            write_policy::write(this->fd, iovecs, iovec_count);

        if (written < 0)
        {
            this->complete(errno, 0);
            return;
        }

        this->complete(0, static_cast<std::size_t>(written));
    }
};
263 :
264 : /** Shared accept operation.
265 :
266 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
267 : which returns the accepted fd or -1 with errno set.
268 :
269 : @tparam Base The backend's base op type.
270 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
271 : */
272 : template<class Base, class AcceptPolicy>
273 : struct reactor_accept_op : Base
274 : {
275 : /// File descriptor of the accepted connection.
276 : int accepted_fd = -1;
277 :
278 : /// Pointer to the peer socket implementation.
279 : io_object::implementation* peer_impl = nullptr;
280 :
281 : /// Output pointer for the accepted implementation.
282 : io_object::implementation** impl_out = nullptr;
283 :
284 : /// Peer address storage filled by accept.
285 : sockaddr_storage peer_storage{};
286 :
287 HIT 7928 : void reset() noexcept
288 : {
289 7928 : Base::reset();
290 7928 : accepted_fd = -1;
291 7928 : peer_impl = nullptr;
292 7928 : impl_out = nullptr;
293 7928 : peer_storage = {};
294 7928 : }
295 :
296 7912 : void perform_io() noexcept override
297 : {
298 7912 : int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
299 7912 : if (new_fd >= 0)
300 : {
301 7912 : accepted_fd = new_fd;
302 7912 : this->complete(0, 0);
303 : }
304 : else
305 : {
306 MIS 0 : this->complete(errno, 0);
307 : }
308 HIT 7912 : }
309 : };
310 :
/** Shared connected send operation for datagram sockets.

    Uses sendmsg() with msg_name left null (connected mode) and
    retries while the call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset operation state for reuse.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Send the buffers and complete the operation.

        Completes with (0, bytes sent) on success, or (errno, 0)
        when sendmsg() fails with anything other than EINTR.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // The field's type differs between platforms (POSIX says int,
        // glibc uses size_t); cast to whatever it is so the assignment
        // never narrows implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Suppress SIGPIPE where the platform offers the flag.
#ifdef MSG_NOSIGNAL
        constexpr int send_flags = MSG_NOSIGNAL;
#else
        constexpr int send_flags = 0;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
360 :
/** Shared connected recv operation for datagram sockets.

    Uses recvmsg() with msg_name left null (connected mode) and
    retries while the call is interrupted (EINTR). A return of 0 is
    reported as a successful zero-byte completion, since
    zero-length datagrams are valid.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Return true (this is a read-direction operation).
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Reset operation state for reuse.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Receive into the buffers and complete the operation.

        Completes with (0, bytes received) on success, or (errno, 0)
        when recvmsg() fails with anything other than EINTR.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // The field's type differs between platforms (POSIX says int,
        // glibc uses size_t); cast to whatever it is so the assignment
        // never narrows implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        ssize_t n;
        do
        {
            n = ::recvmsg(this->fd, &msg, 0);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
412 :
/** Shared send_to operation for datagram sockets.

    Uses sendmsg() with the destination address passed in msg_name,
    and retries while the call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Destination address storage.
    sockaddr_storage dest_storage{};

    /// Destination address length.
    socklen_t dest_len = 0;

    /// Reset operation state for reuse.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        dest_storage = {};
        dest_len = 0;
    }

    /** Send the buffers to the stored destination and complete.

        Completes with (0, bytes sent) on success, or (errno, 0)
        when sendmsg() fails with anything other than EINTR.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_name = &dest_storage;
        msg.msg_namelen = dest_len;
        msg.msg_iov = iovecs;
        // The field's type differs between platforms (POSIX says int,
        // glibc uses size_t); cast to whatever it is so the assignment
        // never narrows implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Suppress SIGPIPE where the platform offers the flag.
#ifdef MSG_NOSIGNAL
        constexpr int send_flags = MSG_NOSIGNAL;
#else
        constexpr int send_flags = 0;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
472 :
473 : /** Shared recv_from operation for datagram sockets.
474 :
475 : Uses recvmsg() with msg_name to capture the source endpoint.
476 :
477 : @tparam Base The backend's base op type.
478 : */
479 : template<class Base>
480 : struct reactor_recv_from_op : Base
481 : {
482 : /// Maximum scatter-gather buffer count.
483 : static constexpr std::size_t max_buffers = 16;
484 :
485 : /// Scatter-gather I/O vectors.
486 : iovec iovecs[max_buffers];
487 :
488 : /// Number of active I/O vectors.
489 : int iovec_count = 0;
490 :
491 : /// Source address storage filled by recvmsg.
492 : sockaddr_storage source_storage{};
493 :
494 : /// Output pointer for the source endpoint (set by do_recv_from).
495 : endpoint* source_out = nullptr;
496 :
497 : /// Return true (this is a read-direction operation).
498 0 : bool is_read_operation() const noexcept override
499 : {
500 0 : return true;
501 : }
502 :
503 HIT 32 : void reset() noexcept
504 : {
505 32 : Base::reset();
506 32 : iovec_count = 0;
507 32 : source_storage = {};
508 32 : source_out = nullptr;
509 32 : }
510 :
511 2 : void perform_io() noexcept override
512 : {
513 2 : msghdr msg{};
514 2 : msg.msg_name = &source_storage;
515 2 : msg.msg_namelen = sizeof(source_storage);
516 2 : msg.msg_iov = iovecs;
517 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
518 :
519 : ssize_t n;
520 : do
521 : {
522 2 : n = ::recvmsg(this->fd, &msg, 0);
523 : }
524 2 : while (n < 0 && errno == EINTR);
525 :
526 2 : if (n >= 0)
527 2 : this->complete(0, static_cast<std::size_t>(n));
528 : else
529 MIS 0 : this->complete(errno, 0);
530 HIT 2 : }
531 : };
532 :
533 : } // namespace boost::corosio::detail
534 :
535 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|