include/boost/corosio/native/detail/reactor/reactor_op.hpp

68.5% Lines (263/384) 76.5% List of functions (52/68)
reactor_op.hpp
f(x) Functions (68)
Function Calls Lines Blocks
boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::canceller::operator()() const :57 104x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::canceller::operator()() const :57 1x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::canceller::operator()() const :57 94x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::canceller::operator()() const :57 1x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reactor_op() :90 39617x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reactor_op() :90 195x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::reactor_op() :90 32038x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::reactor_op() :90 195x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reset() :93 198655x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reset() :93 37x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::reset() :93 230930x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::reset() :93 37x 100.0% 100.0% 
boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::is_read_operation() const :105 18962x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::is_read_operation() const :105 8x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::is_read_operation() const :105 22358x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::is_read_operation() const :105 8x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::destroy() :114 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::destroy() :114 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::destroy() :114 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::destroy() :114 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_tcp_socket*) :121 42488x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_udp_socket*) :121 22x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_tcp_socket*) :121 48442x 100.0% 100.0% 
boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_udp_socket*) :121 22x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_tcp_acceptor*) :133 4381x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_tcp_acceptor*) :133 3547x 87.5% 71.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_datagram_op>::reset() :159 5x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::reset() :159 4374x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_datagram_op>::reset() :159 5x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::reset() :159 3547x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :165 0 0.0% 0.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::perform_io() :165 4373x 85.7% 80.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_datagram_op>::perform_io() :165 0 0.0% 0.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::perform_io() :165 3547x 85.7% 80.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::is_read_operation() const :197 18995x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::is_read_operation() const :197 22387x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::reset() :202 95026x 100.0% 100.0% 
boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::reset() :202 111991x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::perform_io() :209 143x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::perform_io() :209 184x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::reset() :248 94874x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::reset() :248 111845x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::perform_io() :254 0 0.0% 0.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::perform_io() :254 0 0.0% 0.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_accept_policy>::reset() :287 4381x 100.0% 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::reset() :287 3547x 100.0% 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_accept_policy>::perform_io() :296 4370x 85.7% 80.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::perform_io() :296 3542x 85.7% 80.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::epoll_datagram_op>::reset() :329 3x 100.0% 100.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::select_datagram_op>::reset() :329 3x 100.0% 100.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :335 0 0.0% 0.0% 
boost::corosio::detail::reactor_send_op<boost::corosio::detail::select_datagram_op>::perform_io() :335 0 0.0% 0.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::is_read_operation() const :382 1x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::is_read_operation() const :382 1x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::reset() :387 2x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::reset() :387 2x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :393 0 0.0% 0.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::perform_io() :393 0 0.0% 0.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::epoll_datagram_op>::reset() :437 11x 100.0% 100.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::select_datagram_op>::reset() :437 11x 100.0% 100.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :445 0 0.0% 0.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::select_datagram_op>::perform_io() :445 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::is_read_operation() const :498 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::is_read_operation() const :498 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::reset() :503 16x 100.0% 100.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::reset() :503 16x 100.0% 100.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :511 1x 91.7% 75.0% 
boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::perform_io() :511 1x 91.7% 75.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12
13 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 #include <boost/corosio/io/io_object.hpp>
15 #include <boost/corosio/endpoint.hpp>
16 #include <boost/corosio/detail/continuation_op.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18
19 #include <atomic>
20 #include <coroutine>
21 #include <cstddef>
22 #include <memory>
23 #include <optional>
24 #include <stop_token>
25 #include <system_error>
26
27 #include <errno.h>
28
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <sys/uio.h>
32
33 namespace boost::corosio::detail {
34
35 /** Base operation for reactor-based backends.
36
37 Holds per-operation state that depends on the concrete backend
38 socket/acceptor types: coroutine handle, executor, output
39 pointers, file descriptor, stop_callback, and type-specific
40 impl pointers.
41
42 Fields shared across all backends (errn, bytes_transferred,
43 cancelled, impl_ptr, perform_io, complete) live in
44 reactor_op_base so the scheduler and descriptor_state can
45 access them without template instantiation.
46
47 @tparam Socket The backend socket impl type (forward-declared).
48 @tparam Acceptor The backend acceptor impl type (forward-declared).
49 */
50 template<class Socket, class Acceptor>
51 struct reactor_op : reactor_op_base
52 {
53 /// Stop-token callback that invokes cancel() on the target op.
54 struct canceller
55 {
56 reactor_op* op;
57 200x void operator()() const noexcept
58 {
59 200x op->cancel();
60 200x }
61 };
62
63 /// Caller's coroutine handle to resume on completion.
64 std::coroutine_handle<> h;
65
66 /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 detail::continuation_op cont_op;
68
69 /// Executor for dispatching the completion.
70 capy::executor_ref ex;
71
72 /// Output pointer for the error code.
73 std::error_code* ec_out = nullptr;
74
75 /// Output pointer for bytes transferred.
76 std::size_t* bytes_out = nullptr;
77
78 /// File descriptor this operation targets.
79 int fd = -1;
80
81 /// Stop-token callback registration.
82 std::optional<std::stop_callback<canceller>> stop_cb;
83
84 /// Owning socket impl (for stop_token cancellation).
85 Socket* socket_impl_ = nullptr;
86
87 /// Owning acceptor impl (for stop_token cancellation).
88 Acceptor* acceptor_impl_ = nullptr;
89
90 72045x reactor_op() = default;
91
92 /// Reset operation state for reuse.
93 429659x void reset() noexcept
94 {
95 429659x fd = -1;
96 429659x errn = 0;
97 429659x bytes_transferred = 0;
98 429659x cancelled.store(false, std::memory_order_relaxed);
99 429659x impl_ptr.reset();
100 429659x socket_impl_ = nullptr;
101 429659x acceptor_impl_ = nullptr;
102 429659x }
103
104 /// Return true if this is a read-direction operation.
105 41336x virtual bool is_read_operation() const noexcept
106 {
107 41336x return false;
108 }
109
110 /// Cancel this operation via the owning impl.
111 virtual void cancel() noexcept = 0;
112
113 /// Destroy without invoking.
114 void destroy() override
115 {
116 stop_cb.reset();
117 reactor_op_base::destroy();
118 }
119
120 /// Arm the stop-token callback for a socket operation.
121 90974x void start(std::stop_token const& token, Socket* impl)
122 {
123 90974x cancelled.store(false, std::memory_order_release);
124 90974x stop_cb.reset();
125 90974x socket_impl_ = impl;
126 90974x acceptor_impl_ = nullptr;
127
128 90974x if (token.stop_possible())
129 198x stop_cb.emplace(token, canceller{this});
130 90974x }
131
132 /// Arm the stop-token callback for an acceptor operation.
133 7928x void start(std::stop_token const& token, Acceptor* impl)
134 {
135 7928x cancelled.store(false, std::memory_order_release);
136 7928x stop_cb.reset();
137 7928x socket_impl_ = nullptr;
138 7928x acceptor_impl_ = impl;
139
140 7928x if (token.stop_possible())
141 9x stop_cb.emplace(token, canceller{this});
142 7928x }
143 };
144
145 /** Shared connect operation.
146
147 Checks SO_ERROR for connect completion status. The operator()()
148 and cancel() are provided by the concrete backend type.
149
150 @tparam Base The backend's base op type.
151 */
152 template<class Base>
153 struct reactor_connect_op : Base
154 {
155 /// Endpoint to connect to.
156 endpoint target_endpoint;
157
158 /// Reset operation state for reuse.
159 7931x void reset() noexcept
160 {
161 7931x Base::reset();
162 7931x target_endpoint = endpoint{};
163 7931x }
164
165 7920x void perform_io() noexcept override
166 {
167 7920x int err = 0;
168 7920x socklen_t len = sizeof(err);
169 7920x if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
170 err = errno;
171 7920x this->complete(err, 0);
172 7920x }
173 };
174
/** Scatter-read operation shared by the reactor backends.

    Issues readv() on the target descriptor, retrying while the
    call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// True for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Return true unless this is a zero-length (empty buffer) read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Reset the base state and clear the buffer bookkeeping.
    void reset() noexcept
    {
        Base::reset();
        empty_buffer_read = false;
        iovec_count = 0;
    }

    /// Run readv(), then complete with the byte count or with errno.
    void perform_io() noexcept override
    {
        for (;;)
        {
            ssize_t const got = ::readv(this->fd, iovecs, iovec_count);
            if (got >= 0)
            {
                this->complete(0, static_cast<std::size_t>(got));
                return;
            }
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
            // Interrupted by a signal: issue the call again.
        }
    }
};
224
/** Gather-write operation shared by the reactor backends.

    The syscall itself is supplied by WritePolicy::write(fd, iovecs, count),
    which returns ssize_t: the number of bytes written, or -1 with
    errno set.

    @tparam Base The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The write syscall policy type.
    using write_policy = WritePolicy;

    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset the base state and clear the buffer count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Invoke the policy's write, then complete with the byte count or errno.
    void perform_io() noexcept override
    {
        ssize_t const written =
            WritePolicy::write(this->fd, iovecs, iovec_count);
        if (written < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(written));
    }
};
263
264 /** Shared accept operation.
265
266 Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
267 which returns the accepted fd or -1 with errno set.
268
269 @tparam Base The backend's base op type.
270 @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
271 */
272 template<class Base, class AcceptPolicy>
273 struct reactor_accept_op : Base
274 {
275 /// File descriptor of the accepted connection.
276 int accepted_fd = -1;
277
278 /// Pointer to the peer socket implementation.
279 io_object::implementation* peer_impl = nullptr;
280
281 /// Output pointer for the accepted implementation.
282 io_object::implementation** impl_out = nullptr;
283
284 /// Peer address storage filled by accept.
285 sockaddr_storage peer_storage{};
286
287 7928x void reset() noexcept
288 {
289 7928x Base::reset();
290 7928x accepted_fd = -1;
291 7928x peer_impl = nullptr;
292 7928x impl_out = nullptr;
293 7928x peer_storage = {};
294 7928x }
295
296 7912x void perform_io() noexcept override
297 {
298 7912x int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
299 7912x if (new_fd >= 0)
300 {
301 7912x accepted_fd = new_fd;
302 7912x this->complete(0, 0);
303 }
304 else
305 {
306 this->complete(errno, 0);
307 }
308 7912x }
309 };
310
/** Shared connected send operation for datagram sockets.

    Uses sendmsg() with msg_name=nullptr (connected mode), retrying
    while the call is interrupted (EINTR). MSG_NOSIGNAL is passed
    when the platform defines it.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset the base state and clear the buffer count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Run sendmsg(), then complete with the byte count or with errno.
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on glibc but int in POSIX/BSD/macOS;
        // cast to the field's own type to avoid an implicit narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

#ifdef MSG_NOSIGNAL
        constexpr int send_flags = MSG_NOSIGNAL;
#else
        constexpr int send_flags = 0;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
360
/** Shared connected recv operation for datagram sockets.

    Uses recvmsg() with msg_name=nullptr (connected mode), retrying
    while the call is interrupted (EINTR). A return of 0 is reported
    as a successful zero-byte completion, not as EOF (zero-length
    datagrams are valid).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Return true (this is a read-direction operation).
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Reset the base state and clear the buffer count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Run recvmsg(), then complete with the byte count or with errno.
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on glibc but int in POSIX/BSD/macOS;
        // cast to the field's own type to avoid an implicit narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        ssize_t n;
        do
        {
            n = ::recvmsg(this->fd, &msg, 0);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
412
/** Shared send_to operation for datagram sockets.

    Uses sendmsg() with the destination address in msg_name,
    retrying while the call is interrupted (EINTR). MSG_NOSIGNAL is
    passed when the platform defines it.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Destination address storage.
    sockaddr_storage dest_storage{};

    /// Destination address length.
    socklen_t dest_len = 0;

    /// Reset the base state and clear the buffers and destination.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        dest_storage = {};
        dest_len = 0;
    }

    /// Run sendmsg() to the stored destination, then complete with
    /// the byte count or with errno.
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_name = &dest_storage;
        msg.msg_namelen = dest_len;
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on glibc but int in POSIX/BSD/macOS;
        // cast to the field's own type to avoid an implicit narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

#ifdef MSG_NOSIGNAL
        constexpr int send_flags = MSG_NOSIGNAL;
#else
        constexpr int send_flags = 0;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
472
473 /** Shared recv_from operation for datagram sockets.
474
475 Uses recvmsg() with msg_name to capture the source endpoint.
476
477 @tparam Base The backend's base op type.
478 */
479 template<class Base>
480 struct reactor_recv_from_op : Base
481 {
482 /// Maximum scatter-gather buffer count.
483 static constexpr std::size_t max_buffers = 16;
484
485 /// Scatter-gather I/O vectors.
486 iovec iovecs[max_buffers];
487
488 /// Number of active I/O vectors.
489 int iovec_count = 0;
490
491 /// Source address storage filled by recvmsg.
492 sockaddr_storage source_storage{};
493
494 /// Output pointer for the source endpoint (set by do_recv_from).
495 endpoint* source_out = nullptr;
496
497 /// Return true (this is a read-direction operation).
498 bool is_read_operation() const noexcept override
499 {
500 return true;
501 }
502
503 32x void reset() noexcept
504 {
505 32x Base::reset();
506 32x iovec_count = 0;
507 32x source_storage = {};
508 32x source_out = nullptr;
509 32x }
510
511 2x void perform_io() noexcept override
512 {
513 2x msghdr msg{};
514 2x msg.msg_name = &source_storage;
515 2x msg.msg_namelen = sizeof(source_storage);
516 2x msg.msg_iov = iovecs;
517 2x msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
518
519 ssize_t n;
520 do
521 {
522 2x n = ::recvmsg(this->fd, &msg, 0);
523 }
524 2x while (n < 0 && errno == EINTR);
525
526 2x if (n >= 0)
527 2x this->complete(0, static_cast<std::size_t>(n));
528 else
529 this->complete(errno, 0);
530 2x }
531 };
532
533 } // namespace boost::corosio::detail
534
535 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
536