LCOV - code coverage report
Current view: top level - corosio/native/detail/posix - posix_random_access_file_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 87.0 % 138 120 18
Test Date: 2026-03-26 16:40:44 Functions: 100.0 % 16 16

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_POSIX
      16                 : 
      17                 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
      18                 : #include <boost/corosio/detail/random_access_file_service.hpp>
      19                 : #include <boost/corosio/detail/thread_pool.hpp>
      20                 : 
      21                 : #include <limits>
      22                 : #include <mutex>
      23                 : #include <unordered_map>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /** Random-access file service for POSIX backends. */
      28                 : class BOOST_COROSIO_DECL posix_random_access_file_service final
      29                 :     : public random_access_file_service
      30                 : {
      31                 : public:
      32 HIT         515 :     posix_random_access_file_service(
      33                 :         capy::execution_context& ctx, scheduler& sched)
      34            1030 :         : sched_(&sched)
      35             515 :         , pool_(get_or_create_pool(ctx))
      36                 :     {
      37             515 :     }
      38                 : 
      39            1030 :     ~posix_random_access_file_service() override = default;
      40                 : 
      41                 :     posix_random_access_file_service(
      42                 :         posix_random_access_file_service const&)            = delete;
      43                 :     posix_random_access_file_service& operator=(
      44                 :         posix_random_access_file_service const&) = delete;
      45                 : 
      46              24 :     io_object::implementation* construct() override
      47                 :     {
      48              24 :         auto ptr   = std::make_shared<posix_random_access_file>(*this);
      49              24 :         auto* impl = ptr.get();
      50                 : 
      51                 :         {
      52              24 :             std::lock_guard<std::mutex> lock(mutex_);
      53              24 :             file_list_.push_back(impl);
      54              24 :             file_ptrs_[impl] = std::move(ptr);
      55              24 :         }
      56                 : 
      57              24 :         return impl;
      58              24 :     }
      59                 : 
      60              24 :     void destroy(io_object::implementation* p) override
      61                 :     {
      62              24 :         auto& impl = static_cast<posix_random_access_file&>(*p);
      63              24 :         impl.cancel();
      64              24 :         impl.close_file();
      65              24 :         destroy_impl(impl);
      66              24 :     }
      67                 : 
      68              42 :     void close(io_object::handle& h) override
      69                 :     {
      70              42 :         if (h.get())
      71                 :         {
      72              42 :             auto& impl = static_cast<posix_random_access_file&>(*h.get());
      73              42 :             impl.cancel();
      74              42 :             impl.close_file();
      75                 :         }
      76              42 :     }
      77                 : 
      78              19 :     std::error_code open_file(
      79                 :         random_access_file::implementation& impl,
      80                 :         std::filesystem::path const& path,
      81                 :         file_base::flags mode) override
      82                 :     {
      83              19 :         return static_cast<posix_random_access_file&>(impl).open_file(
      84              19 :             path, mode);
      85                 :     }
      86                 : 
      87             515 :     void shutdown() override
      88                 :     {
      89             515 :         std::lock_guard<std::mutex> lock(mutex_);
      90             515 :         for (auto* impl = file_list_.pop_front(); impl != nullptr;
      91 MIS           0 :              impl       = file_list_.pop_front())
      92                 :         {
      93               0 :             impl->cancel();
      94               0 :             impl->close_file();
      95                 :         }
      96 HIT         515 :         file_ptrs_.clear();
      97             515 :     }
      98                 : 
      99              24 :     void destroy_impl(posix_random_access_file& impl)
     100                 :     {
     101              24 :         std::lock_guard<std::mutex> lock(mutex_);
     102              24 :         file_list_.remove(&impl);
     103              24 :         file_ptrs_.erase(&impl);
     104              24 :     }
     105                 : 
     106             126 :     void post(scheduler_op* op)
     107                 :     {
     108             126 :         sched_->post(op);
     109             126 :     }
     110                 : 
     111                 :     void work_started() noexcept
     112                 :     {
     113                 :         sched_->work_started();
     114                 :     }
     115                 : 
     116                 :     void work_finished() noexcept
     117                 :     {
     118                 :         sched_->work_finished();
     119                 :     }
     120                 : 
     121             126 :     thread_pool& pool() noexcept
     122                 :     {
     123             126 :         return pool_;
     124                 :     }
     125                 : 
     126                 : private:
     127             515 :     static thread_pool& get_or_create_pool(capy::execution_context& ctx)
     128                 :     {
     129             515 :         auto* p = ctx.find_service<thread_pool>();
     130             515 :         if (p)
     131             515 :             return *p;
     132 MIS           0 :         return ctx.make_service<thread_pool>();
     133                 :     }
     134                 : 
     135                 :     scheduler* sched_;
     136                 :     thread_pool& pool_;
     137                 :     std::mutex mutex_;
     138                 :     intrusive_list<posix_random_access_file> file_list_;
     139                 :     std::unordered_map<
     140                 :         posix_random_access_file*,
     141                 :         std::shared_ptr<posix_random_access_file>>
     142                 :         file_ptrs_;
     143                 : };
     144                 : 
     145                 : /** Get or create the random-access file service for the given context. */
     146                 : inline posix_random_access_file_service&
     147 HIT         515 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
     148                 : {
     149             515 :     return ctx.make_service<posix_random_access_file_service>(sched);
     150                 : }
     151                 : 
     152                 : // ---------------------------------------------------------------------------
     153                 : // posix_random_access_file inline implementations (require complete service)
     154                 : // ---------------------------------------------------------------------------
     155                 : 
     156                 : inline std::coroutine_handle<>
     157             116 : posix_random_access_file::read_some_at(
     158                 :     std::uint64_t offset,
     159                 :     std::coroutine_handle<> h,
     160                 :     capy::executor_ref ex,
     161                 :     buffer_param param,
     162                 :     std::stop_token token,
     163                 :     std::error_code* ec,
     164                 :     std::size_t* bytes_out)
     165                 : {
     166             116 :     capy::mutable_buffer bufs[max_buffers];
     167             116 :     auto count = param.copy_to(bufs, max_buffers);
     168                 : 
     169             116 :     if (count == 0)
     170                 :     {
     171 MIS           0 :         *ec        = {};
     172               0 :         *bytes_out = 0;
     173               0 :         return h;
     174                 :     }
     175                 : 
     176 HIT         116 :     auto* op = new raf_op();
     177             116 :     op->is_read = true;
     178             116 :     op->offset  = offset;
     179                 : 
     180             116 :     op->iovec_count = static_cast<int>(count);
     181             232 :     for (int i = 0; i < op->iovec_count; ++i)
     182                 :     {
     183             116 :         op->iovecs[i].iov_base = bufs[i].data();
     184             116 :         op->iovecs[i].iov_len  = bufs[i].size();
     185                 :     }
     186                 : 
     187             116 :     op->h         = h;
     188             116 :     op->ex        = ex;
     189             116 :     op->ec_out    = ec;
     190             116 :     op->bytes_out = bytes_out;
     191             116 :     op->file_     = this;
     192             116 :     op->file_ref  = this->shared_from_this();
     193             116 :     op->start(token);
     194                 : 
     195             116 :     op->ex.on_work_started();
     196                 : 
     197                 :     {
     198             116 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     199             116 :         outstanding_ops_.push_back(op);
     200             116 :     }
     201                 : 
     202             116 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     203             116 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     204                 :     {
     205 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     206               0 :         svc_.post(static_cast<scheduler_op*>(op));
     207                 :     }
     208 HIT         116 :     return std::noop_coroutine();
     209                 : }
     210                 : 
     211                 : inline std::coroutine_handle<>
     212              10 : posix_random_access_file::write_some_at(
     213                 :     std::uint64_t offset,
     214                 :     std::coroutine_handle<> h,
     215                 :     capy::executor_ref ex,
     216                 :     buffer_param param,
     217                 :     std::stop_token token,
     218                 :     std::error_code* ec,
     219                 :     std::size_t* bytes_out)
     220                 : {
     221              10 :     capy::mutable_buffer bufs[max_buffers];
     222              10 :     auto count = param.copy_to(bufs, max_buffers);
     223                 : 
     224              10 :     if (count == 0)
     225                 :     {
     226 MIS           0 :         *ec        = {};
     227               0 :         *bytes_out = 0;
     228               0 :         return h;
     229                 :     }
     230                 : 
     231 HIT          10 :     auto* op = new raf_op();
     232              10 :     op->is_read = false;
     233              10 :     op->offset  = offset;
     234                 : 
     235              10 :     op->iovec_count = static_cast<int>(count);
     236              20 :     for (int i = 0; i < op->iovec_count; ++i)
     237                 :     {
     238              10 :         op->iovecs[i].iov_base = bufs[i].data();
     239              10 :         op->iovecs[i].iov_len  = bufs[i].size();
     240                 :     }
     241                 : 
     242              10 :     op->h         = h;
     243              10 :     op->ex        = ex;
     244              10 :     op->ec_out    = ec;
     245              10 :     op->bytes_out = bytes_out;
     246              10 :     op->file_     = this;
     247              10 :     op->file_ref  = this->shared_from_this();
     248              10 :     op->start(token);
     249                 : 
     250              10 :     op->ex.on_work_started();
     251                 : 
     252                 :     {
     253              10 :         std::lock_guard<std::mutex> lock(ops_mutex_);
     254              10 :         outstanding_ops_.push_back(op);
     255              10 :     }
     256                 : 
     257              10 :     static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
     258              10 :     if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
     259                 :     {
     260 MIS           0 :         op->cancelled.store(true, std::memory_order_release);
     261               0 :         svc_.post(static_cast<scheduler_op*>(op));
     262                 :     }
     263 HIT          10 :     return std::noop_coroutine();
     264                 : }
     265                 : 
     266                 : // -- raf_op thread-pool work function --
     267                 : 
     268                 : inline void
     269             126 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
     270                 : {
     271             126 :     auto* op   = static_cast<raf_op*>(w);
     272             126 :     auto* self = op->file_;
     273                 : 
     274             126 :     if (op->cancelled.load(std::memory_order_acquire))
     275                 :     {
     276               1 :         op->errn              = ECANCELED;
     277               1 :         op->bytes_transferred = 0;
     278                 :     }
     279             250 :     else if (op->offset >
     280             125 :              static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
     281                 :     {
     282 MIS           0 :         op->errn              = EOVERFLOW;
     283               0 :         op->bytes_transferred = 0;
     284                 :     }
     285                 :     else
     286                 :     {
     287                 :         ssize_t n;
     288 HIT         125 :         if (op->is_read)
     289                 :         {
     290                 :             do
     291                 :             {
     292             230 :                 n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
     293             115 :                              static_cast<off_t>(op->offset));
     294                 :             }
     295             115 :             while (n < 0 && errno == EINTR);
     296                 :         }
     297                 :         else
     298                 :         {
     299                 :             do
     300                 :             {
     301              20 :                 n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
     302              10 :                               static_cast<off_t>(op->offset));
     303                 :             }
     304              10 :             while (n < 0 && errno == EINTR);
     305                 :         }
     306                 : 
     307             125 :         if (n >= 0)
     308                 :         {
     309             125 :             op->errn              = 0;
     310             125 :             op->bytes_transferred = static_cast<std::size_t>(n);
     311                 :         }
     312                 :         else
     313                 :         {
     314 MIS           0 :             op->errn              = errno;
     315               0 :             op->bytes_transferred = 0;
     316                 :         }
     317                 :     }
     318                 : 
     319 HIT         126 :     self->svc_.post(static_cast<scheduler_op*>(op));
     320             126 : }
     321                 : 
     322                 : } // namespace boost::corosio::detail
     323                 : 
     324                 : #endif // BOOST_COROSIO_POSIX
     325                 : 
     326                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
        

Generated by: LCOV version 2.3