LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_stream_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 84.6 % 143 121 22
Test Date: 2026-03-26 16:40:44 Functions: 100.0 % 17 17

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
      18                 : #include <boost/corosio/detail/file_service.hpp>
      19                 : #include <boost/corosio/detail/thread_pool.hpp>
      20                 : 
      21                 : #include <mutex>
      22                 : #include <unordered_map>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /** Stream file service for POSIX backends.
      27                 : 
      28                 :     Owns all posix_stream_file instances. Thread lifecycle is
      29                 :     managed by the thread_pool service (shared with resolver).
      30                 : */
      31                 : class BOOST_COROSIO_DECL posix_stream_file_service final
      32                 :     : public file_service
      33                 : {
      34                 : public:
      35 HIT         515 :     posix_stream_file_service(
      36                 :         capy::execution_context& ctx, scheduler& sched)
      37            1030 :         : sched_(&sched)
      38             515 :         , pool_(get_or_create_pool(ctx))
      39                 :     {
      40             515 :     }
      41                 : 
      42            1030 :     ~posix_stream_file_service() override = default;
      43                 : 
      44                 :     posix_stream_file_service(posix_stream_file_service const&)            = delete;
      45                 :     posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
      46                 : 
      47              26 :     io_object::implementation* construct() override
      48                 :     {
      49              26 :         auto ptr   = std::make_shared<posix_stream_file>(*this);
      50              26 :         auto* impl = ptr.get();
      51                 : 
      52                 :         {
      53              26 :             std::lock_guard<std::mutex> lock(mutex_);
      54              26 :             file_list_.push_back(impl);
      55              26 :             file_ptrs_[impl] = std::move(ptr);
      56              26 :         }
      57                 : 
      58              26 :         return impl;
      59              26 :     }
      60                 : 
      61              26 :     void destroy(io_object::implementation* p) override
      62                 :     {
      63              26 :         auto& impl = static_cast<posix_stream_file&>(*p);
      64              26 :         impl.cancel();
      65              26 :         impl.close_file();
      66              26 :         destroy_impl(impl);
      67              26 :     }
      68                 : 
      69              43 :     void close(io_object::handle& h) override
      70                 :     {
      71              43 :         if (h.get())
      72                 :         {
      73              43 :             auto& impl = static_cast<posix_stream_file&>(*h.get());
      74              43 :             impl.cancel();
      75              43 :             impl.close_file();
      76                 :         }
      77              43 :     }
      78                 : 
      79              19 :     std::error_code open_file(
      80                 :         stream_file::implementation& impl,
      81                 :         std::filesystem::path const& path,
      82                 :         file_base::flags mode) override
      83                 :     {
      84              19 :         return static_cast<posix_stream_file&>(impl).open_file(path, mode);
      85                 :     }
      86                 : 
      87             515 :     void shutdown() override
      88                 :     {
      89             515 :         std::lock_guard<std::mutex> lock(mutex_);
      90             515 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      91 MIS           0 :              impl       = file_list_.pop_front())
      92                 :         {
      93               0 :             impl->cancel();
      94               0 :             impl->close_file();
      95                 :         }
      96 HIT         515 :         file_ptrs_.clear();
      97             515 :     }
      98                 : 
      99              26 :     void destroy_impl(posix_stream_file& impl)
     100                 :     {
     101              26 :         std::lock_guard<std::mutex> lock(mutex_);
     102              26 :         file_list_.remove(&impl);
     103              26 :         file_ptrs_.erase(&impl);
     104              26 :     }
     105                 : 
     106              12 :     void post(scheduler_op* op)
     107                 :     {
     108              12 :         sched_->post(op);
     109              12 :     }
     110                 : 
     111                 :     void work_started() noexcept
     112                 :     {
     113                 :         sched_->work_started();
     114                 :     }
     115                 : 
     116                 :     void work_finished() noexcept
     117                 :     {
     118                 :         sched_->work_finished();
     119                 :     }
     120                 : 
     121              12 :     thread_pool& pool() noexcept
     122                 :     {
     123              12 :         return pool_;
     124                 :     }
     125                 : 
     126                 : private:
     127             515 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     128                 :     {
     129             515 :         auto* p = ctx.find_service<thread_pool>();
     130             515 :         if (p)
     131             515 :             return *p;
     132 MIS           0 :         return ctx.make_service<thread_pool>();
     133                 :     }
     134                 : 
     135                 :     scheduler* sched_;
     136                 :     thread_pool& pool_;
     137                 :     std::mutex mutex_;
     138                 :     intrusive_list<posix_stream_file> file_list_;
     139                 :     std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
     140                 :         file_ptrs_;
     141                 : };
     142                 : 
     143                 : /** Get or create the stream file service for the given context. */
     144                 : inline posix_stream_file_service&
     145 HIT         515 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
     146                 : {
     147             515 :     return ctx.make_service<posix_stream_file_service>(sched);
     148                 : }
     149                 : 
     150                 : // ---------------------------------------------------------------------------
     151                 : // posix_stream_file inline implementations (require complete service type)
     152                 : // ---------------------------------------------------------------------------
     153                 : 
     154                 : inline std::coroutine_handle<>
     155               6 : posix_stream_file::read_some(
     156                 :     std::coroutine_handle<> h,
     157                 :     capy::executor_ref ex,
     158                 :     buffer_param param,
     159                 :     std::stop_token token,
     160                 :     std::error_code* ec,
     161                 :     std::size_t* bytes_out)
     162                 : {
     163               6 :     auto& op = read_op_;
     164               6 :     op.reset();
     165               6 :     op.is_read = true;
     166                 : 
     167               6 :     capy::mutable_buffer bufs[max_buffers];
     168               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     169                 : 
     170               6 :     if (op.iovec_count == 0)
     171                 :     {
     172 MIS           0 :         *ec        = {};
     173               0 :         *bytes_out = 0;
     174               0 :         op.cont_op.cont.h = h;
     175               0 :         return dispatch_coro(ex, op.cont_op.cont);
     176                 :     }
     177                 : 
     178 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     179                 :     {
     180               6 :         op.iovecs[i].iov_base = bufs[i].data();
     181               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     182                 :     }
     183                 : 
     184               6 :     op.h         = h;
     185               6 :     op.ex        = ex;
     186               6 :     op.ec_out    = ec;
     187               6 :     op.bytes_out = bytes_out;
     188               6 :     op.start(token);
     189                 : 
     190               6 :     op.ex.on_work_started();
     191                 : 
     192               6 :     read_pool_op_.file_ = this;
     193               6 :     read_pool_op_.ref_  = this->shared_from_this();
     194               6 :     read_pool_op_.func_ = &posix_stream_file::do_read_work;
     195               6 :     if (!svc_.pool().post(&read_pool_op_))
     196                 :     {
     197 MIS           0 :         op.impl_ref = std::move(read_pool_op_.ref_);
     198               0 :         op.cancelled.store(true, std::memory_order_release);
     199               0 :         svc_.post(&read_op_);
     200                 :     }
     201 HIT           6 :     return std::noop_coroutine();
     202                 : }
     203                 : 
     204                 : inline void
     205               6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
     206                 : {
     207               6 :     auto* pw   = static_cast<pool_op*>(w);
     208               6 :     auto* self = pw->file_;
     209               6 :     auto& op   = self->read_op_;
     210                 : 
     211               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     212                 :     {
     213                 :         ssize_t n;
     214                 :         do
     215                 :         {
     216              10 :             n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
     217               5 :                          static_cast<off_t>(self->offset_));
     218                 :         }
     219               5 :         while (n < 0 && errno == EINTR);
     220                 : 
     221               5 :         if (n >= 0)
     222                 :         {
     223               5 :             op.errn              = 0;
     224               5 :             op.bytes_transferred = static_cast<std::size_t>(n);
     225               5 :             self->offset_ += static_cast<std::uint64_t>(n);
     226                 :         }
     227                 :         else
     228                 :         {
     229 MIS           0 :             op.errn              = errno;
     230               0 :             op.bytes_transferred = 0;
     231                 :         }
     232                 :     }
     233                 : 
     234 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     235               6 :     self->svc_.post(&op);
     236               6 : }
     237                 : 
     238                 : inline std::coroutine_handle<>
     239               6 : posix_stream_file::write_some(
     240                 :     std::coroutine_handle<> h,
     241                 :     capy::executor_ref ex,
     242                 :     buffer_param param,
     243                 :     std::stop_token token,
     244                 :     std::error_code* ec,
     245                 :     std::size_t* bytes_out)
     246                 : {
     247               6 :     auto& op = write_op_;
     248               6 :     op.reset();
     249               6 :     op.is_read = false;
     250                 : 
     251               6 :     capy::mutable_buffer bufs[max_buffers];
     252               6 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
     253                 : 
     254               6 :     if (op.iovec_count == 0)
     255                 :     {
     256 MIS           0 :         *ec        = {};
     257               0 :         *bytes_out = 0;
     258               0 :         op.cont_op.cont.h = h;
     259               0 :         return dispatch_coro(ex, op.cont_op.cont);
     260                 :     }
     261                 : 
     262 HIT          12 :     for (int i = 0; i < op.iovec_count; ++i)
     263                 :     {
     264               6 :         op.iovecs[i].iov_base = bufs[i].data();
     265               6 :         op.iovecs[i].iov_len  = bufs[i].size();
     266                 :     }
     267                 : 
     268               6 :     op.h         = h;
     269               6 :     op.ex        = ex;
     270               6 :     op.ec_out    = ec;
     271               6 :     op.bytes_out = bytes_out;
     272               6 :     op.start(token);
     273                 : 
     274               6 :     op.ex.on_work_started();
     275                 : 
     276               6 :     write_pool_op_.file_ = this;
     277               6 :     write_pool_op_.ref_  = this->shared_from_this();
     278               6 :     write_pool_op_.func_ = &posix_stream_file::do_write_work;
     279               6 :     if (!svc_.pool().post(&write_pool_op_))
     280                 :     {
     281 MIS           0 :         op.impl_ref = std::move(write_pool_op_.ref_);
     282               0 :         op.cancelled.store(true, std::memory_order_release);
     283               0 :         svc_.post(&write_op_);
     284                 :     }
     285 HIT           6 :     return std::noop_coroutine();
     286                 : }
     287                 : 
     288                 : inline void
     289               6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
     290                 : {
     291               6 :     auto* pw   = static_cast<pool_op*>(w);
     292               6 :     auto* self = pw->file_;
     293               6 :     auto& op   = self->write_op_;
     294                 : 
     295               6 :     if (!op.cancelled.load(std::memory_order_acquire))
     296                 :     {
     297                 :         ssize_t n;
     298                 :         do
     299                 :         {
     300              12 :             n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
     301               6 :                           static_cast<off_t>(self->offset_));
     302                 :         }
     303               6 :         while (n < 0 && errno == EINTR);
     304                 : 
     305               6 :         if (n >= 0)
     306                 :         {
     307               6 :             op.errn              = 0;
     308               6 :             op.bytes_transferred = static_cast<std::size_t>(n);
     309               6 :             self->offset_ += static_cast<std::uint64_t>(n);
     310                 :         }
     311                 :         else
     312                 :         {
     313 MIS           0 :             op.errn              = errno;
     314               0 :             op.bytes_transferred = 0;
     315                 :         }
     316                 :     }
     317                 : 
     318 HIT           6 :     op.impl_ref = std::move(pw->ref_);
     319               6 :     self->svc_.post(&op);
     320               6 : }
     321                 : 
     322                 : } // namespace boost::corosio::detail
     323                 : 
     324                 : #endif // BOOST_COROSIO_POSIX
     325                 : 
     326                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3