1 +
//
 
2 +
// Copyright (c) 2026 Michael Vandeberg
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
 
11 +
#define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/platform.hpp>
 
14 +

 
15 +
#if BOOST_COROSIO_POSIX
 
16 +

 
17 +
#include <boost/corosio/detail/config.hpp>
 
18 +
#include <boost/corosio/stream_file.hpp>
 
19 +
#include <boost/corosio/file_base.hpp>
 
20 +
#include <boost/corosio/detail/intrusive.hpp>
 
21 +
#include <boost/corosio/detail/dispatch_coro.hpp>
 
22 +
#include <boost/corosio/detail/scheduler_op.hpp>
 
23 +
#include <boost/corosio/detail/continuation_op.hpp>
 
24 +
#include <boost/corosio/detail/thread_pool.hpp>
 
25 +
#include <boost/corosio/detail/scheduler.hpp>
 
26 +
#include <boost/corosio/detail/buffer_param.hpp>
 
27 +
#include <boost/corosio/native/detail/make_err.hpp>
 
28 +
#include <boost/capy/ex/executor_ref.hpp>
 
29 +
#include <boost/capy/error.hpp>
 
30 +
#include <boost/capy/buffers.hpp>
 
31 +

 
32 +
#include <atomic>
 
33 +
#include <coroutine>
 
34 +
#include <cstddef>
 
35 +
#include <cstdint>
 
36 +
#include <filesystem>
 
37 +
#include <limits>
 
38 +
#include <memory>
 
39 +
#include <optional>
 
40 +
#include <stop_token>
 
41 +
#include <system_error>
 
42 +

 
43 +
#include <errno.h>
 
44 +
#include <fcntl.h>
 
45 +
#include <sys/stat.h>
 
46 +
#include <sys/uio.h>
 
47 +
#include <unistd.h>
 
48 +

 
49 +
/*
 
50 +
    POSIX Stream File Implementation
 
51 +
    =================================
 
52 +

 
53 +
    Regular files cannot be monitored by epoll/kqueue/select — the kernel
 
54 +
    always reports them as ready. Blocking I/O (preadv/pwritev) is dispatched
 
55 +
    to a shared thread pool, with completion posted back to the scheduler.
 
56 +

 
57 +
    This follows the same pattern as posix_resolver: pool_work_item for
 
58 +
    dispatch, scheduler_op for completion, shared_from_this for lifetime.
 
59 +

 
60 +
    Completion Flow
 
61 +
    ---------------
 
62 +
    1. read_some() sets up file_read_op, posts to thread pool
 
63 +
    2. Pool thread runs preadv() (blocking)
 
64 +
    3. Pool thread stores results, posts scheduler_op to scheduler
 
65 +
    4. Scheduler invokes op() which resumes the coroutine
 
66 +

 
67 +
    Single-Inflight Constraint
 
68 +
    --------------------------
 
69 +
    Only one asynchronous operation may be in flight at a time on a
 
70 +
    given file object. Concurrent read and write is not supported
 
71 +
    because both share offset_ without synchronization.
 
72 +
*/
 
73 +

 
74 +
namespace boost::corosio::detail {
 
75 +

 
76 +
struct scheduler;
 
77 +
class posix_stream_file_service;
 
78 +

 
79 +
/** Stream file implementation for POSIX backends.
 
80 +

 
81 +
    Each instance contains embedded operation objects (read_op_, write_op_)
 
82 +
    that are reused across calls. This avoids per-operation heap allocation.
 
83 +
*/
 
84 +
class posix_stream_file final
 
85 +
    : public stream_file::implementation
 
86 +
    , public std::enable_shared_from_this<posix_stream_file>
 
87 +
    , public intrusive_list<posix_stream_file>::node
 
88 +
{
 
89 +
    friend class posix_stream_file_service;
 
90 +

 
91 +
public:
 
92 +
    static constexpr std::size_t max_buffers = 16;
 
93 +

 
94 +
    /** Operation state for a single file read or write. */
 
95 +
    struct file_op : scheduler_op
 
96 +
    {
 
97 +
        struct canceller
 
98 +
        {
 
99 +
            file_op* op;
 
100 +
            void operator()() const noexcept
 
101 +
            {
 
102 +
                op->request_cancel();
 
103 +
            }
 
104 +
        };
 
105 +

 
106 +
        // Coroutine state
 
107 +
        std::coroutine_handle<> h;
 
108 +
        detail::continuation_op cont_op;
 
109 +
        capy::executor_ref ex;
 
110 +

 
111 +
        // Output pointers
 
112 +
        std::error_code* ec_out = nullptr;
 
113 +
        std::size_t* bytes_out  = nullptr;
 
114 +

 
115 +
        // Buffer data (copied from buffer_param at submission time)
 
116 +
        iovec iovecs[max_buffers];
 
117 +
        int iovec_count = 0;
 
118 +

 
119 +
        // Result storage (populated by worker thread)
 
120 +
        int errn                    = 0;
 
121 +
        std::size_t bytes_transferred = 0;
 
122 +
        bool is_read                = false;
 
123 +

 
124 +
        // Thread coordination
 
125 +
        std::atomic<bool> cancelled{false};
 
126 +
        std::optional<std::stop_callback<canceller>> stop_cb;
 
127 +

 
128 +
        /// Prevents use-after-free when file is closed with pending ops.
 
129 +
        std::shared_ptr<void> impl_ref;
 
130 +

 
131 +
        file_op() = default;
 
132 +

 
133 +
        void reset() noexcept
 
134 +
        {
 
135 +
            iovec_count       = 0;
 
136 +
            errn              = 0;
 
137 +
            bytes_transferred = 0;
 
138 +
            is_read           = false;
 
139 +
            cancelled.store(false, std::memory_order_relaxed);
 
140 +
            stop_cb.reset();
 
141 +
            impl_ref.reset();
 
142 +
            ec_out    = nullptr;
 
143 +
            bytes_out = nullptr;
 
144 +
        }
 
145 +

 
146 +
        void operator()() override;
 
147 +
        void destroy() override;
 
148 +

 
149 +
        void request_cancel() noexcept
 
150 +
        {
 
151 +
            cancelled.store(true, std::memory_order_release);
 
152 +
        }
 
153 +

 
154 +
        void start(std::stop_token const& token)
 
155 +
        {
 
156 +
            cancelled.store(false, std::memory_order_release);
 
157 +
            stop_cb.reset();
 
158 +
            if (token.stop_possible())
 
159 +
                stop_cb.emplace(token, canceller{this});
 
160 +
        }
 
161 +
    };
 
162 +

 
163 +
    /** Pool work item for thread pool dispatch. */
 
164 +
    struct pool_op : pool_work_item
 
165 +
    {
 
166 +
        posix_stream_file* file_ = nullptr;
 
167 +
        std::shared_ptr<posix_stream_file> ref_;
 
168 +
    };
 
169 +

 
170 +
    explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
 
171 +

 
172 +
    // -- io_stream::implementation --
 
173 +

 
174 +
    std::coroutine_handle<> read_some(
 
175 +
        std::coroutine_handle<>,
 
176 +
        capy::executor_ref,
 
177 +
        buffer_param,
 
178 +
        std::stop_token,
 
179 +
        std::error_code*,
 
180 +
        std::size_t*) override;
 
181 +

 
182 +
    std::coroutine_handle<> write_some(
 
183 +
        std::coroutine_handle<>,
 
184 +
        capy::executor_ref,
 
185 +
        buffer_param,
 
186 +
        std::stop_token,
 
187 +
        std::error_code*,
 
188 +
        std::size_t*) override;
 
189 +

 
190 +
    // -- stream_file::implementation --
 
191 +

 
192 +
    native_handle_type native_handle() const noexcept override
 
193 +
    {
 
194 +
        return fd_;
 
195 +
    }
 
196 +

 
197 +
    void cancel() noexcept override
 
198 +
    {
 
199 +
        read_op_.request_cancel();
 
200 +
        write_op_.request_cancel();
 
201 +
    }
 
202 +

 
203 +
    std::uint64_t size() const override;
 
204 +
    void resize(std::uint64_t new_size) override;
 
205 +
    void sync_data() override;
 
206 +
    void sync_all() override;
 
207 +
    native_handle_type release() override;
 
208 +
    void assign(native_handle_type handle) override;
 
209 +
    std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
 
210 +

 
211 +
    // -- Internal --
 
212 +

 
213 +
    /** Open the file and store the fd. */
 
214 +
    std::error_code open_file(
 
215 +
        std::filesystem::path const& path, file_base::flags mode);
 
216 +

 
217 +
    /** Close the file descriptor. */
 
218 +
    void close_file() noexcept;
 
219 +

 
220 +
private:
 
221 +
    posix_stream_file_service& svc_;
 
222 +
    int fd_ = -1;
 
223 +
    std::uint64_t offset_ = 0;
 
224 +

 
225 +
    file_op read_op_;
 
226 +
    file_op write_op_;
 
227 +
    pool_op read_pool_op_;
 
228 +
    pool_op write_pool_op_;
 
229 +

 
230 +
    static void do_read_work(pool_work_item*) noexcept;
 
231 +
    static void do_write_work(pool_work_item*) noexcept;
 
232 +
};
 
233 +

 
234 +
// ---------------------------------------------------------------------------
 
235 +
// Inline implementation
 
236 +
// ---------------------------------------------------------------------------
 
237 +

 
238 +
inline
 
239 +
posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
 
240 +
    : svc_(svc)
 
241 +
{
 
242 +
}
 
243 +

 
244 +
inline std::error_code
 
245 +
posix_stream_file::open_file(
 
246 +
    std::filesystem::path const& path, file_base::flags mode)
 
247 +
{
 
248 +
    close_file();
 
249 +

 
250 +
    int oflags = 0;
 
251 +

 
252 +
    // Access mode
 
253 +
    unsigned access = static_cast<unsigned>(mode) & 3u;
 
254 +
    if (access == static_cast<unsigned>(file_base::read_write))
 
255 +
        oflags |= O_RDWR;
 
256 +
    else if (access == static_cast<unsigned>(file_base::write_only))
 
257 +
        oflags |= O_WRONLY;
 
258 +
    else
 
259 +
        oflags |= O_RDONLY;
 
260 +

 
261 +
    // Creation flags
 
262 +
    if ((mode & file_base::create) != file_base::flags(0))
 
263 +
        oflags |= O_CREAT;
 
264 +
    if ((mode & file_base::exclusive) != file_base::flags(0))
 
265 +
        oflags |= O_EXCL;
 
266 +
    if ((mode & file_base::truncate) != file_base::flags(0))
 
267 +
        oflags |= O_TRUNC;
 
268 +
    if ((mode & file_base::append) != file_base::flags(0))
 
269 +
        oflags |= O_APPEND;
 
270 +
    if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
 
271 +
        oflags |= O_SYNC;
 
272 +

 
273 +
    int fd = ::open(path.c_str(), oflags, 0666);
 
274 +
    if (fd < 0)
 
275 +
        return make_err(errno);
 
276 +

 
277 +
    fd_     = fd;
 
278 +
    offset_ = 0;
 
279 +

 
280 +
    // Append mode: position at end-of-file (preadv/pwritev use
 
281 +
    // explicit offsets, so O_APPEND alone is not sufficient).
 
282 +
    if ((mode & file_base::append) != file_base::flags(0))
 
283 +
    {
 
284 +
        struct stat st;
 
285 +
        if (::fstat(fd, &st) < 0)
 
286 +
        {
 
287 +
            int err = errno;
 
288 +
            ::close(fd);
 
289 +
            fd_ = -1;
 
290 +
            return make_err(err);
 
291 +
        }
 
292 +
        offset_ = static_cast<std::uint64_t>(st.st_size);
 
293 +
    }
 
294 +

 
295 +
#ifdef POSIX_FADV_SEQUENTIAL
 
296 +
    ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
 
297 +
#endif
 
298 +

 
299 +
    return {};
 
300 +
}
 
301 +

 
302 +
inline void
 
303 +
posix_stream_file::close_file() noexcept
 
304 +
{
 
305 +
    if (fd_ >= 0)
 
306 +
    {
 
307 +
        ::close(fd_);
 
308 +
        fd_ = -1;
 
309 +
    }
 
310 +
}
 
311 +

 
312 +
inline std::uint64_t
 
313 +
posix_stream_file::size() const
 
314 +
{
 
315 +
    struct stat st;
 
316 +
    if (::fstat(fd_, &st) < 0)
 
317 +
        throw_system_error(make_err(errno), "stream_file::size");
 
318 +
    return static_cast<std::uint64_t>(st.st_size);
 
319 +
}
 
320 +

 
321 +
inline void
 
322 +
posix_stream_file::resize(std::uint64_t new_size)
 
323 +
{
 
324 +
    if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
 
325 +
        throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
 
326 +
    if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
 
327 +
        throw_system_error(make_err(errno), "stream_file::resize");
 
328 +
}
 
329 +

 
330 +
inline void
 
331 +
posix_stream_file::sync_data()
 
332 +
{
 
333 +
#if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
 
334 +
    if (::fdatasync(fd_) < 0)
 
335 +
#else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
 
336 +
    if (::fsync(fd_) < 0)
 
337 +
#endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
 
338 +
        throw_system_error(make_err(errno), "stream_file::sync_data");
 
339 +
}
 
340 +

 
341 +
inline void
 
342 +
posix_stream_file::sync_all()
 
343 +
{
 
344 +
    if (::fsync(fd_) < 0)
 
345 +
        throw_system_error(make_err(errno), "stream_file::sync_all");
 
346 +
}
 
347 +

 
348 +
/** Give up ownership of the descriptor without closing it.

    The object returns to its closed state (descriptor -1, position 0).

    @return The descriptor that was held, which may be negative.
*/
inline native_handle_type
posix_stream_file::release()
{
    int const released = fd_;

    fd_     = -1;
    offset_ = 0;

    return released;
}
 
356 +

 
357 +
inline void
 
358 +
posix_stream_file::assign(native_handle_type handle)
 
359 +
{
 
360 +
    close_file();
 
361 +
    fd_ = handle;
 
362 +
    offset_ = 0;
 
363 +
}
 
364 +

 
365 +
inline std::uint64_t
 
366 +
posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
 
367 +
{
 
368 +
    // We track offset_ ourselves (not the kernel fd offset)
 
369 +
    // because preadv/pwritev use explicit offsets.
 
370 +
    std::int64_t new_pos;
 
371 +

 
372 +
    if (origin == file_base::seek_set)
 
373 +
    {
 
374 +
        new_pos = offset;
 
375 +
    }
 
376 +
    else if (origin == file_base::seek_cur)
 
377 +
    {
 
378 +
        new_pos = static_cast<std::int64_t>(offset_) + offset;
 
379 +
    }
 
380 +
    else
 
381 +
    {
 
382 +
        struct stat st;
 
383 +
        if (::fstat(fd_, &st) < 0)
 
384 +
            throw_system_error(make_err(errno), "stream_file::seek");
 
385 +
        new_pos = st.st_size + offset;
 
386 +
    }
 
387 +

 
388 +
    if (new_pos < 0)
 
389 +
        throw_system_error(make_err(EINVAL), "stream_file::seek");
 
390 +
    if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
 
391 +
        throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
 
392 +

 
393 +
    offset_ = static_cast<std::uint64_t>(new_pos);
 
394 +

 
395 +
    return offset_;
 
396 +
}
 
397 +

 
398 +
// -- file_op completion handler --
 
399 +
// (read_some, write_some, do_read_work, do_write_work are
 
400 +
//  defined in posix_stream_file_service.hpp after the service)
 
401 +

 
402 +
inline void
 
403 +
posix_stream_file::file_op::operator()()
 
404 +
{
 
405 +
    stop_cb.reset();
 
406 +

 
407 +
    bool const was_cancelled = cancelled.load(std::memory_order_acquire);
 
408 +

 
409 +
    if (ec_out)
 
410 +
    {
 
411 +
        if (was_cancelled)
 
412 +
            *ec_out = capy::error::canceled;
 
413 +
        else if (errn != 0)
 
414 +
            *ec_out = make_err(errn);
 
415 +
        else if (is_read && bytes_transferred == 0)
 
416 +
            *ec_out = capy::error::eof;
 
417 +
        else
 
418 +
            *ec_out = {};
 
419 +
    }
 
420 +

 
421 +
    if (bytes_out)
 
422 +
        *bytes_out = was_cancelled ? 0 : bytes_transferred;
 
423 +

 
424 +
    // Move impl_ref to a local so members remain valid through
 
425 +
    // dispatch — impl_ref may be the last shared_ptr keeping
 
426 +
    // the parent posix_stream_file (which embeds this file_op) alive.
 
427 +
    auto prevent_destroy = std::move(impl_ref);
 
428 +
    ex.on_work_finished();
 
429 +
    cont_op.cont.h = h;
 
430 +
    dispatch_coro(ex, cont_op.cont).resume();
 
431 +
}
 
432 +

 
433 +
inline void
 
434 +
posix_stream_file::file_op::destroy()
 
435 +
{
 
436 +
    stop_cb.reset();
 
437 +
    auto local_ex = ex;
 
438 +
    impl_ref.reset();
 
439 +
    local_ex.on_work_finished();
 
440 +
}
 
441 +

 
442 +
} // namespace boost::corosio::detail
 
443 +

 
444 +
#endif // BOOST_COROSIO_POSIX
 
445 +

 
446 +
#endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP