LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_stream_file.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 86.1 % 137 118 19
Test Date: 2026-03-26 16:40:44 Functions: 94.7 % 19 18 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/corosio/stream_file.hpp>
      19                 : #include <boost/corosio/file_base.hpp>
      20                 : #include <boost/corosio/detail/intrusive.hpp>
      21                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      22                 : #include <boost/corosio/detail/scheduler_op.hpp>
      23                 : #include <boost/corosio/detail/continuation_op.hpp>
      24                 : #include <boost/corosio/detail/thread_pool.hpp>
      25                 : #include <boost/corosio/detail/scheduler.hpp>
      26                 : #include <boost/corosio/detail/buffer_param.hpp>
      27                 : #include <boost/corosio/native/detail/make_err.hpp>
      28                 : #include <boost/capy/ex/executor_ref.hpp>
      29                 : #include <boost/capy/error.hpp>
      30                 : #include <boost/capy/buffers.hpp>
      31                 : 
      32                 : #include <atomic>
      33                 : #include <coroutine>
      34                 : #include <cstddef>
      35                 : #include <cstdint>
      36                 : #include <filesystem>
      37                 : #include <limits>
      38                 : #include <memory>
      39                 : #include <optional>
      40                 : #include <stop_token>
      41                 : #include <system_error>
      42                 : 
      43                 : #include <errno.h>
      44                 : #include <fcntl.h>
      45                 : #include <sys/stat.h>
      46                 : #include <sys/uio.h>
      47                 : #include <unistd.h>
      48                 : 
      49                 : /*
      50                 :     POSIX Stream File Implementation
      51                 :     =================================
      52                 : 
      53                 :     Regular files cannot be monitored by epoll/kqueue/select — the kernel
      54                 :     always reports them as ready. Blocking I/O (pread/pwrite) is dispatched
      55                 :     to a shared thread pool, with completion posted back to the scheduler.
      56                 : 
      57                 :     This follows the same pattern as posix_resolver: pool_work_item for
      58                 :     dispatch, scheduler_op for completion, shared_from_this for lifetime.
      59                 : 
      60                 :     Completion Flow
      61                 :     ---------------
      62                 :     1. read_some() sets up read_op_ (a file_op), posts to thread pool
      63                 :     2. Pool thread runs preadv() (blocking)
      64                 :     3. Pool thread stores results, posts scheduler_op to scheduler
      65                 :     4. Scheduler invokes op() which resumes the coroutine
      66                 : 
      67                 :     Single-Inflight Constraint
      68                 :     --------------------------
      69                 :     Only one asynchronous operation may be in flight at a time on a
      70                 :     given file object. Concurrent read and write is not supported
      71                 :     because both share offset_ without synchronization.
      72                 : */
      73                 : 
      74                 : namespace boost::corosio::detail {
      75                 : 
      76                 : struct scheduler;
      77                 : class posix_stream_file_service;
      78                 : 
      79                 : /** Stream file implementation for POSIX backends.
      80                 : 
      81                 :     Each instance contains embedded operation objects (read_op_, write_op_)
      82                 :     that are reused across calls. This avoids per-operation heap allocation.
      83                 : */
      84                 : class posix_stream_file final
      85                 :     : public stream_file::implementation
      86                 :     , public std::enable_shared_from_this<posix_stream_file>
      87                 :     , public intrusive_list<posix_stream_file>::node
      88                 : {
      89                 :     friend class posix_stream_file_service;
      90                 : 
      91                 : public:
      92                 :     static constexpr std::size_t max_buffers = 16;
      93                 : 
      94                 :     /** Operation state for a single file read or write. */
      95                 :     struct file_op : scheduler_op
      96                 :     {
      97                 :         struct canceller
      98                 :         {
      99                 :             file_op* op;
     100 HIT           1 :             void operator()() const noexcept
     101                 :             {
     102               1 :                 op->request_cancel();
     103               1 :             }
     104                 :         };
     105                 : 
     106                 :         // Coroutine state
     107                 :         std::coroutine_handle<> h;
     108                 :         detail::continuation_op cont_op;
     109                 :         capy::executor_ref ex;
     110                 : 
     111                 :         // Output pointers
     112                 :         std::error_code* ec_out = nullptr;
     113                 :         std::size_t* bytes_out  = nullptr;
     114                 : 
     115                 :         // Buffer data (copied from buffer_param at submission time)
     116                 :         iovec iovecs[max_buffers];
     117                 :         int iovec_count = 0;
     118                 : 
     119                 :         // Result storage (populated by worker thread)
     120                 :         int errn                    = 0;
     121                 :         std::size_t bytes_transferred = 0;
     122                 :         bool is_read                = false;
     123                 : 
     124                 :         // Thread coordination
     125                 :         std::atomic<bool> cancelled{false};
     126                 :         std::optional<std::stop_callback<canceller>> stop_cb;
     127                 : 
     128                 :         /// Prevents use-after-free when file is closed with pending ops.
     129                 :         std::shared_ptr<void> impl_ref;
     130                 : 
     131              52 :         file_op() = default;
     132                 : 
     133              12 :         void reset() noexcept
     134                 :         {
     135              12 :             iovec_count       = 0;
     136              12 :             errn              = 0;
     137              12 :             bytes_transferred = 0;
     138              12 :             is_read           = false;
     139              12 :             cancelled.store(false, std::memory_order_relaxed);
     140              12 :             stop_cb.reset();
     141              12 :             impl_ref.reset();
     142              12 :             ec_out    = nullptr;
     143              12 :             bytes_out = nullptr;
     144              12 :         }
     145                 : 
     146                 :         void operator()() override;
     147                 :         void destroy() override;
     148                 : 
     149             141 :         void request_cancel() noexcept
     150                 :         {
     151             141 :             cancelled.store(true, std::memory_order_release);
     152             141 :         }
     153                 : 
     154              12 :         void start(std::stop_token const& token)
     155                 :         {
     156              12 :             cancelled.store(false, std::memory_order_release);
     157              12 :             stop_cb.reset();
     158              12 :             if (token.stop_possible())
     159               1 :                 stop_cb.emplace(token, canceller{this});
     160              12 :         }
     161                 :     };
     162                 : 
     163                 :     /** Pool work item for thread pool dispatch. */
     164                 :     struct pool_op : pool_work_item
     165                 :     {
     166                 :         posix_stream_file* file_ = nullptr;
     167                 :         std::shared_ptr<posix_stream_file> ref_;
     168                 :     };
     169                 : 
     170                 :     explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
     171                 : 
     172                 :     // -- io_stream::implementation --
     173                 : 
     174                 :     std::coroutine_handle<> read_some(
     175                 :         std::coroutine_handle<>,
     176                 :         capy::executor_ref,
     177                 :         buffer_param,
     178                 :         std::stop_token,
     179                 :         std::error_code*,
     180                 :         std::size_t*) override;
     181                 : 
     182                 :     std::coroutine_handle<> write_some(
     183                 :         std::coroutine_handle<>,
     184                 :         capy::executor_ref,
     185                 :         buffer_param,
     186                 :         std::stop_token,
     187                 :         std::error_code*,
     188                 :         std::size_t*) override;
     189                 : 
     190                 :     // -- stream_file::implementation --
     191                 : 
     192              81 :     native_handle_type native_handle() const noexcept override
     193                 :     {
     194              81 :         return fd_;
     195                 :     }
     196                 : 
     197              70 :     void cancel() noexcept override
     198                 :     {
     199              70 :         read_op_.request_cancel();
     200              70 :         write_op_.request_cancel();
     201              70 :     }
     202                 : 
     203                 :     std::uint64_t size() const override;
     204                 :     void resize(std::uint64_t new_size) override;
     205                 :     void sync_data() override;
     206                 :     void sync_all() override;
     207                 :     native_handle_type release() override;
     208                 :     void assign(native_handle_type handle) override;
     209                 :     std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
     210                 : 
     211                 :     // -- Internal --
     212                 : 
     213                 :     /** Open the file and store the fd. */
     214                 :     std::error_code open_file(
     215                 :         std::filesystem::path const& path, file_base::flags mode);
     216                 : 
     217                 :     /** Close the file descriptor. */
     218                 :     void close_file() noexcept;
     219                 : 
     220                 : private:
     221                 :     posix_stream_file_service& svc_;
     222                 :     int fd_ = -1;
     223                 :     std::uint64_t offset_ = 0;
     224                 : 
     225                 :     file_op read_op_;
     226                 :     file_op write_op_;
     227                 :     pool_op read_pool_op_;
     228                 :     pool_op write_pool_op_;
     229                 : 
     230                 :     static void do_read_work(pool_work_item*) noexcept;
     231                 :     static void do_write_work(pool_work_item*) noexcept;
     232                 : };
     233                 : 
     234                 : // ---------------------------------------------------------------------------
     235                 : // Inline implementation
     236                 : // ---------------------------------------------------------------------------
     237                 : 
     238                 : inline
     239              26 : posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
     240              26 :     : svc_(svc)
     241                 : {
     242              26 : }
     243                 : 
     244                 : inline std::error_code
     245              19 : posix_stream_file::open_file(
     246                 :     std::filesystem::path const& path, file_base::flags mode)
     247                 : {
     248              19 :     close_file();
     249                 : 
     250              19 :     int oflags = 0;
     251                 : 
     252                 :     // Access mode
     253              19 :     unsigned access = static_cast<unsigned>(mode) & 3u;
     254              19 :     if (access == static_cast<unsigned>(file_base::read_write))
     255               2 :         oflags |= O_RDWR;
     256              17 :     else if (access == static_cast<unsigned>(file_base::write_only))
     257               7 :         oflags |= O_WRONLY;
     258                 :     else
     259              10 :         oflags |= O_RDONLY;
     260                 : 
     261                 :     // Creation flags
     262              19 :     if ((mode & file_base::create) != file_base::flags(0))
     263               6 :         oflags |= O_CREAT;
     264              19 :     if ((mode & file_base::exclusive) != file_base::flags(0))
     265               1 :         oflags |= O_EXCL;
     266              19 :     if ((mode & file_base::truncate) != file_base::flags(0))
     267               5 :         oflags |= O_TRUNC;
     268              19 :     if ((mode & file_base::append) != file_base::flags(0))
     269               1 :         oflags |= O_APPEND;
     270              19 :     if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
     271 MIS           0 :         oflags |= O_SYNC;
     272                 : 
     273 HIT          19 :     int fd = ::open(path.c_str(), oflags, 0666);
     274              19 :     if (fd < 0)
     275               2 :         return make_err(errno);
     276                 : 
     277              17 :     fd_     = fd;
     278              17 :     offset_ = 0;
     279                 : 
     280                 :     // Append mode: position at end-of-file (preadv/pwritev use
     281                 :     // explicit offsets, so O_APPEND alone is not sufficient).
     282              17 :     if ((mode & file_base::append) != file_base::flags(0))
     283                 :     {
     284                 :         struct stat st;
     285               1 :         if (::fstat(fd, &st) < 0)
     286                 :         {
     287 MIS           0 :             int err = errno;
     288               0 :             ::close(fd);
     289               0 :             fd_ = -1;
     290               0 :             return make_err(err);
     291                 :         }
     292 HIT           1 :         offset_ = static_cast<std::uint64_t>(st.st_size);
     293                 :     }
     294                 : 
     295                 : #ifdef POSIX_FADV_SEQUENTIAL
     296              17 :     ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
     297                 : #endif
     298                 : 
     299              17 :     return {};
     300                 : }
     301                 : 
     302                 : inline void
     303              89 : posix_stream_file::close_file() noexcept
     304                 : {
     305              89 :     if (fd_ >= 0)
     306                 :     {
     307              17 :         ::close(fd_);
     308              17 :         fd_ = -1;
     309                 :     }
     310              89 : }
     311                 : 
     312                 : inline std::uint64_t
     313               3 : posix_stream_file::size() const
     314                 : {
     315                 :     struct stat st;
     316               3 :     if (::fstat(fd_, &st) < 0)
     317 MIS           0 :         throw_system_error(make_err(errno), "stream_file::size");
     318 HIT           3 :     return static_cast<std::uint64_t>(st.st_size);
     319                 : }
     320                 : 
     321                 : inline void
     322               1 : posix_stream_file::resize(std::uint64_t new_size)
     323                 : {
     324               1 :     if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
     325 MIS           0 :         throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
     326 HIT           1 :     if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
     327 MIS           0 :         throw_system_error(make_err(errno), "stream_file::resize");
     328 HIT           1 : }
     329                 : 
     330                 : inline void
     331               1 : posix_stream_file::sync_data()
     332                 : {
     333                 : #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
     334               1 :     if (::fdatasync(fd_) < 0)
     335                 : #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
     336                 :     if (::fsync(fd_) < 0)
     337                 : #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
     338 MIS           0 :         throw_system_error(make_err(errno), "stream_file::sync_data");
     339 HIT           1 : }
     340                 : 
     341                 : inline void
     342               1 : posix_stream_file::sync_all()
     343                 : {
     344               1 :     if (::fsync(fd_) < 0)
     345 MIS           0 :         throw_system_error(make_err(errno), "stream_file::sync_all");
     346 HIT           1 : }
     347                 : 
     348                 : inline native_handle_type
     349               1 : posix_stream_file::release()
     350                 : {
     351               1 :     int fd = fd_;
     352               1 :     fd_ = -1;
     353               1 :     offset_ = 0;
     354               1 :     return fd;
     355                 : }
     356                 : 
     357                 : inline void
     358               1 : posix_stream_file::assign(native_handle_type handle)
     359                 : {
     360               1 :     close_file();
     361               1 :     fd_ = handle;
     362               1 :     offset_ = 0;
     363               1 : }
     364                 : 
     365                 : inline std::uint64_t
     366               7 : posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
     367                 : {
     368                 :     // We track offset_ ourselves (not the kernel fd offset)
     369                 :     // because preadv/pwritev use explicit offsets.
     370                 :     std::int64_t new_pos;
     371                 : 
     372               7 :     if (origin == file_base::seek_set)
     373                 :     {
     374               3 :         new_pos = offset;
     375                 :     }
     376               4 :     else if (origin == file_base::seek_cur)
     377                 :     {
     378               2 :         new_pos = static_cast<std::int64_t>(offset_) + offset;
     379                 :     }
     380                 :     else
     381                 :     {
     382                 :         struct stat st;
     383               2 :         if (::fstat(fd_, &st) < 0)
     384 MIS           0 :             throw_system_error(make_err(errno), "stream_file::seek");
     385 HIT           2 :         new_pos = st.st_size + offset;
     386                 :     }
     387                 : 
     388               7 :     if (new_pos < 0)
     389               3 :         throw_system_error(make_err(EINVAL), "stream_file::seek");
     390               4 :     if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
     391 MIS           0 :         throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
     392                 : 
     393 HIT           4 :     offset_ = static_cast<std::uint64_t>(new_pos);
     394                 : 
     395               4 :     return offset_;
     396                 : }
     397                 : 
     398                 : // -- file_op completion handler --
     399                 : // (read_some, write_some, do_read_work, do_write_work are
     400                 : //  defined in posix_stream_file_service.hpp after the service)
     401                 : 
     402                 : inline void
     403              12 : posix_stream_file::file_op::operator()()
     404                 : {
     405              12 :     stop_cb.reset();
     406                 : 
     407              12 :     bool const was_cancelled = cancelled.load(std::memory_order_acquire);
     408                 : 
     409              12 :     if (ec_out)
     410                 :     {
     411              12 :         if (was_cancelled)
     412               1 :             *ec_out = capy::error::canceled;
     413              11 :         else if (errn != 0)
     414 MIS           0 :             *ec_out = make_err(errn);
     415 HIT          11 :         else if (is_read && bytes_transferred == 0)
     416               1 :             *ec_out = capy::error::eof;
     417                 :         else
     418              10 :             *ec_out = {};
     419                 :     }
     420                 : 
     421              12 :     if (bytes_out)
     422              12 :         *bytes_out = was_cancelled ? 0 : bytes_transferred;
     423                 : 
     424                 :     // Move impl_ref to a local so members remain valid through
     425                 :     // dispatch — impl_ref may be the last shared_ptr keeping
     426                 :     // the parent posix_stream_file (which embeds this file_op) alive.
     427              12 :     auto prevent_destroy = std::move(impl_ref);
     428              12 :     ex.on_work_finished();
     429              12 :     cont_op.cont.h = h;
     430              12 :     dispatch_coro(ex, cont_op.cont).resume();
     431              12 : }
     432                 : 
     433                 : inline void
     434 MIS           0 : posix_stream_file::file_op::destroy()
     435                 : {
     436               0 :     stop_cb.reset();
     437               0 :     auto local_ex = ex;
     438               0 :     impl_ref.reset();
     439               0 :     local_ex.on_work_finished();
     440               0 : }
     441                 : 
     442                 : } // namespace boost::corosio::detail
     443                 : 
     444                 : #endif // BOOST_COROSIO_POSIX
     445                 : 
     446                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
        

Generated by: LCOV version 2.3