TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/random_access_file.hpp>
19 : #include <boost/corosio/file_base.hpp>
20 : #include <boost/corosio/detail/intrusive.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 : #include <boost/corosio/detail/thread_pool.hpp>
23 : #include <boost/corosio/detail/scheduler.hpp>
24 : #include <boost/corosio/detail/buffer_param.hpp>
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/capy/ex/executor_ref.hpp>
27 : #include <boost/capy/error.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <atomic>
31 : #include <coroutine>
32 : #include <cstddef>
33 : #include <cstdint>
34 : #include <filesystem>
35 : #include <limits>
36 : #include <memory>
37 : #include <mutex>
38 : #include <optional>
39 : #include <stop_token>
40 : #include <system_error>
41 :
42 : #include <errno.h>
43 : #include <fcntl.h>
44 : #include <sys/stat.h>
45 : #include <sys/uio.h>
46 : #include <unistd.h>
47 :
48 : /*
49 : POSIX Random-Access File Implementation
50 : ========================================
51 :
52 : Each async read/write heap-allocates an raf_op that serves
53 : as both the thread-pool work item and the scheduler completion
54 : op. This allows unlimited concurrent operations on the same
55 : file object, matching Asio's per-op allocation model.
56 :
57 : The raf_op self-deletes on completion or shutdown.
58 : */
59 :
60 : namespace boost::corosio::detail {
61 :
62 : struct scheduler;
63 : class posix_random_access_file_service;
64 :
65 : /** Random-access file implementation for POSIX backends. */
66 : class posix_random_access_file final
67 : : public random_access_file::implementation
68 : , public std::enable_shared_from_this<posix_random_access_file>
69 : , public intrusive_list<posix_random_access_file>::node
70 : {
71 : friend class posix_random_access_file_service;
72 :
73 : public:
74 : static constexpr std::size_t max_buffers = 16;
75 :
76 : /** Per-operation state, heap-allocated for each async call.
77 :
78 : Inherits from scheduler_op (for scheduler completion) and
79 : pool_work_item (for thread-pool dispatch). Linked into the
80 : file's outstanding_ops_ list for cancellation tracking.
81 : */
82 : struct raf_op final
83 : : scheduler_op
84 : , pool_work_item
85 : , intrusive_list<raf_op>::node
86 : {
87 : struct canceller
88 : {
89 : raf_op* op;
90 HIT 1 : void operator()() const noexcept
91 : {
92 1 : op->cancelled.store(true, std::memory_order_release);
93 1 : }
94 : };
95 :
96 : std::coroutine_handle<> h;
97 : capy::executor_ref ex;
98 :
99 : std::error_code* ec_out = nullptr;
100 : std::size_t* bytes_out = nullptr;
101 :
102 : iovec iovecs[max_buffers];
103 : int iovec_count = 0;
104 : std::uint64_t offset = 0;
105 :
106 : int errn = 0;
107 : std::size_t bytes_transferred = 0;
108 : bool is_read = false;
109 :
110 : std::atomic<bool> cancelled{false};
111 : std::optional<std::stop_callback<canceller>> stop_cb;
112 :
113 : posix_random_access_file* file_ = nullptr;
114 : std::shared_ptr<posix_random_access_file> file_ref;
115 :
116 126 : void start(std::stop_token const& token)
117 : {
118 126 : cancelled.store(false, std::memory_order_release);
119 126 : stop_cb.reset();
120 126 : if (token.stop_possible())
121 1 : stop_cb.emplace(token, canceller{this});
122 126 : }
123 :
124 : void operator()() override;
125 : void destroy() override;
126 :
127 : /// Thread-pool work function: executes preadv/pwritev.
128 : static void do_work(pool_work_item*) noexcept;
129 : };
130 :
131 : explicit posix_random_access_file(
132 : posix_random_access_file_service& svc) noexcept;
133 :
134 : // -- random_access_file::implementation --
135 :
136 : std::coroutine_handle<> read_some_at(
137 : std::uint64_t offset,
138 : std::coroutine_handle<>,
139 : capy::executor_ref,
140 : buffer_param,
141 : std::stop_token,
142 : std::error_code*,
143 : std::size_t*) override;
144 :
145 : std::coroutine_handle<> write_some_at(
146 : std::uint64_t offset,
147 : std::coroutine_handle<>,
148 : capy::executor_ref,
149 : buffer_param,
150 : std::stop_token,
151 : std::error_code*,
152 : std::size_t*) override;
153 :
154 194 : native_handle_type native_handle() const noexcept override
155 : {
156 194 : return fd_;
157 : }
158 :
159 67 : void cancel() noexcept override
160 : {
161 67 : std::lock_guard<std::mutex> lock(ops_mutex_);
162 67 : outstanding_ops_.for_each([](raf_op* op) {
163 MIS 0 : op->cancelled.store(true, std::memory_order_release);
164 0 : });
165 HIT 67 : }
166 :
167 : std::uint64_t size() const override;
168 : void resize(std::uint64_t new_size) override;
169 : void sync_data() override;
170 : void sync_all() override;
171 : native_handle_type release() override;
172 : void assign(native_handle_type handle) override;
173 :
174 : std::error_code open_file(
175 : std::filesystem::path const& path, file_base::flags mode);
176 : void close_file() noexcept;
177 :
178 : private:
179 : posix_random_access_file_service& svc_;
180 : int fd_ = -1;
181 : std::mutex ops_mutex_;
182 : intrusive_list<raf_op> outstanding_ops_;
183 : };
184 :
185 : // ---------------------------------------------------------------------------
186 : // Inline implementation
187 : // ---------------------------------------------------------------------------
188 :
189 : inline
190 24 : posix_random_access_file::posix_random_access_file(
191 24 : posix_random_access_file_service& svc) noexcept
192 24 : : svc_(svc)
193 : {
194 24 : }
195 :
196 : inline std::error_code
197 19 : posix_random_access_file::open_file(
198 : std::filesystem::path const& path, file_base::flags mode)
199 : {
200 19 : close_file();
201 :
202 19 : int oflags = 0;
203 :
204 19 : unsigned access = static_cast<unsigned>(mode) & 3u;
205 19 : if (access == static_cast<unsigned>(file_base::read_write))
206 5 : oflags |= O_RDWR;
207 14 : else if (access == static_cast<unsigned>(file_base::write_only))
208 2 : oflags |= O_WRONLY;
209 : else
210 12 : oflags |= O_RDONLY;
211 :
212 19 : if ((mode & file_base::create) != file_base::flags(0))
213 5 : oflags |= O_CREAT;
214 19 : if ((mode & file_base::exclusive) != file_base::flags(0))
215 MIS 0 : oflags |= O_EXCL;
216 HIT 19 : if ((mode & file_base::truncate) != file_base::flags(0))
217 5 : oflags |= O_TRUNC;
218 19 : if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
219 MIS 0 : oflags |= O_SYNC;
220 : // Note: no O_APPEND for random access files
221 :
222 HIT 19 : int fd = ::open(path.c_str(), oflags, 0666);
223 19 : if (fd < 0)
224 1 : return make_err(errno);
225 :
226 18 : fd_ = fd;
227 :
228 : #ifdef POSIX_FADV_RANDOM
229 18 : ::posix_fadvise(fd_, 0, 0, POSIX_FADV_RANDOM);
230 : #endif
231 :
232 18 : return {};
233 : }
234 :
235 : inline void
236 86 : posix_random_access_file::close_file() noexcept
237 : {
238 86 : if (fd_ >= 0)
239 : {
240 18 : ::close(fd_);
241 18 : fd_ = -1;
242 : }
243 86 : }
244 :
245 : inline std::uint64_t
246 2 : posix_random_access_file::size() const
247 : {
248 : struct stat st;
249 2 : if (::fstat(fd_, &st) < 0)
250 MIS 0 : throw_system_error(make_err(errno), "random_access_file::size");
251 HIT 2 : return static_cast<std::uint64_t>(st.st_size);
252 : }
253 :
254 : inline void
255 2 : posix_random_access_file::resize(std::uint64_t new_size)
256 : {
257 2 : if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
258 MIS 0 : throw_system_error(make_err(EOVERFLOW), "random_access_file::resize");
259 HIT 2 : if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
260 MIS 0 : throw_system_error(make_err(errno), "random_access_file::resize");
261 HIT 2 : }
262 :
263 : inline void
264 1 : posix_random_access_file::sync_data()
265 : {
266 : #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
267 1 : if (::fdatasync(fd_) < 0)
268 : #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
269 : if (::fsync(fd_) < 0)
270 : #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
271 MIS 0 : throw_system_error(make_err(errno), "random_access_file::sync_data");
272 HIT 1 : }
273 :
274 : inline void
275 1 : posix_random_access_file::sync_all()
276 : {
277 1 : if (::fsync(fd_) < 0)
278 MIS 0 : throw_system_error(make_err(errno), "random_access_file::sync_all");
279 HIT 1 : }
280 :
281 : inline native_handle_type
282 1 : posix_random_access_file::release()
283 : {
284 1 : int fd = fd_;
285 1 : fd_ = -1;
286 1 : return fd;
287 : }
288 :
289 : inline void
290 1 : posix_random_access_file::assign(native_handle_type handle)
291 : {
292 1 : close_file();
293 1 : fd_ = handle;
294 1 : }
295 :
296 : // read_some_at, write_some_at are defined in
297 : // posix_random_access_file_service.hpp after the service.
298 :
299 : // -- raf_op completion handler (scheduler thread) --
300 :
301 : inline void
302 126 : posix_random_access_file::raf_op::operator()()
303 : {
304 126 : stop_cb.reset();
305 :
306 126 : bool const was_cancelled = cancelled.load(std::memory_order_acquire);
307 :
308 126 : if (ec_out)
309 : {
310 126 : if (was_cancelled)
311 1 : *ec_out = capy::error::canceled;
312 125 : else if (errn != 0)
313 MIS 0 : *ec_out = make_err(errn);
314 HIT 125 : else if (is_read && bytes_transferred == 0)
315 1 : *ec_out = capy::error::eof;
316 : else
317 124 : *ec_out = {};
318 : }
319 :
320 126 : if (bytes_out)
321 126 : *bytes_out = was_cancelled ? 0 : bytes_transferred;
322 :
323 : {
324 126 : std::lock_guard<std::mutex> lock(file_->ops_mutex_);
325 126 : file_->outstanding_ops_.remove(this);
326 126 : }
327 :
328 126 : file_ref.reset();
329 :
330 126 : auto coro = h;
331 126 : ex.on_work_finished();
332 126 : delete this;
333 126 : coro.resume();
334 126 : }
335 :
336 : // -- raf_op shutdown cleanup --
337 :
338 : inline void
339 MIS 0 : posix_random_access_file::raf_op::destroy()
340 : {
341 0 : stop_cb.reset();
342 : {
343 0 : std::lock_guard<std::mutex> lock(file_->ops_mutex_);
344 0 : file_->outstanding_ops_.remove(this);
345 0 : }
346 0 : file_ref.reset();
347 0 : ex.on_work_finished();
348 0 : delete this;
349 0 : }
350 :
351 : } // namespace boost::corosio::detail
352 :
353 : #endif // BOOST_COROSIO_POSIX
354 :
355 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_HPP
|