TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/stream_file.hpp>
19 : #include <boost/corosio/file_base.hpp>
20 : #include <boost/corosio/detail/intrusive.hpp>
21 : #include <boost/corosio/detail/dispatch_coro.hpp>
22 : #include <boost/corosio/detail/scheduler_op.hpp>
23 : #include <boost/corosio/detail/continuation_op.hpp>
24 : #include <boost/corosio/detail/thread_pool.hpp>
25 : #include <boost/corosio/detail/scheduler.hpp>
26 : #include <boost/corosio/detail/buffer_param.hpp>
27 : #include <boost/corosio/native/detail/make_err.hpp>
28 : #include <boost/capy/ex/executor_ref.hpp>
29 : #include <boost/capy/error.hpp>
30 : #include <boost/capy/buffers.hpp>
31 :
32 : #include <atomic>
33 : #include <coroutine>
34 : #include <cstddef>
35 : #include <cstdint>
36 : #include <filesystem>
37 : #include <limits>
38 : #include <memory>
39 : #include <optional>
40 : #include <stop_token>
41 : #include <system_error>
42 :
43 : #include <errno.h>
44 : #include <fcntl.h>
45 : #include <sys/stat.h>
46 : #include <sys/uio.h>
47 : #include <unistd.h>
48 :
49 : /*
50 : POSIX Stream File Implementation
51 : =================================
52 :
53 : Regular files cannot be monitored by epoll/kqueue/select — the kernel
54 : always reports them as ready. Blocking I/O (pread/pwrite) is dispatched
55 : to a shared thread pool, with completion posted back to the scheduler.
56 :
57 : This follows the same pattern as posix_resolver: pool_work_item for
58 : dispatch, scheduler_op for completion, shared_from_this for lifetime.
59 :
60 : Completion Flow
61 : ---------------
62 : 1. read_some() sets up file_read_op, posts to thread pool
63 : 2. Pool thread runs preadv() (blocking)
64 : 3. Pool thread stores results, posts scheduler_op to scheduler
65 : 4. Scheduler invokes op() which resumes the coroutine
66 :
67 : Single-Inflight Constraint
68 : --------------------------
69 : Only one asynchronous operation may be in flight at a time on a
70 : given file object. Concurrent read and write is not supported
71 : because both share offset_ without synchronization.
72 : */
73 :
74 : namespace boost::corosio::detail {
75 :
76 : struct scheduler;
77 : class posix_stream_file_service;
78 :
79 : /** Stream file implementation for POSIX backends.
80 :
81 : Each instance contains embedded operation objects (read_op_, write_op_)
82 : that are reused across calls. This avoids per-operation heap allocation.
83 : */
84 : class posix_stream_file final
85 : : public stream_file::implementation
86 : , public std::enable_shared_from_this<posix_stream_file>
87 : , public intrusive_list<posix_stream_file>::node
88 : {
89 : friend class posix_stream_file_service;
90 :
91 : public:
92 : static constexpr std::size_t max_buffers = 16;
93 :
94 : /** Operation state for a single file read or write. */
95 : struct file_op : scheduler_op
96 : {
97 : struct canceller
98 : {
99 : file_op* op;
100 HIT 1 : void operator()() const noexcept
101 : {
102 1 : op->request_cancel();
103 1 : }
104 : };
105 :
106 : // Coroutine state
107 : std::coroutine_handle<> h;
108 : detail::continuation_op cont_op;
109 : capy::executor_ref ex;
110 :
111 : // Output pointers
112 : std::error_code* ec_out = nullptr;
113 : std::size_t* bytes_out = nullptr;
114 :
115 : // Buffer data (copied from buffer_param at submission time)
116 : iovec iovecs[max_buffers];
117 : int iovec_count = 0;
118 :
119 : // Result storage (populated by worker thread)
120 : int errn = 0;
121 : std::size_t bytes_transferred = 0;
122 : bool is_read = false;
123 :
124 : // Thread coordination
125 : std::atomic<bool> cancelled{false};
126 : std::optional<std::stop_callback<canceller>> stop_cb;
127 :
128 : /// Prevents use-after-free when file is closed with pending ops.
129 : std::shared_ptr<void> impl_ref;
130 :
131 52 : file_op() = default;
132 :
133 12 : void reset() noexcept
134 : {
135 12 : iovec_count = 0;
136 12 : errn = 0;
137 12 : bytes_transferred = 0;
138 12 : is_read = false;
139 12 : cancelled.store(false, std::memory_order_relaxed);
140 12 : stop_cb.reset();
141 12 : impl_ref.reset();
142 12 : ec_out = nullptr;
143 12 : bytes_out = nullptr;
144 12 : }
145 :
146 : void operator()() override;
147 : void destroy() override;
148 :
149 141 : void request_cancel() noexcept
150 : {
151 141 : cancelled.store(true, std::memory_order_release);
152 141 : }
153 :
154 12 : void start(std::stop_token const& token)
155 : {
156 12 : cancelled.store(false, std::memory_order_release);
157 12 : stop_cb.reset();
158 12 : if (token.stop_possible())
159 1 : stop_cb.emplace(token, canceller{this});
160 12 : }
161 : };
162 :
163 : /** Pool work item for thread pool dispatch. */
164 : struct pool_op : pool_work_item
165 : {
166 : posix_stream_file* file_ = nullptr;
167 : std::shared_ptr<posix_stream_file> ref_;
168 : };
169 :
170 : explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
171 :
172 : // -- io_stream::implementation --
173 :
174 : std::coroutine_handle<> read_some(
175 : std::coroutine_handle<>,
176 : capy::executor_ref,
177 : buffer_param,
178 : std::stop_token,
179 : std::error_code*,
180 : std::size_t*) override;
181 :
182 : std::coroutine_handle<> write_some(
183 : std::coroutine_handle<>,
184 : capy::executor_ref,
185 : buffer_param,
186 : std::stop_token,
187 : std::error_code*,
188 : std::size_t*) override;
189 :
190 : // -- stream_file::implementation --
191 :
192 81 : native_handle_type native_handle() const noexcept override
193 : {
194 81 : return fd_;
195 : }
196 :
197 70 : void cancel() noexcept override
198 : {
199 70 : read_op_.request_cancel();
200 70 : write_op_.request_cancel();
201 70 : }
202 :
203 : std::uint64_t size() const override;
204 : void resize(std::uint64_t new_size) override;
205 : void sync_data() override;
206 : void sync_all() override;
207 : native_handle_type release() override;
208 : void assign(native_handle_type handle) override;
209 : std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
210 :
211 : // -- Internal --
212 :
213 : /** Open the file and store the fd. */
214 : std::error_code open_file(
215 : std::filesystem::path const& path, file_base::flags mode);
216 :
217 : /** Close the file descriptor. */
218 : void close_file() noexcept;
219 :
220 : private:
221 : posix_stream_file_service& svc_;
222 : int fd_ = -1;
223 : std::uint64_t offset_ = 0;
224 :
225 : file_op read_op_;
226 : file_op write_op_;
227 : pool_op read_pool_op_;
228 : pool_op write_pool_op_;
229 :
230 : static void do_read_work(pool_work_item*) noexcept;
231 : static void do_write_work(pool_work_item*) noexcept;
232 : };
233 :
234 : // ---------------------------------------------------------------------------
235 : // Inline implementation
236 : // ---------------------------------------------------------------------------
237 :
238 : inline
239 26 : posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
240 26 : : svc_(svc)
241 : {
242 26 : }
243 :
244 : inline std::error_code
245 19 : posix_stream_file::open_file(
246 : std::filesystem::path const& path, file_base::flags mode)
247 : {
248 19 : close_file();
249 :
250 19 : int oflags = 0;
251 :
252 : // Access mode
253 19 : unsigned access = static_cast<unsigned>(mode) & 3u;
254 19 : if (access == static_cast<unsigned>(file_base::read_write))
255 2 : oflags |= O_RDWR;
256 17 : else if (access == static_cast<unsigned>(file_base::write_only))
257 7 : oflags |= O_WRONLY;
258 : else
259 10 : oflags |= O_RDONLY;
260 :
261 : // Creation flags
262 19 : if ((mode & file_base::create) != file_base::flags(0))
263 6 : oflags |= O_CREAT;
264 19 : if ((mode & file_base::exclusive) != file_base::flags(0))
265 1 : oflags |= O_EXCL;
266 19 : if ((mode & file_base::truncate) != file_base::flags(0))
267 5 : oflags |= O_TRUNC;
268 19 : if ((mode & file_base::append) != file_base::flags(0))
269 1 : oflags |= O_APPEND;
270 19 : if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
271 MIS 0 : oflags |= O_SYNC;
272 :
273 HIT 19 : int fd = ::open(path.c_str(), oflags, 0666);
274 19 : if (fd < 0)
275 2 : return make_err(errno);
276 :
277 17 : fd_ = fd;
278 17 : offset_ = 0;
279 :
280 : // Append mode: position at end-of-file (preadv/pwritev use
281 : // explicit offsets, so O_APPEND alone is not sufficient).
282 17 : if ((mode & file_base::append) != file_base::flags(0))
283 : {
284 : struct stat st;
285 1 : if (::fstat(fd, &st) < 0)
286 : {
287 MIS 0 : int err = errno;
288 0 : ::close(fd);
289 0 : fd_ = -1;
290 0 : return make_err(err);
291 : }
292 HIT 1 : offset_ = static_cast<std::uint64_t>(st.st_size);
293 : }
294 :
295 : #ifdef POSIX_FADV_SEQUENTIAL
296 17 : ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
297 : #endif
298 :
299 17 : return {};
300 : }
301 :
302 : inline void
303 89 : posix_stream_file::close_file() noexcept
304 : {
305 89 : if (fd_ >= 0)
306 : {
307 17 : ::close(fd_);
308 17 : fd_ = -1;
309 : }
310 89 : }
311 :
312 : inline std::uint64_t
313 3 : posix_stream_file::size() const
314 : {
315 : struct stat st;
316 3 : if (::fstat(fd_, &st) < 0)
317 MIS 0 : throw_system_error(make_err(errno), "stream_file::size");
318 HIT 3 : return static_cast<std::uint64_t>(st.st_size);
319 : }
320 :
321 : inline void
322 1 : posix_stream_file::resize(std::uint64_t new_size)
323 : {
324 1 : if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
325 MIS 0 : throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
326 HIT 1 : if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
327 MIS 0 : throw_system_error(make_err(errno), "stream_file::resize");
328 HIT 1 : }
329 :
330 : inline void
331 1 : posix_stream_file::sync_data()
332 : {
333 : #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
334 1 : if (::fdatasync(fd_) < 0)
335 : #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
336 : if (::fsync(fd_) < 0)
337 : #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
338 MIS 0 : throw_system_error(make_err(errno), "stream_file::sync_data");
339 HIT 1 : }
340 :
341 : inline void
342 1 : posix_stream_file::sync_all()
343 : {
344 1 : if (::fsync(fd_) < 0)
345 MIS 0 : throw_system_error(make_err(errno), "stream_file::sync_all");
346 HIT 1 : }
347 :
348 : inline native_handle_type
349 1 : posix_stream_file::release()
350 : {
351 1 : int fd = fd_;
352 1 : fd_ = -1;
353 1 : offset_ = 0;
354 1 : return fd;
355 : }
356 :
357 : inline void
358 1 : posix_stream_file::assign(native_handle_type handle)
359 : {
360 1 : close_file();
361 1 : fd_ = handle;
362 1 : offset_ = 0;
363 1 : }
364 :
365 : inline std::uint64_t
366 7 : posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
367 : {
368 : // We track offset_ ourselves (not the kernel fd offset)
369 : // because preadv/pwritev use explicit offsets.
370 : std::int64_t new_pos;
371 :
372 7 : if (origin == file_base::seek_set)
373 : {
374 3 : new_pos = offset;
375 : }
376 4 : else if (origin == file_base::seek_cur)
377 : {
378 2 : new_pos = static_cast<std::int64_t>(offset_) + offset;
379 : }
380 : else
381 : {
382 : struct stat st;
383 2 : if (::fstat(fd_, &st) < 0)
384 MIS 0 : throw_system_error(make_err(errno), "stream_file::seek");
385 HIT 2 : new_pos = st.st_size + offset;
386 : }
387 :
388 7 : if (new_pos < 0)
389 3 : throw_system_error(make_err(EINVAL), "stream_file::seek");
390 4 : if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
391 MIS 0 : throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
392 :
393 HIT 4 : offset_ = static_cast<std::uint64_t>(new_pos);
394 :
395 4 : return offset_;
396 : }
397 :
398 : // -- file_op completion handler --
399 : // (read_some, write_some, do_read_work, do_write_work are
400 : // defined in posix_stream_file_service.hpp after the service)
401 :
402 : inline void
403 12 : posix_stream_file::file_op::operator()()
404 : {
405 12 : stop_cb.reset();
406 :
407 12 : bool const was_cancelled = cancelled.load(std::memory_order_acquire);
408 :
409 12 : if (ec_out)
410 : {
411 12 : if (was_cancelled)
412 1 : *ec_out = capy::error::canceled;
413 11 : else if (errn != 0)
414 MIS 0 : *ec_out = make_err(errn);
415 HIT 11 : else if (is_read && bytes_transferred == 0)
416 1 : *ec_out = capy::error::eof;
417 : else
418 10 : *ec_out = {};
419 : }
420 :
421 12 : if (bytes_out)
422 12 : *bytes_out = was_cancelled ? 0 : bytes_transferred;
423 :
424 : // Move impl_ref to a local so members remain valid through
425 : // dispatch — impl_ref may be the last shared_ptr keeping
426 : // the parent posix_stream_file (which embeds this file_op) alive.
427 12 : auto prevent_destroy = std::move(impl_ref);
428 12 : ex.on_work_finished();
429 12 : cont_op.cont.h = h;
430 12 : dispatch_coro(ex, cont_op.cont).resume();
431 12 : }
432 :
433 : inline void
434 MIS 0 : posix_stream_file::file_op::destroy()
435 : {
436 0 : stop_cb.reset();
437 0 : auto local_ex = ex;
438 0 : impl_ref.reset();
439 0 : local_ex.on_work_finished();
440 0 : }
441 :
442 : } // namespace boost::corosio::detail
443 :
444 : #endif // BOOST_COROSIO_POSIX
445 :
446 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
|