LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 68.2 % 154 105 49
Test Date: 2026-03-26 16:40:44 Functions: 76.5 % 68 52 16

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/detail/continuation_op.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : 
      19                 : #include <atomic>
      20                 : #include <coroutine>
      21                 : #include <cstddef>
      22                 : #include <memory>
      23                 : #include <optional>
      24                 : #include <stop_token>
      25                 : #include <system_error>
      26                 : 
      27                 : #include <errno.h>
      28                 : 
      29                 : #include <netinet/in.h>
      30                 : #include <sys/socket.h>
      31                 : #include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     /// Stop-token callback that invokes cancel() on the target op.
      54                 :     struct canceller
      55                 :     {
      56                 :         reactor_op* op;
      57 HIT         200 :         void operator()() const noexcept
      58                 :         {
      59             200 :             op->cancel();
      60             200 :         }
      61                 :     };
      62                 : 
      63                 :     /// Caller's coroutine handle to resume on completion.
      64                 :     std::coroutine_handle<> h;
      65                 : 
      66                 :     /// Scheduler-ready continuation for executor dispatch/post (wraps h).
      67                 :     detail::continuation_op cont_op;
      68                 : 
      69                 :     /// Executor for dispatching the completion.
      70                 :     capy::executor_ref ex;
      71                 : 
      72                 :     /// Output pointer for the error code.
      73                 :     std::error_code* ec_out = nullptr;
      74                 : 
      75                 :     /// Output pointer for bytes transferred.
      76                 :     std::size_t* bytes_out = nullptr;
      77                 : 
      78                 :     /// File descriptor this operation targets.
      79                 :     int fd = -1;
      80                 : 
      81                 :     /// Stop-token callback registration.
      82                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      83                 : 
      84                 :     /// Owning socket impl (for stop_token cancellation).
      85                 :     Socket* socket_impl_ = nullptr;
      86                 : 
      87                 :     /// Owning acceptor impl (for stop_token cancellation).
      88                 :     Acceptor* acceptor_impl_ = nullptr;
      89                 : 
      90           72045 :     reactor_op() = default;
      91                 : 
      92                 :     /// Reset operation state for reuse.
      93          429659 :     void reset() noexcept
      94                 :     {
      95          429659 :         fd                = -1;
      96          429659 :         errn              = 0;
      97          429659 :         bytes_transferred = 0;
      98          429659 :         cancelled.store(false, std::memory_order_relaxed);
      99          429659 :         impl_ptr.reset();
     100          429659 :         socket_impl_   = nullptr;
     101          429659 :         acceptor_impl_ = nullptr;
     102          429659 :     }
     103                 : 
     104                 :     /// Return true if this is a read-direction operation.
     105           41336 :     virtual bool is_read_operation() const noexcept
     106                 :     {
     107           41336 :         return false;
     108                 :     }
     109                 : 
     110                 :     /// Cancel this operation via the owning impl.
     111                 :     virtual void cancel() noexcept = 0;
     112                 : 
     113                 :     /// Destroy without invoking.
     114 MIS           0 :     void destroy() override
     115                 :     {
     116               0 :         stop_cb.reset();
     117               0 :         reactor_op_base::destroy();
     118               0 :     }
     119                 : 
     120                 :     /// Arm the stop-token callback for a socket operation.
     121 HIT       90974 :     void start(std::stop_token const& token, Socket* impl)
     122                 :     {
     123           90974 :         cancelled.store(false, std::memory_order_release);
     124           90974 :         stop_cb.reset();
     125           90974 :         socket_impl_   = impl;
     126           90974 :         acceptor_impl_ = nullptr;
     127                 : 
     128           90974 :         if (token.stop_possible())
     129             198 :             stop_cb.emplace(token, canceller{this});
     130           90974 :     }
     131                 : 
     132                 :     /// Arm the stop-token callback for an acceptor operation.
     133            7928 :     void start(std::stop_token const& token, Acceptor* impl)
     134                 :     {
     135            7928 :         cancelled.store(false, std::memory_order_release);
     136            7928 :         stop_cb.reset();
     137            7928 :         socket_impl_   = nullptr;
     138            7928 :         acceptor_impl_ = impl;
     139                 : 
     140            7928 :         if (token.stop_possible())
     141               9 :             stop_cb.emplace(token, canceller{this});
     142            7928 :     }
     143                 : };
     144                 : 
     145                 : /** Shared connect operation.
     146                 : 
     147                 :     Checks SO_ERROR for connect completion status. The operator()()
     148                 :     and cancel() are provided by the concrete backend type.
     149                 : 
     150                 :     @tparam Base The backend's base op type.
     151                 : */
     152                 : template<class Base>
     153                 : struct reactor_connect_op : Base
     154                 : {
     155                 :     /// Endpoint to connect to.
     156                 :     endpoint target_endpoint;
     157                 : 
     158                 :     /// Reset operation state for reuse.
     159            7931 :     void reset() noexcept
     160                 :     {
     161            7931 :         Base::reset();
     162            7931 :         target_endpoint = endpoint{};
     163            7931 :     }
     164                 : 
     165            7920 :     void perform_io() noexcept override
     166                 :     {
     167            7920 :         int err       = 0;
     168            7920 :         socklen_t len = sizeof(err);
     169            7920 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     170 MIS           0 :             err = errno;
     171 HIT        7920 :         this->complete(err, 0);
     172            7920 :     }
     173                 : };
     174                 : 
     175                 : /** Shared scatter-read operation.
     176                 : 
     177                 :     Uses readv() with an EINTR retry loop.
     178                 : 
     179                 :     @tparam Base The backend's base op type.
     180                 : */
     181                 : template<class Base>
     182                 : struct reactor_read_op : Base
     183                 : {
     184                 :     /// Maximum scatter-gather buffer count.
     185                 :     static constexpr std::size_t max_buffers = 16;
     186                 : 
     187                 :     /// Scatter-gather I/O vectors.
     188                 :     iovec iovecs[max_buffers];
     189                 : 
     190                 :     /// Number of active I/O vectors.
     191                 :     int iovec_count = 0;
     192                 : 
     193                 :     /// True for zero-length reads (completed immediately).
     194                 :     bool empty_buffer_read = false;
     195                 : 
     196                 :     /// Return true (this is a read-direction operation).
     197           41382 :     bool is_read_operation() const noexcept override
     198                 :     {
     199           41382 :         return !empty_buffer_read;
     200                 :     }
     201                 : 
     202          207017 :     void reset() noexcept
     203                 :     {
     204          207017 :         Base::reset();
     205          207017 :         iovec_count       = 0;
     206          207017 :         empty_buffer_read = false;
     207          207017 :     }
     208                 : 
     209             327 :     void perform_io() noexcept override
     210                 :     {
     211                 :         ssize_t n;
     212                 :         do
     213                 :         {
     214             327 :             n = ::readv(this->fd, iovecs, iovec_count);
     215                 :         }
     216             327 :         while (n < 0 && errno == EINTR);
     217                 : 
     218             327 :         if (n >= 0)
     219              97 :             this->complete(0, static_cast<std::size_t>(n));
     220                 :         else
     221             230 :             this->complete(errno, 0);
     222             327 :     }
     223                 : };
     224                 : 
     225                 : /** Shared gather-write operation.
     226                 : 
     227                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     228                 :     which returns ssize_t (bytes written or -1 with errno set).
     229                 : 
     230                 :     @tparam Base The backend's base op type.
     231                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     232                 : */
     233                 : template<class Base, class WritePolicy>
     234                 : struct reactor_write_op : Base
     235                 : {
     236                 :     /// The write syscall policy type.
     237                 :     using write_policy = WritePolicy;
     238                 : 
     239                 :     /// Maximum scatter-gather buffer count.
     240                 :     static constexpr std::size_t max_buffers = 16;
     241                 : 
     242                 :     /// Scatter-gather I/O vectors.
     243                 :     iovec iovecs[max_buffers];
     244                 : 
     245                 :     /// Number of active I/O vectors.
     246                 :     int iovec_count = 0;
     247                 : 
     248          206719 :     void reset() noexcept
     249                 :     {
     250          206719 :         Base::reset();
     251          206719 :         iovec_count = 0;
     252          206719 :     }
     253                 : 
     254 MIS           0 :     void perform_io() noexcept override
     255                 :     {
     256               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     257               0 :         if (n >= 0)
     258               0 :             this->complete(0, static_cast<std::size_t>(n));
     259                 :         else
     260               0 :             this->complete(errno, 0);
     261               0 :     }
     262                 : };
     263                 : 
     264                 : /** Shared accept operation.
     265                 : 
     266                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     267                 :     which returns the accepted fd or -1 with errno set.
     268                 : 
     269                 :     @tparam Base The backend's base op type.
     270                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     271                 : */
     272                 : template<class Base, class AcceptPolicy>
     273                 : struct reactor_accept_op : Base
     274                 : {
     275                 :     /// File descriptor of the accepted connection.
     276                 :     int accepted_fd = -1;
     277                 : 
     278                 :     /// Pointer to the peer socket implementation.
     279                 :     io_object::implementation* peer_impl = nullptr;
     280                 : 
     281                 :     /// Output pointer for the accepted implementation.
     282                 :     io_object::implementation** impl_out = nullptr;
     283                 : 
     284                 :     /// Peer address storage filled by accept.
     285                 :     sockaddr_storage peer_storage{};
     286                 : 
     287 HIT        7928 :     void reset() noexcept
     288                 :     {
     289            7928 :         Base::reset();
     290            7928 :         accepted_fd  = -1;
     291            7928 :         peer_impl    = nullptr;
     292            7928 :         impl_out     = nullptr;
     293            7928 :         peer_storage = {};
     294            7928 :     }
     295                 : 
     296            7912 :     void perform_io() noexcept override
     297                 :     {
     298            7912 :         int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
     299            7912 :         if (new_fd >= 0)
     300                 :         {
     301            7912 :             accepted_fd = new_fd;
     302            7912 :             this->complete(0, 0);
     303                 :         }
     304                 :         else
     305                 :         {
     306 MIS           0 :             this->complete(errno, 0);
     307                 :         }
     308 HIT        7912 :     }
     309                 : };
     310                 : 
     311                 : /** Shared connected send operation for datagram sockets.
     312                 : 
     313                 :     Uses sendmsg() with msg_name=nullptr (connected mode).
     314                 : 
     315                 :     @tparam Base The backend's base op type.
     316                 : */
     317                 : template<class Base>
     318                 : struct reactor_send_op : Base
     319                 : {
     320                 :     /// Maximum scatter-gather buffer count.
     321                 :     static constexpr std::size_t max_buffers = 16;
     322                 : 
     323                 :     /// Scatter-gather I/O vectors.
     324                 :     iovec iovecs[max_buffers];
     325                 : 
     326                 :     /// Number of active I/O vectors.
     327                 :     int iovec_count = 0;
     328                 : 
     329               6 :     void reset() noexcept
     330                 :     {
     331               6 :         Base::reset();
     332               6 :         iovec_count = 0;
     333               6 :     }
     334                 : 
     335 MIS           0 :     void perform_io() noexcept override
     336                 :     {
     337               0 :         msghdr msg{};
     338               0 :         msg.msg_iov    = iovecs;
     339               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     340                 : 
     341                 : #ifdef MSG_NOSIGNAL
     342               0 :         constexpr int send_flags = MSG_NOSIGNAL;
     343                 : #else
     344                 :         constexpr int send_flags = 0;
     345                 : #endif
     346                 : 
     347                 :         ssize_t n;
     348                 :         do
     349                 :         {
     350               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     351                 :         }
     352               0 :         while (n < 0 && errno == EINTR);
     353                 : 
     354               0 :         if (n >= 0)
     355               0 :             this->complete(0, static_cast<std::size_t>(n));
     356                 :         else
     357               0 :             this->complete(errno, 0);
     358               0 :     }
     359                 : };
     360                 : 
     361                 : /** Shared connected recv operation for datagram sockets.
     362                 : 
     363                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     364                 :     Unlike reactor_read_op, does not map n==0 to EOF
     365                 :     (zero-length datagrams are valid).
     366                 : 
     367                 :     @tparam Base The backend's base op type.
     368                 : */
     369                 : template<class Base>
     370                 : struct reactor_recv_op : Base
     371                 : {
     372                 :     /// Maximum scatter-gather buffer count.
     373                 :     static constexpr std::size_t max_buffers = 16;
     374                 : 
     375                 :     /// Scatter-gather I/O vectors.
     376                 :     iovec iovecs[max_buffers];
     377                 : 
     378                 :     /// Number of active I/O vectors.
     379                 :     int iovec_count = 0;
     380                 : 
     381                 :     /// Return true (this is a read-direction operation).
     382 HIT           2 :     bool is_read_operation() const noexcept override
     383                 :     {
     384               2 :         return true;
     385                 :     }
     386                 : 
     387               4 :     void reset() noexcept
     388                 :     {
     389               4 :         Base::reset();
     390               4 :         iovec_count = 0;
     391               4 :     }
     392                 : 
     393 MIS           0 :     void perform_io() noexcept override
     394                 :     {
     395               0 :         msghdr msg{};
     396               0 :         msg.msg_iov    = iovecs;
     397               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     398                 : 
     399                 :         ssize_t n;
     400                 :         do
     401                 :         {
     402               0 :             n = ::recvmsg(this->fd, &msg, 0);
     403                 :         }
     404               0 :         while (n < 0 && errno == EINTR);
     405                 : 
     406               0 :         if (n >= 0)
     407               0 :             this->complete(0, static_cast<std::size_t>(n));
     408                 :         else
     409               0 :             this->complete(errno, 0);
     410               0 :     }
     411                 : };
     412                 : 
     413                 : /** Shared send_to operation for datagram sockets.
     414                 : 
     415                 :     Uses sendmsg() with the destination endpoint in msg_name.
     416                 : 
     417                 :     @tparam Base The backend's base op type.
     418                 : */
     419                 : template<class Base>
     420                 : struct reactor_send_to_op : Base
     421                 : {
     422                 :     /// Maximum scatter-gather buffer count.
     423                 :     static constexpr std::size_t max_buffers = 16;
     424                 : 
     425                 :     /// Scatter-gather I/O vectors.
     426                 :     iovec iovecs[max_buffers];
     427                 : 
     428                 :     /// Number of active I/O vectors.
     429                 :     int iovec_count = 0;
     430                 : 
     431                 :     /// Destination address storage.
     432                 :     sockaddr_storage dest_storage{};
     433                 : 
     434                 :     /// Destination address length.
     435                 :     socklen_t dest_len = 0;
     436                 : 
     437 HIT          22 :     void reset() noexcept
     438                 :     {
     439              22 :         Base::reset();
     440              22 :         iovec_count  = 0;
     441              22 :         dest_storage = {};
     442              22 :         dest_len     = 0;
     443              22 :     }
     444                 : 
     445 MIS           0 :     void perform_io() noexcept override
     446                 :     {
     447               0 :         msghdr msg{};
     448               0 :         msg.msg_name    = &dest_storage;
     449               0 :         msg.msg_namelen = dest_len;
     450               0 :         msg.msg_iov     = iovecs;
     451               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     452                 : 
     453                 : #ifdef MSG_NOSIGNAL
     454               0 :         constexpr int send_flags = MSG_NOSIGNAL;
     455                 : #else
     456                 :         constexpr int send_flags = 0;
     457                 : #endif
     458                 : 
     459                 :         ssize_t n;
     460                 :         do
     461                 :         {
     462               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     463                 :         }
     464               0 :         while (n < 0 && errno == EINTR);
     465                 : 
     466               0 :         if (n >= 0)
     467               0 :             this->complete(0, static_cast<std::size_t>(n));
     468                 :         else
     469               0 :             this->complete(errno, 0);
     470               0 :     }
     471                 : };
     472                 : 
     473                 : /** Shared recv_from operation for datagram sockets.
     474                 : 
     475                 :     Uses recvmsg() with msg_name to capture the source endpoint.
     476                 : 
     477                 :     @tparam Base The backend's base op type.
     478                 : */
     479                 : template<class Base>
     480                 : struct reactor_recv_from_op : Base
     481                 : {
     482                 :     /// Maximum scatter-gather buffer count.
     483                 :     static constexpr std::size_t max_buffers = 16;
     484                 : 
     485                 :     /// Scatter-gather I/O vectors.
     486                 :     iovec iovecs[max_buffers];
     487                 : 
     488                 :     /// Number of active I/O vectors.
     489                 :     int iovec_count = 0;
     490                 : 
     491                 :     /// Source address storage filled by recvmsg.
     492                 :     sockaddr_storage source_storage{};
     493                 : 
     494                 :     /// Output pointer for the source endpoint (set by do_recv_from).
     495                 :     endpoint* source_out = nullptr;
     496                 : 
     497                 :     /// Return true (this is a read-direction operation).
     498               0 :     bool is_read_operation() const noexcept override
     499                 :     {
     500               0 :         return true;
     501                 :     }
     502                 : 
     503 HIT          32 :     void reset() noexcept
     504                 :     {
     505              32 :         Base::reset();
     506              32 :         iovec_count    = 0;
     507              32 :         source_storage = {};
     508              32 :         source_out     = nullptr;
     509              32 :     }
     510                 : 
     511               2 :     void perform_io() noexcept override
     512                 :     {
     513               2 :         msghdr msg{};
     514               2 :         msg.msg_name    = &source_storage;
     515               2 :         msg.msg_namelen = sizeof(source_storage);
     516               2 :         msg.msg_iov     = iovecs;
     517               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     518                 : 
     519                 :         ssize_t n;
     520                 :         do
     521                 :         {
     522               2 :             n = ::recvmsg(this->fd, &msg, 0);
     523                 :         }
     524               2 :         while (n < 0 && errno == EINTR);
     525                 : 
     526               2 :         if (n >= 0)
     527               2 :             this->complete(0, static_cast<std::size_t>(n));
     528                 :         else
     529 MIS           0 :             this->complete(errno, 0);
     530 HIT           2 :     }
     531                 : };
     532                 : 
     533                 : } // namespace boost::corosio::detail
     534                 : 
     535                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3