TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
15 : #include <boost/corosio/detail/dispatch_coro.hpp>
16 : #include <boost/capy/buffers.hpp>
17 :
18 : #include <coroutine>
19 :
20 : #include <errno.h>
21 : #include <sys/socket.h>
22 : #include <sys/uio.h>
23 :
24 : namespace boost::corosio::detail {
25 :
26 : /** CRTP base for reactor-backed stream socket implementations.
27 :
28 : Inherits shared data members and cancel/close/register logic
29 : from reactor_basic_socket. Adds the TCP-specific remote
30 : endpoint, shutdown, and I/O dispatch (connect, read, write).
31 :
32 : @tparam Derived The concrete socket type (CRTP).
33 : @tparam Service The backend's socket service type.
34 : @tparam ConnOp The backend's connect op type.
35 : @tparam ReadOp The backend's read op type.
36 : @tparam WriteOp The backend's write op type.
37 : @tparam DescState The backend's descriptor_state type.
38 : */
39 : template<
40 : class Derived,
41 : class Service,
42 : class ConnOp,
43 : class ReadOp,
44 : class WriteOp,
45 : class DescState>
46 : class reactor_stream_socket
47 : : public reactor_basic_socket<
48 : Derived,
49 : tcp_socket::implementation,
50 : Service,
51 : DescState>
52 : {
53 : using base_type = reactor_basic_socket<
54 : Derived,
55 : tcp_socket::implementation,
56 : Service,
57 : DescState>;
58 : friend base_type;
59 : friend Derived;
60 :
61 HIT 23838 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
62 :
63 : protected:
64 : endpoint remote_endpoint_;
65 :
66 : public:
67 : /// Pending connect operation slot.
68 : ConnOp conn_;
69 :
70 : /// Pending read operation slot.
71 : ReadOp rd_;
72 :
73 : /// Pending write operation slot.
74 : WriteOp wr_;
75 :
76 23838 : ~reactor_stream_socket() override = default;
77 :
78 : /// Return the cached remote endpoint.
79 42 : endpoint remote_endpoint() const noexcept override
80 : {
81 42 : return remote_endpoint_;
82 : }
83 :
84 : /// Shut down part or all of the full-duplex connection.
85 6 : std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
86 : {
87 : int how;
88 6 : switch (what)
89 : {
90 2 : case tcp_socket::shutdown_receive:
91 2 : how = SHUT_RD;
92 2 : break;
93 2 : case tcp_socket::shutdown_send:
94 2 : how = SHUT_WR;
95 2 : break;
96 2 : case tcp_socket::shutdown_both:
97 2 : how = SHUT_RDWR;
98 2 : break;
99 MIS 0 : default:
100 0 : return make_err(EINVAL);
101 : }
102 HIT 6 : if (::shutdown(this->fd_, how) != 0)
103 MIS 0 : return make_err(errno);
104 HIT 6 : return {};
105 : }
106 :
107 : /// Cache local and remote endpoints.
108 15832 : void set_endpoints(endpoint local, endpoint remote) noexcept
109 : {
110 15832 : this->local_endpoint_ = local;
111 15832 : remote_endpoint_ = remote;
112 15832 : }
113 :
114 : /** Shared connect dispatch.
115 :
116 : Tries the connect syscall speculatively. On synchronous
117 : completion, returns via inline budget or posts through queue.
118 : On EINPROGRESS, registers with the reactor.
119 : */
120 : std::coroutine_handle<> do_connect(
121 : std::coroutine_handle<>,
122 : capy::executor_ref,
123 : endpoint,
124 : std::stop_token const&,
125 : std::error_code*);
126 :
127 : /** Shared scatter-read dispatch.
128 :
129 : Tries readv() speculatively. On success or hard error,
130 : returns via inline budget or posts through queue.
131 : On EAGAIN, registers with the reactor.
132 : */
133 : std::coroutine_handle<> do_read_some(
134 : std::coroutine_handle<>,
135 : capy::executor_ref,
136 : buffer_param,
137 : std::stop_token const&,
138 : std::error_code*,
139 : std::size_t*);
140 :
141 : /** Shared gather-write dispatch.
142 :
143 : Tries the write via WriteOp::write_policy speculatively.
144 : On success or hard error, returns via inline budget or
145 : posts through queue. On EAGAIN, registers with the reactor.
146 : */
147 : std::coroutine_handle<> do_write_some(
148 : std::coroutine_handle<>,
149 : capy::executor_ref,
150 : buffer_param,
151 : std::stop_token const&,
152 : std::error_code*,
153 : std::size_t*);
154 :
155 : /** Close the socket and cancel pending operations.
156 :
157 : Extends the base do_close_socket() to also reset
158 : the remote endpoint.
159 : */
160 71494 : void do_close_socket() noexcept
161 : {
162 71494 : base_type::do_close_socket();
163 71494 : remote_endpoint_ = endpoint{};
164 71494 : }
165 :
166 : private:
167 : // CRTP callbacks for reactor_basic_socket cancel/close
168 :
169 : template<class Op>
170 192 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
171 : {
172 192 : if (&op == static_cast<void*>(&conn_))
173 MIS 0 : return &this->desc_state_.connect_op;
174 HIT 192 : if (&op == static_cast<void*>(&rd_))
175 192 : return &this->desc_state_.read_op;
176 MIS 0 : if (&op == static_cast<void*>(&wr_))
177 0 : return &this->desc_state_.write_op;
178 0 : return nullptr;
179 : }
180 :
181 : template<class Op>
182 0 : bool* op_to_cancel_flag(Op& op) noexcept
183 : {
184 0 : if (&op == static_cast<void*>(&conn_))
185 0 : return &this->desc_state_.connect_cancel_pending;
186 0 : if (&op == static_cast<void*>(&rd_))
187 0 : return &this->desc_state_.read_cancel_pending;
188 0 : if (&op == static_cast<void*>(&wr_))
189 0 : return &this->desc_state_.write_cancel_pending;
190 0 : return nullptr;
191 : }
192 :
193 : template<class Fn>
194 HIT 71679 : void for_each_op(Fn fn) noexcept
195 : {
196 71679 : fn(conn_);
197 71679 : fn(rd_);
198 71679 : fn(wr_);
199 71679 : }
200 :
201 : template<class Fn>
202 71679 : void for_each_desc_entry(Fn fn) noexcept
203 : {
204 71679 : fn(conn_, this->desc_state_.connect_op);
205 71679 : fn(rd_, this->desc_state_.read_op);
206 71679 : fn(wr_, this->desc_state_.write_op);
207 71679 : }
208 : };
209 :
210 : template<
211 : class Derived,
212 : class Service,
213 : class ConnOp,
214 : class ReadOp,
215 : class WriteOp,
216 : class DescState>
217 : std::coroutine_handle<>
218 7921 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
219 : do_connect(
220 : std::coroutine_handle<> h,
221 : capy::executor_ref ex,
222 : endpoint ep,
223 : std::stop_token const& token,
224 : std::error_code* ec)
225 : {
226 7921 : auto& op = conn_;
227 :
228 7921 : sockaddr_storage storage{};
229 7921 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
230 : int result =
231 7921 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
232 :
233 7921 : if (result == 0)
234 : {
235 MIS 0 : sockaddr_storage local_storage{};
236 0 : socklen_t local_len = sizeof(local_storage);
237 0 : if (::getsockname(
238 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
239 0 : &local_len) == 0)
240 0 : this->local_endpoint_ = from_sockaddr(local_storage);
241 0 : remote_endpoint_ = ep;
242 : }
243 :
244 HIT 7921 : if (result == 0 || errno != EINPROGRESS)
245 : {
246 MIS 0 : int err = (result < 0) ? errno : 0;
247 0 : if (this->svc_.scheduler().try_consume_inline_budget())
248 : {
249 0 : *ec = err ? make_err(err) : std::error_code{};
250 0 : op.cont_op.cont.h = h;
251 0 : return dispatch_coro(ex, op.cont_op.cont);
252 : }
253 0 : op.reset();
254 0 : op.h = h;
255 0 : op.ex = ex;
256 0 : op.ec_out = ec;
257 0 : op.fd = this->fd_;
258 0 : op.target_endpoint = ep;
259 0 : op.start(token, static_cast<Derived*>(this));
260 0 : op.impl_ptr = this->shared_from_this();
261 0 : op.complete(err, 0);
262 0 : this->svc_.post(&op);
263 0 : return std::noop_coroutine();
264 : }
265 :
266 : // EINPROGRESS — register with reactor
267 HIT 7921 : op.reset();
268 7921 : op.h = h;
269 7921 : op.ex = ex;
270 7921 : op.ec_out = ec;
271 7921 : op.fd = this->fd_;
272 7921 : op.target_endpoint = ep;
273 7921 : op.start(token, static_cast<Derived*>(this));
274 7921 : op.impl_ptr = this->shared_from_this();
275 :
276 7921 : this->register_op(
277 7921 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
278 7921 : this->desc_state_.connect_cancel_pending);
279 7921 : return std::noop_coroutine();
280 : }
281 :
282 : template<
283 : class Derived,
284 : class Service,
285 : class ConnOp,
286 : class ReadOp,
287 : class WriteOp,
288 : class DescState>
289 : std::coroutine_handle<>
290 207017 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
291 : do_read_some(
292 : std::coroutine_handle<> h,
293 : capy::executor_ref ex,
294 : buffer_param param,
295 : std::stop_token const& token,
296 : std::error_code* ec,
297 : std::size_t* bytes_out)
298 : {
299 207017 : auto& op = rd_;
300 207017 : op.reset();
301 :
302 207017 : capy::mutable_buffer bufs[ReadOp::max_buffers];
303 207017 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
304 :
305 207017 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
306 : {
307 2 : op.empty_buffer_read = true;
308 2 : op.h = h;
309 2 : op.ex = ex;
310 2 : op.ec_out = ec;
311 2 : op.bytes_out = bytes_out;
312 2 : op.start(token, static_cast<Derived*>(this));
313 2 : op.impl_ptr = this->shared_from_this();
314 2 : op.complete(0, 0);
315 2 : this->svc_.post(&op);
316 2 : return std::noop_coroutine();
317 : }
318 :
319 414030 : for (int i = 0; i < op.iovec_count; ++i)
320 : {
321 207015 : op.iovecs[i].iov_base = bufs[i].data();
322 207015 : op.iovecs[i].iov_len = bufs[i].size();
323 : }
324 :
325 : // Speculative read
326 : ssize_t n;
327 : do
328 : {
329 207015 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
330 : }
331 207015 : while (n < 0 && errno == EINTR);
332 :
333 207015 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
334 : {
335 206627 : int err = (n < 0) ? errno : 0;
336 206627 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
337 :
338 206627 : if (this->svc_.scheduler().try_consume_inline_budget())
339 : {
340 165336 : if (err)
341 MIS 0 : *ec = make_err(err);
342 HIT 165336 : else if (n == 0)
343 10 : *ec = capy::error::eof;
344 : else
345 165326 : *ec = {};
346 165336 : *bytes_out = bytes;
347 165336 : op.cont_op.cont.h = h;
348 165336 : return dispatch_coro(ex, op.cont_op.cont);
349 : }
350 41291 : op.h = h;
351 41291 : op.ex = ex;
352 41291 : op.ec_out = ec;
353 41291 : op.bytes_out = bytes_out;
354 41291 : op.start(token, static_cast<Derived*>(this));
355 41291 : op.impl_ptr = this->shared_from_this();
356 41291 : op.complete(err, bytes);
357 41291 : this->svc_.post(&op);
358 41291 : return std::noop_coroutine();
359 : }
360 :
361 : // EAGAIN — register with reactor
362 388 : op.h = h;
363 388 : op.ex = ex;
364 388 : op.ec_out = ec;
365 388 : op.bytes_out = bytes_out;
366 388 : op.fd = this->fd_;
367 388 : op.start(token, static_cast<Derived*>(this));
368 388 : op.impl_ptr = this->shared_from_this();
369 :
370 388 : this->register_op(
371 388 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
372 388 : this->desc_state_.read_cancel_pending);
373 388 : return std::noop_coroutine();
374 : }
375 :
376 : template<
377 : class Derived,
378 : class Service,
379 : class ConnOp,
380 : class ReadOp,
381 : class WriteOp,
382 : class DescState>
383 : std::coroutine_handle<>
384 206719 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
385 : do_write_some(
386 : std::coroutine_handle<> h,
387 : capy::executor_ref ex,
388 : buffer_param param,
389 : std::stop_token const& token,
390 : std::error_code* ec,
391 : std::size_t* bytes_out)
392 : {
393 206719 : auto& op = wr_;
394 206719 : op.reset();
395 :
396 206719 : capy::mutable_buffer bufs[WriteOp::max_buffers];
397 206719 : op.iovec_count =
398 206719 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
399 :
400 206719 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
401 : {
402 2 : op.h = h;
403 2 : op.ex = ex;
404 2 : op.ec_out = ec;
405 2 : op.bytes_out = bytes_out;
406 2 : op.start(token, static_cast<Derived*>(this));
407 2 : op.impl_ptr = this->shared_from_this();
408 2 : op.complete(0, 0);
409 2 : this->svc_.post(&op);
410 2 : return std::noop_coroutine();
411 : }
412 :
413 413434 : for (int i = 0; i < op.iovec_count; ++i)
414 : {
415 206717 : op.iovecs[i].iov_base = bufs[i].data();
416 206717 : op.iovecs[i].iov_len = bufs[i].size();
417 : }
418 :
419 : // Speculative write via backend-specific write policy
420 : ssize_t n =
421 206717 : WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
422 :
423 206717 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
424 : {
425 206717 : int err = (n < 0) ? errno : 0;
426 206717 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
427 :
428 206717 : if (this->svc_.scheduler().try_consume_inline_budget())
429 : {
430 165391 : *ec = err ? make_err(err) : std::error_code{};
431 165391 : *bytes_out = bytes;
432 165391 : op.cont_op.cont.h = h;
433 165391 : return dispatch_coro(ex, op.cont_op.cont);
434 : }
435 41326 : op.h = h;
436 41326 : op.ex = ex;
437 41326 : op.ec_out = ec;
438 41326 : op.bytes_out = bytes_out;
439 41326 : op.start(token, static_cast<Derived*>(this));
440 41326 : op.impl_ptr = this->shared_from_this();
441 41326 : op.complete(err, bytes);
442 41326 : this->svc_.post(&op);
443 41326 : return std::noop_coroutine();
444 : }
445 :
446 : // EAGAIN — register with reactor
447 MIS 0 : op.h = h;
448 0 : op.ex = ex;
449 0 : op.ec_out = ec;
450 0 : op.bytes_out = bytes_out;
451 0 : op.fd = this->fd_;
452 0 : op.start(token, static_cast<Derived*>(this));
453 0 : op.impl_ptr = this->shared_from_this();
454 :
455 0 : this->register_op(
456 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
457 0 : this->desc_state_.write_cancel_pending);
458 0 : return std::noop_coroutine();
459 : }
460 :
461 : } // namespace boost::corosio::detail
462 :
463 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|