TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_stream_file.hpp>
18 : #include <boost/corosio/detail/file_service.hpp>
19 : #include <boost/corosio/detail/thread_pool.hpp>
20 :
21 : #include <mutex>
22 : #include <unordered_map>
23 :
24 : namespace boost::corosio::detail {
25 :
26 : /** Stream file service for POSIX backends.
27 :
28 : Owns all posix_stream_file instances. Thread lifecycle is
29 : managed by the thread_pool service (shared with resolver).
30 : */
31 : class BOOST_COROSIO_DECL posix_stream_file_service final
32 : : public file_service
33 : {
34 : public:
35 HIT 515 : posix_stream_file_service(
36 : capy::execution_context& ctx, scheduler& sched)
37 1030 : : sched_(&sched)
38 515 : , pool_(get_or_create_pool(ctx))
39 : {
40 515 : }
41 :
42 1030 : ~posix_stream_file_service() override = default;
43 :
44 : posix_stream_file_service(posix_stream_file_service const&) = delete;
45 : posix_stream_file_service& operator=(posix_stream_file_service const&) = delete;
46 :
47 26 : io_object::implementation* construct() override
48 : {
49 26 : auto ptr = std::make_shared<posix_stream_file>(*this);
50 26 : auto* impl = ptr.get();
51 :
52 : {
53 26 : std::lock_guard<std::mutex> lock(mutex_);
54 26 : file_list_.push_back(impl);
55 26 : file_ptrs_[impl] = std::move(ptr);
56 26 : }
57 :
58 26 : return impl;
59 26 : }
60 :
61 26 : void destroy(io_object::implementation* p) override
62 : {
63 26 : auto& impl = static_cast<posix_stream_file&>(*p);
64 26 : impl.cancel();
65 26 : impl.close_file();
66 26 : destroy_impl(impl);
67 26 : }
68 :
69 43 : void close(io_object::handle& h) override
70 : {
71 43 : if (h.get())
72 : {
73 43 : auto& impl = static_cast<posix_stream_file&>(*h.get());
74 43 : impl.cancel();
75 43 : impl.close_file();
76 : }
77 43 : }
78 :
79 19 : std::error_code open_file(
80 : stream_file::implementation& impl,
81 : std::filesystem::path const& path,
82 : file_base::flags mode) override
83 : {
84 19 : return static_cast<posix_stream_file&>(impl).open_file(path, mode);
85 : }
86 :
87 515 : void shutdown() override
88 : {
89 515 : std::lock_guard<std::mutex> lock(mutex_);
90 515 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
91 MIS 0 : impl = file_list_.pop_front())
92 : {
93 0 : impl->cancel();
94 0 : impl->close_file();
95 : }
96 HIT 515 : file_ptrs_.clear();
97 515 : }
98 :
99 26 : void destroy_impl(posix_stream_file& impl)
100 : {
101 26 : std::lock_guard<std::mutex> lock(mutex_);
102 26 : file_list_.remove(&impl);
103 26 : file_ptrs_.erase(&impl);
104 26 : }
105 :
106 12 : void post(scheduler_op* op)
107 : {
108 12 : sched_->post(op);
109 12 : }
110 :
111 : void work_started() noexcept
112 : {
113 : sched_->work_started();
114 : }
115 :
116 : void work_finished() noexcept
117 : {
118 : sched_->work_finished();
119 : }
120 :
121 12 : thread_pool& pool() noexcept
122 : {
123 12 : return pool_;
124 : }
125 :
126 : private:
127 515 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
128 : {
129 515 : auto* p = ctx.find_service<thread_pool>();
130 515 : if (p)
131 515 : return *p;
132 MIS 0 : return ctx.make_service<thread_pool>();
133 : }
134 :
135 : scheduler* sched_;
136 : thread_pool& pool_;
137 : std::mutex mutex_;
138 : intrusive_list<posix_stream_file> file_list_;
139 : std::unordered_map<posix_stream_file*, std::shared_ptr<posix_stream_file>>
140 : file_ptrs_;
141 : };
142 :
143 : /** Get or create the stream file service for the given context. */
144 : inline posix_stream_file_service&
145 HIT 515 : get_stream_file_service(capy::execution_context& ctx, scheduler& sched)
146 : {
147 515 : return ctx.make_service<posix_stream_file_service>(sched);
148 : }
149 :
150 : // ---------------------------------------------------------------------------
151 : // posix_stream_file inline implementations (require complete service type)
152 : // ---------------------------------------------------------------------------
153 :
154 : inline std::coroutine_handle<>
155 6 : posix_stream_file::read_some(
156 : std::coroutine_handle<> h,
157 : capy::executor_ref ex,
158 : buffer_param param,
159 : std::stop_token token,
160 : std::error_code* ec,
161 : std::size_t* bytes_out)
162 : {
163 6 : auto& op = read_op_;
164 6 : op.reset();
165 6 : op.is_read = true;
166 :
167 6 : capy::mutable_buffer bufs[max_buffers];
168 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
169 :
170 6 : if (op.iovec_count == 0)
171 : {
172 MIS 0 : *ec = {};
173 0 : *bytes_out = 0;
174 0 : op.cont_op.cont.h = h;
175 0 : return dispatch_coro(ex, op.cont_op.cont);
176 : }
177 :
178 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
179 : {
180 6 : op.iovecs[i].iov_base = bufs[i].data();
181 6 : op.iovecs[i].iov_len = bufs[i].size();
182 : }
183 :
184 6 : op.h = h;
185 6 : op.ex = ex;
186 6 : op.ec_out = ec;
187 6 : op.bytes_out = bytes_out;
188 6 : op.start(token);
189 :
190 6 : op.ex.on_work_started();
191 :
192 6 : read_pool_op_.file_ = this;
193 6 : read_pool_op_.ref_ = this->shared_from_this();
194 6 : read_pool_op_.func_ = &posix_stream_file::do_read_work;
195 6 : if (!svc_.pool().post(&read_pool_op_))
196 : {
197 MIS 0 : op.impl_ref = std::move(read_pool_op_.ref_);
198 0 : op.cancelled.store(true, std::memory_order_release);
199 0 : svc_.post(&read_op_);
200 : }
201 HIT 6 : return std::noop_coroutine();
202 : }
203 :
204 : inline void
205 6 : posix_stream_file::do_read_work(pool_work_item* w) noexcept
206 : {
207 6 : auto* pw = static_cast<pool_op*>(w);
208 6 : auto* self = pw->file_;
209 6 : auto& op = self->read_op_;
210 :
211 6 : if (!op.cancelled.load(std::memory_order_acquire))
212 : {
213 : ssize_t n;
214 : do
215 : {
216 10 : n = ::preadv(self->fd_, op.iovecs, op.iovec_count,
217 5 : static_cast<off_t>(self->offset_));
218 : }
219 5 : while (n < 0 && errno == EINTR);
220 :
221 5 : if (n >= 0)
222 : {
223 5 : op.errn = 0;
224 5 : op.bytes_transferred = static_cast<std::size_t>(n);
225 5 : self->offset_ += static_cast<std::uint64_t>(n);
226 : }
227 : else
228 : {
229 MIS 0 : op.errn = errno;
230 0 : op.bytes_transferred = 0;
231 : }
232 : }
233 :
234 HIT 6 : op.impl_ref = std::move(pw->ref_);
235 6 : self->svc_.post(&op);
236 6 : }
237 :
238 : inline std::coroutine_handle<>
239 6 : posix_stream_file::write_some(
240 : std::coroutine_handle<> h,
241 : capy::executor_ref ex,
242 : buffer_param param,
243 : std::stop_token token,
244 : std::error_code* ec,
245 : std::size_t* bytes_out)
246 : {
247 6 : auto& op = write_op_;
248 6 : op.reset();
249 6 : op.is_read = false;
250 :
251 6 : capy::mutable_buffer bufs[max_buffers];
252 6 : op.iovec_count = static_cast<int>(param.copy_to(bufs, max_buffers));
253 :
254 6 : if (op.iovec_count == 0)
255 : {
256 MIS 0 : *ec = {};
257 0 : *bytes_out = 0;
258 0 : op.cont_op.cont.h = h;
259 0 : return dispatch_coro(ex, op.cont_op.cont);
260 : }
261 :
262 HIT 12 : for (int i = 0; i < op.iovec_count; ++i)
263 : {
264 6 : op.iovecs[i].iov_base = bufs[i].data();
265 6 : op.iovecs[i].iov_len = bufs[i].size();
266 : }
267 :
268 6 : op.h = h;
269 6 : op.ex = ex;
270 6 : op.ec_out = ec;
271 6 : op.bytes_out = bytes_out;
272 6 : op.start(token);
273 :
274 6 : op.ex.on_work_started();
275 :
276 6 : write_pool_op_.file_ = this;
277 6 : write_pool_op_.ref_ = this->shared_from_this();
278 6 : write_pool_op_.func_ = &posix_stream_file::do_write_work;
279 6 : if (!svc_.pool().post(&write_pool_op_))
280 : {
281 MIS 0 : op.impl_ref = std::move(write_pool_op_.ref_);
282 0 : op.cancelled.store(true, std::memory_order_release);
283 0 : svc_.post(&write_op_);
284 : }
285 HIT 6 : return std::noop_coroutine();
286 : }
287 :
288 : inline void
289 6 : posix_stream_file::do_write_work(pool_work_item* w) noexcept
290 : {
291 6 : auto* pw = static_cast<pool_op*>(w);
292 6 : auto* self = pw->file_;
293 6 : auto& op = self->write_op_;
294 :
295 6 : if (!op.cancelled.load(std::memory_order_acquire))
296 : {
297 : ssize_t n;
298 : do
299 : {
300 12 : n = ::pwritev(self->fd_, op.iovecs, op.iovec_count,
301 6 : static_cast<off_t>(self->offset_));
302 : }
303 6 : while (n < 0 && errno == EINTR);
304 :
305 6 : if (n >= 0)
306 : {
307 6 : op.errn = 0;
308 6 : op.bytes_transferred = static_cast<std::size_t>(n);
309 6 : self->offset_ += static_cast<std::uint64_t>(n);
310 : }
311 : else
312 : {
313 MIS 0 : op.errn = errno;
314 0 : op.bytes_transferred = 0;
315 : }
316 : }
317 :
318 HIT 6 : op.impl_ref = std::move(pw->ref_);
319 6 : self->svc_.post(&op);
320 6 : }
321 :
322 : } // namespace boost::corosio::detail
323 :
324 : #endif // BOOST_COROSIO_POSIX
325 :
326 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_SERVICE_HPP
|