include/boost/corosio/native/detail/posix/posix_stream_file.hpp

86.1% Lines (118/137) 94.7% List of functions (18/19)
posix_stream_file.hpp
f(x) Functions (19)
Function Calls Lines Blocks
boost::corosio::detail::posix_stream_file::file_op::canceller::operator()() const :100 1x 100.0% 100.0% boost::corosio::detail::posix_stream_file::file_op::file_op() :131 52x 100.0% 100.0% boost::corosio::detail::posix_stream_file::file_op::reset() :133 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file::file_op::request_cancel() :149 141x 100.0% 100.0% boost::corosio::detail::posix_stream_file::file_op::start(std::stop_token const&) :154 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file::native_handle() const :192 81x 100.0% 100.0% boost::corosio::detail::posix_stream_file::cancel() :197 70x 100.0% 100.0% boost::corosio::detail::posix_stream_file::posix_stream_file(boost::corosio::detail::posix_stream_file_service&) :239 26x 100.0% 100.0% boost::corosio::detail::posix_stream_file::open_file(std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :245 19x 84.8% 90.0% boost::corosio::detail::posix_stream_file::close_file() :303 89x 100.0% 100.0% boost::corosio::detail::posix_stream_file::size() const :313 3x 75.0% 62.0% boost::corosio::detail::posix_stream_file::resize(unsigned long) :322 1x 66.7% 55.0% boost::corosio::detail::posix_stream_file::sync_data() :331 1x 75.0% 67.0% boost::corosio::detail::posix_stream_file::sync_all() :342 1x 75.0% 67.0% boost::corosio::detail::posix_stream_file::release() :349 1x 100.0% 100.0% boost::corosio::detail::posix_stream_file::assign(int) :358 1x 100.0% 100.0% boost::corosio::detail::posix_stream_file::seek(long, boost::corosio::file_base::seek_basis) :366 7x 85.7% 73.0% boost::corosio::detail::posix_stream_file::file_op::operator()() :403 12x 94.4% 87.0% boost::corosio::detail::posix_stream_file::file_op::destroy() :434 0 0.0% 0.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/stream_file.hpp>
19 #include <boost/corosio/file_base.hpp>
20 #include <boost/corosio/detail/intrusive.hpp>
21 #include <boost/corosio/detail/dispatch_coro.hpp>
22 #include <boost/corosio/detail/scheduler_op.hpp>
23 #include <boost/corosio/detail/continuation_op.hpp>
24 #include <boost/corosio/detail/thread_pool.hpp>
25 #include <boost/corosio/detail/scheduler.hpp>
26 #include <boost/corosio/detail/buffer_param.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28 #include <boost/capy/ex/executor_ref.hpp>
29 #include <boost/capy/error.hpp>
30 #include <boost/capy/buffers.hpp>
31
32 #include <atomic>
33 #include <coroutine>
34 #include <cstddef>
35 #include <cstdint>
36 #include <filesystem>
37 #include <limits>
38 #include <memory>
39 #include <optional>
40 #include <stop_token>
41 #include <system_error>
42
43 #include <errno.h>
44 #include <fcntl.h>
45 #include <sys/stat.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48
49 /*
50 POSIX Stream File Implementation
51 =================================
52
53 Regular files cannot be monitored by epoll/kqueue/select — the kernel
54 always reports them as ready. Blocking I/O (preadv/pwritev) is dispatched
55 to a shared thread pool, with completion posted back to the scheduler.
56
57 This follows the same pattern as posix_resolver: pool_work_item for
58 dispatch, scheduler_op for completion, shared_from_this for lifetime.
59
60 Completion Flow
61 ---------------
62 1. read_some() sets up read_op_ (a file_op), posts to thread pool
63 2. Pool thread runs preadv() (blocking)
64 3. Pool thread stores results, posts scheduler_op to scheduler
65 4. Scheduler invokes file_op::operator()() which resumes the coroutine
66
67 Single-Inflight Constraint
68 --------------------------
69 Only one asynchronous operation may be in flight at a time on a
70 given file object. Concurrent read and write is not supported
71 because both share offset_ without synchronization.
72 */
73
74 namespace boost::corosio::detail {
75
// Forward declarations. posix_stream_file_service is needed only by name in
// this header: it is befriended by posix_stream_file, held as a reference
// member, and taken by reference in the constructor.
// NOTE(review): `scheduler` is not named by any other code visible in this
// header; presumably it is used by the definitions that live in
// posix_stream_file_service.hpp — confirm before removing.
76 struct scheduler;
77 class posix_stream_file_service;
78
79 /** Stream file implementation for POSIX backends.
80
81 Each instance contains embedded operation objects (read_op_, write_op_)
82 that are reused across calls. This avoids per-operation heap allocation.

The object keeps its own file position in offset_ instead of relying on
the descriptor's kernel offset; seek() only updates that member.
83 */
84 class posix_stream_file final
85 : public stream_file::implementation
86 , public std::enable_shared_from_this<posix_stream_file>
87 , public intrusive_list<posix_stream_file>::node
88 {
89 friend class posix_stream_file_service;
90
91 public:
// Capacity of file_op::iovecs: the number of iovec slots stored per operation.
92 static constexpr std::size_t max_buffers = 16;
93
94 /** Operation state for a single file read or write. */
95 struct file_op : scheduler_op
96 {
// Callable held by stop_cb; invoking it forwards to op->request_cancel().
97 struct canceller
98 {
99 file_op* op;
100 1x void operator()() const noexcept
101 {
102 1x op->request_cancel();
103 1x }
104 };
105
106 // Coroutine state
107 std::coroutine_handle<> h;
108 detail::continuation_op cont_op;
109 capy::executor_ref ex;
110
111 // Output pointers
// Both are null-checked in operator()() before being written through.
112 std::error_code* ec_out = nullptr;
113 std::size_t* bytes_out = nullptr;
114
115 // Buffer data (copied from buffer_param at submission time)
116 iovec iovecs[max_buffers];
117 int iovec_count = 0;
118
119 // Result storage (populated by worker thread)
// errn holds an errno-style code (0 means no error); is_read selects the
// end-of-file rule applied in operator()().
120 int errn = 0;
121 std::size_t bytes_transferred = 0;
122 bool is_read = false;
123
124 // Thread coordination
// cancelled is written with release ordering by request_cancel()/start()
// and read with acquire ordering by operator()().
125 std::atomic<bool> cancelled{false};
126 std::optional<std::stop_callback<canceller>> stop_cb;
127
128 /// Prevents use-after-free when file is closed with pending ops.
129 std::shared_ptr<void> impl_ref;
130
131 52x file_op() = default;
132
// Returns the result and bookkeeping fields to their defaults and drops
// stop_cb and impl_ref. h, cont_op, ex and the contents of iovecs are
// left untouched.
133 12x void reset() noexcept
134 {
135 12x iovec_count = 0;
136 12x errn = 0;
137 12x bytes_transferred = 0;
138 12x is_read = false;
139 12x cancelled.store(false, std::memory_order_relaxed);
140 12x stop_cb.reset();
141 12x impl_ref.reset();
142 12x ec_out = nullptr;
143 12x bytes_out = nullptr;
144 12x }
145
// Completion handler and discard path; both are defined out of line
// further down in this header.
146 void operator()() override;
147 void destroy() override;
148
// Only raises the cancelled flag; nothing in this header interrupts a
// system call that is already running.
// NOTE(review): whether the worker checks the flag before issuing I/O is
// decided in posix_stream_file_service.hpp — confirm there.
149 141x void request_cancel() noexcept
150 {
151 141x cancelled.store(true, std::memory_order_release);
152 141x }
153
// Arms the op for a new operation: clears the flag, discards any previous
// stop callback, and registers a new one only if the token can ever
// request a stop.
154 12x void start(std::stop_token const& token)
155 {
// The flag is cleared before the callback is constructed: the
// std::stop_callback constructor runs the callback immediately when a
// stop has already been requested, so doing it in this order leaves
// cancelled == true for an already-stopped token.
156 12x cancelled.store(false, std::memory_order_release);
157 12x stop_cb.reset();
158 12x if (token.stop_possible())
159 1x stop_cb.emplace(token, canceller{this});
160 12x }
161 };
162
163 /** Pool work item for thread pool dispatch. */
// NOTE(review): neither member is read or written in this header;
// presumably ref_ keeps the file alive while the item is queued and file_
// is the target handed to do_read_work/do_write_work — confirm in
// posix_stream_file_service.hpp.
164 struct pool_op : pool_work_item
165 {
166 posix_stream_file* file_ = nullptr;
167 std::shared_ptr<posix_stream_file> ref_;
168 };
169
// Binds the object to its service; no file is opened.
170 explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
171
172 // -- io_stream::implementation --
// read_some and write_some are declared here and defined in
// posix_stream_file_service.hpp.
173
174 std::coroutine_handle<> read_some(
175 std::coroutine_handle<>,
176 capy::executor_ref,
177 buffer_param,
178 std::stop_token,
179 std::error_code*,
180 std::size_t*) override;
181
182 std::coroutine_handle<> write_some(
183 std::coroutine_handle<>,
184 capy::executor_ref,
185 buffer_param,
186 std::stop_token,
187 std::error_code*,
188 std::size_t*) override;
189
190 // -- stream_file::implementation --
191
// Returns the stored descriptor; it is -1 whenever no file is held
// (initial state, after close_file(), after release()).
192 81x native_handle_type native_handle() const noexcept override
193 {
194 81x return fd_;
195 }
196
// Flags both embedded ops as cancelled whether or not either is in
// flight; file_op::start() clears the flag again for the next operation.
197 70x void cancel() noexcept override
198 {
199 70x read_op_.request_cancel();
200 70x write_op_.request_cancel();
201 70x }
202
203 std::uint64_t size() const override;
204 void resize(std::uint64_t new_size) override;
205 void sync_data() override;
206 void sync_all() override;
207 native_handle_type release() override;
208 void assign(native_handle_type handle) override;
209 std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
210
211 // -- Internal --
212
213 /** Open the file and store the fd. */
214 std::error_code open_file(
215 std::filesystem::path const& path, file_base::flags mode);
216
217 /** Close the file descriptor. */
218 void close_file() noexcept;
219
220 private:
221 posix_stream_file_service& svc_;
// -1 means no descriptor is held.
222 int fd_ = -1;
// Position tracked by this object; set by open_file() (0, or the file size
// in append mode), reset to 0 by assign() and release(), moved by seek().
223 std::uint64_t offset_ = 0;
224
// One reusable operation object and one pool work item per direction.
225 file_op read_op_;
226 file_op write_op_;
227 pool_op read_pool_op_;
228 pool_op write_pool_op_;
229
// Thread pool entry points, defined in posix_stream_file_service.hpp.
230 static void do_read_work(pool_work_item*) noexcept;
231 static void do_write_work(pool_work_item*) noexcept;
232 };
233
234 // ---------------------------------------------------------------------------
235 // Inline implementation
236 // ---------------------------------------------------------------------------
237
/** Construct an implementation bound to its owning service.

    Only the service reference is stored. No file is opened: fd_ and
    offset_ keep their default member initializers (-1 and 0).

    @param svc Service this implementation belongs to; held by reference.
*/
238 inline
239 26x posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
240 26x : svc_(svc)
241 {
242 26x }
243
/** Open a file and adopt its descriptor.

    Any descriptor currently held is closed first, even if the new open
    then fails. On success fd_ holds the new descriptor and offset_ is 0,
    or the current file size when file_base::append is set.

    @param path File to open.
    @param mode Access mode combined with creation/behavior flags.
    @return An empty error_code on success; otherwise the errno reported by
            ::open or ::fstat, converted with make_err. On failure fd_ is
            left at -1.
*/
244 inline std::error_code
245 19x posix_stream_file::open_file(
246 std::filesystem::path const& path, file_base::flags mode)
247 {
248 19x close_file();
249
250 19x int oflags = 0;
251
252 // Access mode
// Assumes the access-mode enumerators occupy the two low bits of
// file_base::flags (defined outside this header) — TODO confirm.
// Any value other than read_write or write_only opens read-only.
253 19x unsigned access = static_cast<unsigned>(mode) & 3u;
254 19x if (access == static_cast<unsigned>(file_base::read_write))
255 2x oflags |= O_RDWR;
256 17x else if (access == static_cast<unsigned>(file_base::write_only))
257 7x oflags |= O_WRONLY;
258 else
259 10x oflags |= O_RDONLY;
260
261 // Creation flags
262 19x if ((mode & file_base::create) != file_base::flags(0))
263 6x oflags |= O_CREAT;
264 19x if ((mode & file_base::exclusive) != file_base::flags(0))
265 1x oflags |= O_EXCL;
266 19x if ((mode & file_base::truncate) != file_base::flags(0))
267 5x oflags |= O_TRUNC;
268 19x if ((mode & file_base::append) != file_base::flags(0))
269 1x oflags |= O_APPEND;
270 19x if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
271 oflags |= O_SYNC;
272
// 0666 is the permission mask requested for a newly created file; the
// process umask is applied on top of it by the kernel.
// NOTE(review): O_CLOEXEC is not part of oflags, so the descriptor stays
// open across exec in child processes. Consider OR-ing in O_CLOEXEC
// unless inheritance is intended.
273 19x int fd = ::open(path.c_str(), oflags, 0666);
274 19x if (fd < 0)
275 2x return make_err(errno);
276
277 17x fd_ = fd;
278 17x offset_ = 0;
279
280 // Append mode: position at end-of-file (preadv/pwritev use
281 // explicit offsets, so O_APPEND alone is not sufficient).
282 17x if ((mode & file_base::append) != file_base::flags(0))
283 {
284 struct stat st;
285 1x if (::fstat(fd, &st) < 0)
286 {
// errno is saved before ::close, which may overwrite it.
287 int err = errno;
288 ::close(fd);
289 fd_ = -1;
290 return make_err(err);
291 }
// The size is sampled once, here; offset_ is not refreshed from the file
// size anywhere else in this header.
292 1x offset_ = static_cast<std::uint64_t>(st.st_size);
293 }
294
// Advisory hint only: the return value is deliberately not checked.
295 #ifdef POSIX_FADV_SEQUENTIAL
296 17x ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
297 #endif
298
299 17x return {};
300 }
301
/** Close the held descriptor, if any.

    Does nothing when fd_ is negative. The result of ::close is ignored and
    fd_ is set to -1 regardless. offset_ and the embedded operation objects
    are not touched here.
*/
302 inline void
303 89x posix_stream_file::close_file() noexcept
304 {
305 89x if (fd_ >= 0)
306 {
307 17x ::close(fd_);
308 17x fd_ = -1;
309 }
310 89x }
311
/** Return the current size of the file in bytes.

    The size is queried with ::fstat on every call; nothing is cached.

    @return st_size as reported by ::fstat, converted to std::uint64_t.
    @throws Via throw_system_error, carrying the errno from ::fstat, when
            the query fails (this includes calling it with no open
            descriptor, since fd_ is then -1).
*/
312 inline std::uint64_t
313 3x posix_stream_file::size() const
314 {
315 struct stat st;
316 3x if (::fstat(fd_, &st) < 0)
317 throw_system_error(make_err(errno), "stream_file::size");
318 3x return static_cast<std::uint64_t>(st.st_size);
319 }
320
/** Truncate or extend the file to new_size bytes.

    offset_ is not adjusted, so after shrinking it may lie beyond the new
    end of the file.

    @param new_size Requested size in bytes.
    @throws Via throw_system_error with EOVERFLOW when new_size does not
            fit in off_t, or with the errno from ::ftruncate on failure.
*/
321 inline void
322 1x posix_stream_file::resize(std::uint64_t new_size)
323 {
// Range check first: the cast to off_t below would otherwise change the
// value for sizes above off_t's maximum.
324 1x if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
325 throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
326 1x if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
327 throw_system_error(make_err(errno), "stream_file::resize");
328 1x }
329
/** Flush file data to storage.

    Uses ::fdatasync when BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO is
    non-zero and falls back to ::fsync otherwise.

    @throws Via throw_system_error, carrying the errno from the sync call.
*/
330 inline void
331 1x posix_stream_file::sync_data()
332 {
// The preprocessor branches select only the `if` condition; the throw
// statement after #endif is shared by both variants.
333 #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
334 1x if (::fdatasync(fd_) < 0)
335 #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
336 if (::fsync(fd_) < 0)
337 #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
338 throw_system_error(make_err(errno), "stream_file::sync_data");
339 1x }
340
/** Flush the file to storage with ::fsync.

    @throws Via throw_system_error, carrying the errno from ::fsync.
*/
341 inline void
342 1x posix_stream_file::sync_all()
343 {
344 1x if (::fsync(fd_) < 0)
345 throw_system_error(make_err(errno), "stream_file::sync_all");
346 1x }
347
/** Give up the descriptor without closing it.

    After the call this object holds no descriptor (fd_ == -1) and offset_
    is 0; closing the returned descriptor becomes the caller's job.

    @return The descriptor that was held, or -1 if none was.

    NOTE(review): the embedded operations are not cancelled here; whether a
    caller may release while an operation is in flight is not visible in
    this header — confirm.
*/
348 inline native_handle_type
349 1x posix_stream_file::release()
350 {
351 1x int fd = fd_;
352 1x fd_ = -1;
353 1x offset_ = 0;
354 1x return fd;
355 }
356
/** Adopt an existing descriptor.

    Any descriptor currently held is closed first. The new handle is stored
    as-is, without validation, and offset_ restarts at 0. Unlike
    open_file(), no posix_fadvise hint is issued and no append positioning
    is done.

    @param handle Descriptor to take over.
*/
357 inline void
358 1x posix_stream_file::assign(native_handle_type handle)
359 {
360 1x close_file();
361 1x fd_ = handle;
362 1x offset_ = 0;
363 1x }
364
/** Move the tracked file position.

    Only offset_ is updated; the descriptor itself is never repositioned.

    @param offset Signed displacement, interpreted relative to origin.
    @param origin file_base::seek_set (absolute), file_base::seek_cur
                  (relative to offset_); any other value is treated as
                  relative to the current end of file.
    @return The new position.
    @throws Via throw_system_error: EINVAL when the result is negative,
            EOVERFLOW when it exceeds off_t's maximum, or the errno from
            ::fstat when the end-relative size query fails.
*/
365 inline std::uint64_t
366 7x posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
367 {
368 // We track offset_ ourselves (not the kernel fd offset)
369 // because preadv/pwritev use explicit offsets.
370 std::int64_t new_pos;
371
372 7x if (origin == file_base::seek_set)
373 {
374 3x new_pos = offset;
375 }
376 4x else if (origin == file_base::seek_cur)
377 {
// NOTE(review): this addition is unchecked. A large positive offset
// combined with a non-zero offset_ overflows std::int64_t, which is
// undefined behaviour and happens before the range checks below can
// reject it. Suggested fix: throw EOVERFLOW when
// offset > std::numeric_limits<std::int64_t>::max() - base before adding.
378 2x new_pos = static_cast<std::int64_t>(offset_) + offset;
379 }
380 else
381 {
382 struct stat st;
383 2x if (::fstat(fd_, &st) < 0)
384 throw_system_error(make_err(errno), "stream_file::seek");
// NOTE(review): same unchecked signed addition as the seek_cur branch,
// with st.st_size as the base.
385 2x new_pos = st.st_size + offset;
386 }
387
388 7x if (new_pos < 0)
389 3x throw_system_error(make_err(EINVAL), "stream_file::seek");
// Where off_t is 64 bits wide this comparison can never be true, because
// new_pos is itself a std::int64_t; it only matters for a narrower off_t.
390 4x if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
391 throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
392
393 4x offset_ = static_cast<std::uint64_t>(new_pos);
394
395 4x return offset_;
396 }
397
398 // -- file_op completion handler --
399 // (read_some, write_some, do_read_work, do_write_work are
400 // defined in posix_stream_file_service.hpp after the service)
401
/** Completion handler: publish the result and resume the coroutine.

    The stored outcome is translated into the caller's out-parameters, then
    the continuation is handed to dispatch_coro and the returned handle is
    resumed.

    Error precedence written to *ec_out:
      1. cancelled flag set        -> capy::error::canceled
      2. errn non-zero             -> make_err(errn)
      3. read that moved 0 bytes   -> capy::error::eof
      4. otherwise                 -> no error
*/
402 inline void
403 12x posix_stream_file::file_op::operator()()
404 {
// Drop the stop callback first so it cannot run while the result is
// being published.
405 12x stop_cb.reset();
406
407 12x bool const was_cancelled = cancelled.load(std::memory_order_acquire);
408
409 12x if (ec_out)
410 {
411 12x if (was_cancelled)
412 1x *ec_out = capy::error::canceled;
413 11x else if (errn != 0)
414 *ec_out = make_err(errn);
// NOTE(review): any read that transferred 0 bytes is reported as eof,
// which would include a request made with empty buffers. Whether those
// are filtered out before submission is not visible in this header —
// confirm.
415 11x else if (is_read && bytes_transferred == 0)
416 1x *ec_out = capy::error::eof;
417 else
418 10x *ec_out = {};
419 }
420
// A cancelled operation always reports 0 bytes, even if
// bytes_transferred is non-zero.
421 12x if (bytes_out)
422 12x *bytes_out = was_cancelled ? 0 : bytes_transferred;
423
424 // Move impl_ref to a local so members remain valid through
425 // dispatch — impl_ref may be the last shared_ptr keeping
426 // the parent posix_stream_file (which embeds this file_op) alive.
427 12x auto prevent_destroy = std::move(impl_ref);
428 12x ex.on_work_finished();
429 12x cont_op.cont.h = h;
430 12x dispatch_coro(ex, cont_op.cont).resume();
431 12x }
432
/** Discard the operation without resuming the coroutine.

    Drops the stop callback, releases impl_ref, and reports the work as
    finished on the executor. Neither the out-parameters nor the coroutine
    handle are touched.
*/
433 inline void
434 posix_stream_file::file_op::destroy()
435 {
436 stop_cb.reset();
// ex is copied before impl_ref is released. Presumably this is because
// impl_ref may be the last owner of the parent object that embeds this
// file_op, as the comment in operator()() describes, so members must not
// be read after the reset — confirm.
437 auto local_ex = ex;
438 impl_ref.reset();
439 local_ex.on_work_finished();
440 }
441
442 } // namespace boost::corosio::detail
443
444 #endif // BOOST_COROSIO_POSIX
445
446 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
447