include/boost/corosio/native/detail/posix/posix_random_access_file.hpp

82.1% Lines (87/106) 87.5% List of functions (14/16)
posix_random_access_file.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::posix_random_access_file::raf_op::canceller::operator()() const :90 1x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::raf_op::start(std::stop_token const&) :116 126x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::native_handle() const :154 194x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::cancel() :159 67x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::cancel()::{lambda(boost::corosio::detail::posix_random_access_file::raf_op*)#1}::operator()(boost::corosio::detail::posix_random_access_file::raf_op*) const :162 0 50.0% 0.0% boost::corosio::detail::posix_random_access_file::posix_random_access_file(boost::corosio::detail::posix_random_access_file_service&) :190 24x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::open_file(std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :197 19x 91.3% 93.0% boost::corosio::detail::posix_random_access_file::close_file() :236 86x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::size() const :246 2x 75.0% 62.0% boost::corosio::detail::posix_random_access_file::resize(unsigned long) :255 2x 66.7% 55.0% boost::corosio::detail::posix_random_access_file::sync_data() :264 1x 75.0% 67.0% boost::corosio::detail::posix_random_access_file::sync_all() :275 1x 75.0% 67.0% boost::corosio::detail::posix_random_access_file::release() :282 1x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::assign(int) :290 1x 100.0% 100.0% boost::corosio::detail::posix_random_access_file::raf_op::operator()() :302 126x 95.5% 94.0% boost::corosio::detail::posix_random_access_file::raf_op::destroy() :339 0 0.0% 0.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/random_access_file.hpp>
19 #include <boost/corosio/file_base.hpp>
20 #include <boost/corosio/detail/intrusive.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22 #include <boost/corosio/detail/thread_pool.hpp>
23 #include <boost/corosio/detail/scheduler.hpp>
24 #include <boost/corosio/detail/buffer_param.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/capy/ex/executor_ref.hpp>
27 #include <boost/capy/error.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <atomic>
31 #include <coroutine>
32 #include <cstddef>
33 #include <cstdint>
34 #include <filesystem>
35 #include <limits>
36 #include <memory>
37 #include <mutex>
38 #include <optional>
39 #include <stop_token>
40 #include <system_error>
41
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/stat.h>
45 #include <sys/uio.h>
46 #include <unistd.h>
47
48 /*
49 POSIX Random-Access File Implementation
50 ========================================
51
52 Each async read/write heap-allocates an raf_op that serves
53 as both the thread-pool work item and the scheduler completion
54 op. This allows unlimited concurrent operations on the same
55 file object, matching Asio's per-op allocation model.
56
57 The raf_op self-deletes on completion or shutdown.
58 */
59
60 namespace boost::corosio::detail {
61
62 struct scheduler;
63 class posix_random_access_file_service;
64
65 /** Random-access file implementation for POSIX backends. */
66 class posix_random_access_file final
67 : public random_access_file::implementation
68 , public std::enable_shared_from_this<posix_random_access_file>
69 , public intrusive_list<posix_random_access_file>::node
70 {
71 friend class posix_random_access_file_service;
72
73 public:
74 static constexpr std::size_t max_buffers = 16;
75
76 /** Per-operation state, heap-allocated for each async call.
77
78 Inherits from scheduler_op (for scheduler completion) and
79 pool_work_item (for thread-pool dispatch). Linked into the
80 file's outstanding_ops_ list for cancellation tracking.
81 */
82 struct raf_op final
83 : scheduler_op
84 , pool_work_item
85 , intrusive_list<raf_op>::node
86 {
87 struct canceller
88 {
89 raf_op* op;
90 1x void operator()() const noexcept
91 {
92 1x op->cancelled.store(true, std::memory_order_release);
93 1x }
94 };
95
96 std::coroutine_handle<> h;
97 capy::executor_ref ex;
98
99 std::error_code* ec_out = nullptr;
100 std::size_t* bytes_out = nullptr;
101
102 iovec iovecs[max_buffers];
103 int iovec_count = 0;
104 std::uint64_t offset = 0;
105
106 int errn = 0;
107 std::size_t bytes_transferred = 0;
108 bool is_read = false;
109
110 std::atomic<bool> cancelled{false};
111 std::optional<std::stop_callback<canceller>> stop_cb;
112
113 posix_random_access_file* file_ = nullptr;
114 std::shared_ptr<posix_random_access_file> file_ref;
115
116 126x void start(std::stop_token const& token)
117 {
118 126x cancelled.store(false, std::memory_order_release);
119 126x stop_cb.reset();
120 126x if (token.stop_possible())
121 1x stop_cb.emplace(token, canceller{this});
122 126x }
123
124 void operator()() override;
125 void destroy() override;
126
127 /// Thread-pool work function: executes preadv/pwritev.
128 static void do_work(pool_work_item*) noexcept;
129 };
130
131 explicit posix_random_access_file(
132 posix_random_access_file_service& svc) noexcept;
133
134 // -- random_access_file::implementation --
135
136 std::coroutine_handle<> read_some_at(
137 std::uint64_t offset,
138 std::coroutine_handle<>,
139 capy::executor_ref,
140 buffer_param,
141 std::stop_token,
142 std::error_code*,
143 std::size_t*) override;
144
145 std::coroutine_handle<> write_some_at(
146 std::uint64_t offset,
147 std::coroutine_handle<>,
148 capy::executor_ref,
149 buffer_param,
150 std::stop_token,
151 std::error_code*,
152 std::size_t*) override;
153
154 194x native_handle_type native_handle() const noexcept override
155 {
156 194x return fd_;
157 }
158
159 67x void cancel() noexcept override
160 {
161 67x std::lock_guard<std::mutex> lock(ops_mutex_);
162 67x outstanding_ops_.for_each([](raf_op* op) {
163 op->cancelled.store(true, std::memory_order_release);
164 });
165 67x }
166
167 std::uint64_t size() const override;
168 void resize(std::uint64_t new_size) override;
169 void sync_data() override;
170 void sync_all() override;
171 native_handle_type release() override;
172 void assign(native_handle_type handle) override;
173
174 std::error_code open_file(
175 std::filesystem::path const& path, file_base::flags mode);
176 void close_file() noexcept;
177
178 private:
179 posix_random_access_file_service& svc_;
180 int fd_ = -1;
181 std::mutex ops_mutex_;
182 intrusive_list<raf_op> outstanding_ops_;
183 };
184
185 // ---------------------------------------------------------------------------
186 // Inline implementation
187 // ---------------------------------------------------------------------------
188
189 inline
190 24x posix_random_access_file::posix_random_access_file(
191 24x posix_random_access_file_service& svc) noexcept
192 24x : svc_(svc)
193 {
194 24x }
195
196 inline std::error_code
197 19x posix_random_access_file::open_file(
198 std::filesystem::path const& path, file_base::flags mode)
199 {
200 19x close_file();
201
202 19x int oflags = 0;
203
204 19x unsigned access = static_cast<unsigned>(mode) & 3u;
205 19x if (access == static_cast<unsigned>(file_base::read_write))
206 5x oflags |= O_RDWR;
207 14x else if (access == static_cast<unsigned>(file_base::write_only))
208 2x oflags |= O_WRONLY;
209 else
210 12x oflags |= O_RDONLY;
211
212 19x if ((mode & file_base::create) != file_base::flags(0))
213 5x oflags |= O_CREAT;
214 19x if ((mode & file_base::exclusive) != file_base::flags(0))
215 oflags |= O_EXCL;
216 19x if ((mode & file_base::truncate) != file_base::flags(0))
217 5x oflags |= O_TRUNC;
218 19x if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
219 oflags |= O_SYNC;
220 // Note: no O_APPEND for random access files
221
222 19x int fd = ::open(path.c_str(), oflags, 0666);
223 19x if (fd < 0)
224 1x return make_err(errno);
225
226 18x fd_ = fd;
227
228 #ifdef POSIX_FADV_RANDOM
229 18x ::posix_fadvise(fd_, 0, 0, POSIX_FADV_RANDOM);
230 #endif
231
232 18x return {};
233 }
234
235 inline void
236 86x posix_random_access_file::close_file() noexcept
237 {
238 86x if (fd_ >= 0)
239 {
240 18x ::close(fd_);
241 18x fd_ = -1;
242 }
243 86x }
244
245 inline std::uint64_t
246 2x posix_random_access_file::size() const
247 {
248 struct stat st;
249 2x if (::fstat(fd_, &st) < 0)
250 throw_system_error(make_err(errno), "random_access_file::size");
251 2x return static_cast<std::uint64_t>(st.st_size);
252 }
253
254 inline void
255 2x posix_random_access_file::resize(std::uint64_t new_size)
256 {
257 2x if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
258 throw_system_error(make_err(EOVERFLOW), "random_access_file::resize");
259 2x if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
260 throw_system_error(make_err(errno), "random_access_file::resize");
261 2x }
262
263 inline void
264 1x posix_random_access_file::sync_data()
265 {
266 #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
267 1x if (::fdatasync(fd_) < 0)
268 #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
269 if (::fsync(fd_) < 0)
270 #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
271 throw_system_error(make_err(errno), "random_access_file::sync_data");
272 1x }
273
274 inline void
275 1x posix_random_access_file::sync_all()
276 {
277 1x if (::fsync(fd_) < 0)
278 throw_system_error(make_err(errno), "random_access_file::sync_all");
279 1x }
280
281 inline native_handle_type
282 1x posix_random_access_file::release()
283 {
284 1x int fd = fd_;
285 1x fd_ = -1;
286 1x return fd;
287 }
288
289 inline void
290 1x posix_random_access_file::assign(native_handle_type handle)
291 {
292 1x close_file();
293 1x fd_ = handle;
294 1x }
295
296 // read_some_at, write_some_at are defined in
297 // posix_random_access_file_service.hpp after the service.
298
299 // -- raf_op completion handler (scheduler thread) --
300
301 inline void
302 126x posix_random_access_file::raf_op::operator()()
303 {
304 126x stop_cb.reset();
305
306 126x bool const was_cancelled = cancelled.load(std::memory_order_acquire);
307
308 126x if (ec_out)
309 {
310 126x if (was_cancelled)
311 1x *ec_out = capy::error::canceled;
312 125x else if (errn != 0)
313 *ec_out = make_err(errn);
314 125x else if (is_read && bytes_transferred == 0)
315 1x *ec_out = capy::error::eof;
316 else
317 124x *ec_out = {};
318 }
319
320 126x if (bytes_out)
321 126x *bytes_out = was_cancelled ? 0 : bytes_transferred;
322
323 {
324 126x std::lock_guard<std::mutex> lock(file_->ops_mutex_);
325 126x file_->outstanding_ops_.remove(this);
326 126x }
327
328 126x file_ref.reset();
329
330 126x auto coro = h;
331 126x ex.on_work_finished();
332 126x delete this;
333 126x coro.resume();
334 126x }
335
336 // -- raf_op shutdown cleanup --
337
338 inline void
339 posix_random_access_file::raf_op::destroy()
340 {
341 stop_cb.reset();
342 {
343 std::lock_guard<std::mutex> lock(file_->ops_mutex_);
344 file_->outstanding_ops_.remove(this);
345 }
346 file_ref.reset();
347 ex.on_work_finished();
348 delete this;
349 }
350
351 } // namespace boost::corosio::detail
352
353 #endif // BOOST_COROSIO_POSIX
354
355 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
356