include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp

84.6% Lines (121/143) 100.0% List of functions (16/16)
posix_stream_file_service.hpp
f(x) Functions (16)
Function Calls Lines Blocks
boost::corosio::detail::posix_stream_file_service::posix_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :35 515x 100.0% 88.0% boost::corosio::detail::posix_stream_file_service::~posix_stream_file_service() :42 1030x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::construct() :47 26x 100.0% 71.0% boost::corosio::detail::posix_stream_file_service::destroy(boost::corosio::io_object::implementation*) :61 26x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::close(boost::corosio::io_object::handle&) :69 43x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :79 19x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::shutdown() :87 515x 62.5% 70.0% boost::corosio::detail::posix_stream_file_service::destroy_impl(boost::corosio::detail::posix_stream_file&) :99 26x 100.0% 67.0% boost::corosio::detail::posix_stream_file_service::post(boost::corosio::detail::scheduler_op*) :106 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::pool() :121 12x 100.0% 100.0% boost::corosio::detail::posix_stream_file_service::get_or_create_pool(boost::capy::execution_context&) :127 515x 80.0% 67.0% boost::corosio::detail::get_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :145 515x 100.0% 100.0% boost::corosio::detail::posix_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :155 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_read_work(boost::corosio::detail::pool_work_item*) :205 6x 88.2% 83.0% boost::corosio::detail::posix_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :239 6x 75.0% 73.0% boost::corosio::detail::posix_stream_file::do_write_work(boost::corosio::detail::pool_work_item*) :289 6x 88.2% 83.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 #include <boost/corosio/detail/file_service.hpp>
19 #include <boost/corosio/detail/thread_pool.hpp>
20
21 #include <mutex>
22 #include <unordered_map>
23
24 namespace boost::corosio::detail {
25
26 /** Stream file service for POSIX backends.
27
28 Owns all posix_stream_file instances. Thread lifecycle is
29 managed by the thread_pool service (shared with resolver).
30 */
31 class BOOST_COROSIO_DECL posix_stream_file_service final
32 : public file_service
33 {
34 public:
35 515x posix_stream_file_service(
36 capy::execution_context& ctx, scheduler& sched)
37 1030x : sched_(&sched)
38 515x , pool_(get_or_create_pool(ctx))
39 {
40 515x }
41
42 1030x ~posix_stream_file_service() override = default;
43
44 posix_stream_file_service(posix_stream_file_service const&) = delete;
45 posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
46
47 26x io_object::implementation* construct() override
48 {
49 26x auto ptr = std::make_shared<posix_stream_file>(*this);
50 26x auto* impl = ptr.get();
51
52 {
53 26x std::lock_guard<std::mutex> lock(mutex_);
54 26x file_list_.push_back(impl);
55 26x file_ptrs_[impl] = std::move(ptr);
56 26x }
57
58 26x return impl;
59 26x }
60
61 26x void destroy(io_object::implementation* p) override
62 {
63 26x auto& impl = static_cast<posix_stream_file&>(*p);
64 26x impl.cancel();
65 26x impl.close_file();
66 26x destroy_impl(impl);
67 26x }
68
69 43x void close(io_object::handle& h) override
70 {
71 43x if (h.get())
72 {
73 43x auto& impl = static_cast<posix_stream_file&>(*h.get());
74 43x impl.cancel();
75 43x impl.close_file();
76 }
77 43x }
78
79 19x std::error_code open_file(
80 stream_file::implementation& impl,
81 std::filesystem::path const& path,
82 file_base::flags mode) override
83 {
84 19x return static_cast<posix_stream_file&>(impl).open_file(path, mode);
85 }
86
87 515x void shutdown() override
88 {
89 515x std::lock_guard<std::mutex> lock(mutex_);
90 515x for (auto* impl = file_list_.pop_front(); impl != nullptr;
91 impl = file_list_.pop_front())
92 {
93 impl->cancel();
94 impl->close_file();
95 }
96 515x file_ptrs_.clear();
97 515x }
98
99 26x void destroy_impl(posix_stream_file& impl)
100 {
101 26x std::lock_guard<std::mutex> lock(mutex_);
102 26x file_list_.remove(&impl);
103 26x file_ptrs_.erase(&impl);
104 26x }
105
106 12x void post(scheduler_op* op)
107 {
108 12x sched_->post(op);
109 12x }
110
111 void work_started() noexcept
112 {
113 sched_->work_started();
114 }
115
116 void work_finished() noexcept
117 {
118 sched_->work_finished();
119 }
120
121 12x thread_pool& pool() noexcept
122 {
123 12x return pool_;
124 }
125
126 private:
127 515x static thread_pool& get_or_create_pool(capy::execution_context& ctx)
128 {
129 515x auto* p = ctx.find_service<thread_pool>();
130 515x if (p)
131 515x return *p;
132 return ctx.make_service<thread_pool>();
133 }
134
135 scheduler* sched_;
136 thread_pool& pool_;
137 std::mutex mutex_;
138 intrusive_list<posix_stream_file> file_list_;
139 std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
140 file_ptrs_;
141 };
142
143 /** Get or create the stream file service for the given context. */
144 inline posix_stream_file_service&
145 515x get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
146 {
147 515x return ctx.make_service<posix_stream_file_service>(sched);
148 }
149
150 // ---------------------------------------------------------------------------
151 // posix_stream_file inline implementations (require complete service type)
152 // ---------------------------------------------------------------------------
153
154 inline std::coroutine_handle<>
155 6x posix_stream_file::read_some(
156 std::coroutine_handle<> h,
157 capy::executor_ref ex,
158 buffer_param param,
159 std::stop_token token,
160 std::error_code* ec,
161 std::size_t* bytes_out)
162 {
163 6x auto& op = read_op_;
164 6x op.reset();
165 6x op.is_read = true;
166
167 6x capy::mutable_buffer bufs[max_buffers];
168 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
169
170 6x if (op.iovec_count == 0)
171 {
172 *ec = {};
173 *bytes_out = 0;
174 op.cont_op.cont.h = h;
175 return dispatch_coro(ex, op.cont_op.cont);
176 }
177
178 12x for (int i = 0; i < op.iovec_count; ++i)
179 {
180 6x op.iovecs[i].iov_base = bufs[i].data();
181 6x op.iovecs[i].iov_len = bufs[i].size();
182 }
183
184 6x op.h = h;
185 6x op.ex = ex;
186 6x op.ec_out = ec;
187 6x op.bytes_out = bytes_out;
188 6x op.start(token);
189
190 6x op.ex.on_work_started();
191
192 6x read_pool_op_.file_ = this;
193 6x read_pool_op_.ref_ = this->shared_from_this();
194 6x read_pool_op_.func_ = &posix_stream_file::do_read_work;
195 6x if (!svc_.pool().post(&read_pool_op_))
196 {
197 op.impl_ref = std::move(read_pool_op_.ref_);
198 op.cancelled.store(true, std::memory_order_release);
199 svc_.post(&read_op_);
200 }
201 6x return std::noop_coroutine();
202 }
203
204 inline void
205 6x posix_stream_file::do_read_work(pool_work_item* w) noexcept
206 {
207 6x auto* pw = static_cast<pool_op*>(w);
208 6x auto* self = pw->file_;
209 6x auto& op = self->read_op_;
210
211 6x if (!op.cancelled.load(std::memory_order_acquire))
212 {
213 ssize_t n;
214 do
215 {
216 10x n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
217 5x static_cast<off_t>(self->offset_));
218 }
219 5x while (n < 0 && errno == EINTR);
220
221 5x if (n >= 0)
222 {
223 5x op.errn = 0;
224 5x op.bytes_transferred = static_cast<std::size_t>(n);
225 5x self->offset_ += static_cast<std::uint64_t>(n);
226 }
227 else
228 {
229 op.errn = errno;
230 op.bytes_transferred = 0;
231 }
232 }
233
234 6x op.impl_ref = std::move(pw->ref_);
235 6x self->svc_.post(&op);
236 6x }
237
238 inline std::coroutine_handle<>
239 6x posix_stream_file::write_some(
240 std::coroutine_handle<> h,
241 capy::executor_ref ex,
242 buffer_param param,
243 std::stop_token token,
244 std::error_code* ec,
245 std::size_t* bytes_out)
246 {
247 6x auto& op = write_op_;
248 6x op.reset();
249 6x op.is_read = false;
250
251 6x capy::mutable_buffer bufs[max_buffers];
252 6x op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
253
254 6x if (op.iovec_count == 0)
255 {
256 *ec = {};
257 *bytes_out = 0;
258 op.cont_op.cont.h = h;
259 return dispatch_coro(ex, op.cont_op.cont);
260 }
261
262 12x for (int i = 0; i < op.iovec_count; ++i)
263 {
264 6x op.iovecs[i].iov_base = bufs[i].data();
265 6x op.iovecs[i].iov_len = bufs[i].size();
266 }
267
268 6x op.h = h;
269 6x op.ex = ex;
270 6x op.ec_out = ec;
271 6x op.bytes_out = bytes_out;
272 6x op.start(token);
273
274 6x op.ex.on_work_started();
275
276 6x write_pool_op_.file_ = this;
277 6x write_pool_op_.ref_ = this->shared_from_this();
278 6x write_pool_op_.func_ = &posix_stream_file::do_write_work;
279 6x if (!svc_.pool().post(&write_pool_op_))
280 {
281 op.impl_ref = std::move(write_pool_op_.ref_);
282 op.cancelled.store(true, std::memory_order_release);
283 svc_.post(&write_op_);
284 }
285 6x return std::noop_coroutine();
286 }
287
288 inline void
289 6x posix_stream_file::do_write_work(pool_work_item* w) noexcept
290 {
291 6x auto* pw = static_cast<pool_op*>(w);
292 6x auto* self = pw->file_;
293 6x auto& op = self->write_op_;
294
295 6x if (!op.cancelled.load(std::memory_order_acquire))
296 {
297 ssize_t n;
298 do
299 {
300 12x n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
301 6x static_cast<off_t>(self->offset_));
302 }
303 6x while (n < 0 && errno == EINTR);
304
305 6x if (n >= 0)
306 {
307 6x op.errn = 0;
308 6x op.bytes_transferred = static_cast<std::size_t>(n);
309 6x self->offset_ += static_cast<std::uint64_t>(n);
310 }
311 else
312 {
313 op.errn = errno;
314 op.bytes_transferred = 0;
315 }
316 }
317
318 6x op.impl_ref = std::move(pw->ref_);
319 6x self->svc_.post(&op);
320 6x }
321
322 } // namespace boost::corosio::detail
323
324 #endif // BOOST_COROSIO_POSIX
325
326 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
327