include/boost/corosio/native/detail/posix/posix_random_access_file_service.hpp

87.0% Lines (120/138) 100.0% List of functions (15/15)
posix_random_access_file_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::posix_random_access_file_service::posix_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :32 515x 100.0% 88.0% boost::corosio::detail::posix_random_access_file_service::~posix_random_access_file_service() :39 1030x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::construct() :46 24x 100.0% 71.0% boost::corosio::detail::posix_random_access_file_service::destroy(boost::corosio::io_object::implementation*) :60 24x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::close(boost::corosio::io_object::handle&) :68 42x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::open_file(boost::corosio::random_access_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :78 19x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::shutdown() :87 515x 62.5% 70.0% boost::corosio::detail::posix_random_access_file_service::destroy_impl(boost::corosio::detail::posix_random_access_file&) :99 24x 100.0% 67.0% boost::corosio::detail::posix_random_access_file_service::post(boost::corosio::detail::scheduler_op*) :106 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::pool() :121 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file_service::get_or_create_pool(boost::capy::execution_context&) :127 515x 80.0% 67.0% boost::corosio::detail::get_random_access_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :147 515x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::read_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :157 116x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::write_some_at(unsigned long, std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :212 10x 83.3% 79.0% boost::corosio::detail::posix_random_access_file::raf_op::do_work(boost::corosio::detail::pool_work_item*) :269 126x 83.3% 76.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 #include <boost/corosio/detail/random_access_file_service.hpp>
19 #include <boost/corosio/detail/thread_pool.hpp>
20
21 #include <limits>
22 #include <mutex>
23 #include <unordered_map>
24
25 namespace boost::corosio::detail {
26
27 /** Random-access file service for POSIX backends. */
28 class BOOST_COROSIO_DECL posix_random_access_file_service final
29 : public random_access_file_service
30 {
31 public:
32 515x posix_random_access_file_service(
33 capy::execution_context& ctx, scheduler& sched)
34 1030x : sched_(&sched)
35 515x , pool_(get_or_create_pool(ctx))
36 {
37 515x }
38
39 1030x ~posix_random_access_file_service() override = default;
40
41 posix_random_access_file_service(
42 posix_random_access_file_service const&) = delete;
43 posix_random_access_file_service& operator=(
44 posix_random_access_file_service const&) = delete;
45
46 24x io_object::implementation* construct() override
47 {
48 24x auto ptr = std::make_shared<posix_random_access_file>(*this);
49 24x auto* impl = ptr.get();
50
51 {
52 24x std::lock_guard<std::mutex> lock(mutex_);
53 24x file_list_.push_back(impl);
54 24x file_ptrs_[impl] = std::move(ptr);
55 24x }
56
57 24x return impl;
58 24x }
59
60 24x void destroy(io_object::implementation* p) override
61 {
62 24x auto& impl = static_cast<posix_random_access_file&>(*p);
63 24x impl.cancel();
64 24x impl.close_file();
65 24x destroy_impl(impl);
66 24x }
67
68 42x void close(io_object::handle& h) override
69 {
70 42x if (h.get())
71 {
72 42x auto& impl = static_cast<posix_random_access_file&>(*h.get());
73 42x impl.cancel();
74 42x impl.close_file();
75 }
76 42x }
77
78 19x std::error_code open_file(
79 random_access_file::implementation& impl,
80 std::filesystem::path const& path,
81 file_base::flags mode) override
82 {
83 19x return static_cast<posix_random_access_file&>(impl).open_file(
84 19x path, mode);
85 }
86
87 515x void shutdown() override
88 {
89 515x std::lock_guard<std::mutex> lock(mutex_);
90 515x for (auto* impl = file_list_.pop_front(); impl != nullptr;
91 impl = file_list_.pop_front())
92 {
93 impl->cancel();
94 impl->close_file();
95 }
96 515x file_ptrs_.clear();
97 515x }
98
99 24x void destroy_impl(posix_random_access_file& impl)
100 {
101 24x std::lock_guard<std::mutex> lock(mutex_);
102 24x file_list_.remove(&impl);
103 24x file_ptrs_.erase(&impl);
104 24x }
105
106 126x void post(scheduler_op* op)
107 {
108 126x sched_->post(op);
109 126x }
110
111 void work_started() noexcept
112 {
113 sched_->work_started();
114 }
115
116 void work_finished() noexcept
117 {
118 sched_->work_finished();
119 }
120
121 126x thread_pool& pool() noexcept
122 {
123 126x return pool_;
124 }
125
126 private:
127 515x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
128 {
129 515x auto* p = ctx.find_service<thread_pool>();
130 515x if (p)
131 515x return *p;
132 return ctx.make_service<thread_pool>();
133 }
134
135 scheduler* sched_;
136 thread_pool& pool_;
137 std::mutex mutex_;
138 intrusive_list<posix_random_access_file> file_list_;
139 std::unordered_map<
140 posix_random_access_file*,
141 std::shared_ptr<posix_random_access_file>>
142 file_ptrs_;
143 };
144
145 /** Get or create the random-access file service for the given context. */
146 inline posix_random_access_file_service&
147 515x get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
148 {
149 515x return ctx.make_service<posix_random_access_file_service>(sched);
150 }
151
152 // ---------------------------------------------------------------------------
153 // posix_random_access_file inline implementations (require complete service)
154 // ---------------------------------------------------------------------------
155
156 inline std::coroutine_handle<>
157 116x posix_random_access_file::read_some_at(
158 std::uint64_t offset,
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 buffer_param param,
162 std::stop_token token,
163 std::error_code* ec,
164 std::size_t* bytes_out)
165 {
166 116x capy::mutable_buffer bufs[max_buffers];
167 116x auto count = param.copy_to(bufs, max_buffers);
168
169 116x if (count == 0)
170 {
171 *ec = {};
172 *bytes_out = 0;
173 return h;
174 }
175
176 116x auto* op = new raf_op();
177 116x op->is_read = true;
178 116x op->offset = offset;
179
180 116x op->iovec_count = static_cast<int>(count);
181 232x for (int i = 0; i < op->iovec_count; ++i)
182 {
183 116x op->iovecs[i].iov_base = bufs[i].data();
184 116x op->iovecs[i].iov_len = bufs[i].size();
185 }
186
187 116x op->h = h;
188 116x op->ex = ex;
189 116x op->ec_out = ec;
190 116x op->bytes_out = bytes_out;
191 116x op->file_ = this;
192 116x op->file_ref = this->shared_from_this();
193 116x op->start(token);
194
195 116x op->ex.on_work_started();
196
197 {
198 116x std::lock_guard<std::mutex> lock(ops_mutex_);
199 116x outstanding_ops_.push_back(op);
200 116x }
201
202 116x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
203 116x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
204 {
205 op->cancelled.store(true, std::memory_order_release);
206 svc_.post(static_cast<scheduler_op*>(op));
207 }
208 116x return std::noop_coroutine();
209 }
210
211 inline std::coroutine_handle<>
212 10x posix_random_access_file::write_some_at(
213 std::uint64_t offset,
214 std::coroutine_handle<> h,
215 capy::executor_ref ex,
216 buffer_param param,
217 std::stop_token token,
218 std::error_code* ec,
219 std::size_t* bytes_out)
220 {
221 10x capy::mutable_buffer bufs[max_buffers];
222 10x auto count = param.copy_to(bufs, max_buffers);
223
224 10x if (count == 0)
225 {
226 *ec = {};
227 *bytes_out = 0;
228 return h;
229 }
230
231 10x auto* op = new raf_op();
232 10x op->is_read = false;
233 10x op->offset = offset;
234
235 10x op->iovec_count = static_cast<int>(count);
236 20x for (int i = 0; i < op->iovec_count; ++i)
237 {
238 10x op->iovecs[i].iov_base = bufs[i].data();
239 10x op->iovecs[i].iov_len = bufs[i].size();
240 }
241
242 10x op->h = h;
243 10x op->ex = ex;
244 10x op->ec_out = ec;
245 10x op->bytes_out = bytes_out;
246 10x op->file_ = this;
247 10x op->file_ref = this->shared_from_this();
248 10x op->start(token);
249
250 10x op->ex.on_work_started();
251
252 {
253 10x std::lock_guard<std::mutex> lock(ops_mutex_);
254 10x outstanding_ops_.push_back(op);
255 10x }
256
257 10x static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
258 10x if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
259 {
260 op->cancelled.store(true, std::memory_order_release);
261 svc_.post(static_cast<scheduler_op*>(op));
262 }
263 10x return std::noop_coroutine();
264 }
265
266 // -- raf_op thread-pool work function --
267
268 inline void
269 126x posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
270 {
271 126x auto* op = static_cast<raf_op*>(w);
272 126x auto* self = op->file_;
273
274 126x if (op->cancelled.load(std::memory_order_acquire))
275 {
276 1x op->errn = ECANCELED;
277 1x op->bytes_transferred = 0;
278 }
279 250x else if (op->offset >
280 125x static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
281 {
282 op->errn = EOVERFLOW;
283 op->bytes_transferred = 0;
284 }
285 else
286 {
287 ssize_t n;
288 125x if (op->is_read)
289 {
290 do
291 {
292 230x n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
293 115x static_cast<off_t>(op->offset));
294 }
295 115x while (n < 0 && errno == EINTR);
296 }
297 else
298 {
299 do
300 {
301 20x n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
302 10x static_cast<off_t>(op->offset));
303 }
304 10x while (n < 0 && errno == EINTR);
305 }
306
307 125x if (n >= 0)
308 {
309 125x op->errn = 0;
310 125x op->bytes_transferred = static_cast<std::size_t>(n);
311 }
312 else
313 {
314 op->errn = errno;
315 op->bytes_transferred = 0;
316 }
317 }
318
319 126x self->svc_.post(static_cast<scheduler_op*>(op));
320 126x }
321
322 } // namespace boost::corosio::detail
323
324 #endif // BOOST_COROSIO_POSIX
325
326 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
327