TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 : #include <boost/corosio/detail/random_access_file_service.hpp>
19 : #include <boost/corosio/detail/thread_pool.hpp>
20 :
21 : #include <limits>
22 : #include <mutex>
23 : #include <unordered_map>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /** Random-access file service for POSIX backends. */
28 : class BOOST_COROSIO_DECL posix_random_access_file_service final
29 : : public random_access_file_service
30 : {
31 : public:
32 HIT 515 : posix_random_access_file_service(
33 : capy::execution_context& ctx, scheduler& sched)
34 1030 : : sched_(&sched)
35 515 : , pool_(get_or_create_pool(ctx))
36 : {
37 515 : }
38 :
39 1030 : ~posix_random_access_file_service() override = default;
40 :
41 : posix_random_access_file_service(
42 : posix_random_access_file_service const&) = delete;
43 : posix_random_access_file_service& operator=(
44 : posix_random_access_file_service const&) = delete;
45 :
46 24 : io_object::implementation* construct() override
47 : {
48 24 : auto ptr = std::make_shared<posix_random_access_file>(*this);
49 24 : auto* impl = ptr.get();
50 :
51 : {
52 24 : std::lock_guard<std::mutex> lock(mutex_);
53 24 : file_list_.push_back(impl);
54 24 : file_ptrs_[impl] = std::move(ptr);
55 24 : }
56 :
57 24 : return impl;
58 24 : }
59 :
60 24 : void destroy(io_object::implementation* p) override
61 : {
62 24 : auto& impl = static_cast<posix_random_access_file&>(*p);
63 24 : impl.cancel();
64 24 : impl.close_file();
65 24 : destroy_impl(impl);
66 24 : }
67 :
68 42 : void close(io_object::handle& h) override
69 : {
70 42 : if (h.get())
71 : {
72 42 : auto& impl = static_cast<posix_random_access_file&>(*h.get());
73 42 : impl.cancel();
74 42 : impl.close_file();
75 : }
76 42 : }
77 :
78 19 : std::error_code open_file(
79 : random_access_file::implementation& impl,
80 : std::filesystem::path const& path,
81 : file_base::flags mode) override
82 : {
83 19 : return static_cast<posix_random_access_file&>(impl).open_file(
84 19 : path, mode);
85 : }
86 :
87 515 : void shutdown() override
88 : {
89 515 : std::lock_guard<std::mutex> lock(mutex_);
90 515 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
91 MIS 0 : impl = file_list_.pop_front())
92 : {
93 0 : impl->cancel();
94 0 : impl->close_file();
95 : }
96 HIT 515 : file_ptrs_.clear();
97 515 : }
98 :
99 24 : void destroy_impl(posix_random_access_file& impl)
100 : {
101 24 : std::lock_guard<std::mutex> lock(mutex_);
102 24 : file_list_.remove(&impl);
103 24 : file_ptrs_.erase(&impl);
104 24 : }
105 :
106 126 : void post(scheduler_op* op)
107 : {
108 126 : sched_->post(op);
109 126 : }
110 :
111 : void work_started() noexcept
112 : {
113 : sched_->work_started();
114 : }
115 :
116 : void work_finished() noexcept
117 : {
118 : sched_->work_finished();
119 : }
120 :
121 126 : thread_pool& pool() noexcept
122 : {
123 126 : return pool_;
124 : }
125 :
126 : private:
127 515 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
128 : {
129 515 : auto* p = ctx.find_service<thread_pool>();
130 515 : if (p)
131 515 : return *p;
132 MIS 0 : return ctx.make_service<thread_pool>();
133 : }
134 :
135 : scheduler* sched_;
136 : thread_pool& pool_;
137 : std::mutex mutex_;
138 : intrusive_list<posix_random_access_file> file_list_;
139 : std::unordered_map<
140 : posix_random_access_file*,
141 : std::shared_ptr<posix_random_access_file>>
142 : file_ptrs_;
143 : };
144 :
145 : /** Get or create the random-access file service for the given context. */
146 : inline posix_random_access_file_service&
147 HIT 515 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
148 : {
149 515 : return ctx.make_service<posix_random_access_file_service>(sched);
150 : }
151 :
152 : // ---------------------------------------------------------------------------
153 : // posix_random_access_file inline implementations (require complete service)
154 : // ---------------------------------------------------------------------------
155 :
156 : inline std::coroutine_handle<>
157 116 : posix_random_access_file::read_some_at(
158 : std::uint64_t offset,
159 : std::coroutine_handle<> h,
160 : capy::executor_ref ex,
161 : buffer_param param,
162 : std::stop_token token,
163 : std::error_code* ec,
164 : std::size_t* bytes_out)
165 : {
166 116 : capy::mutable_buffer bufs[max_buffers];
167 116 : auto count = param.copy_to(bufs, max_buffers);
168 :
169 116 : if (count == 0)
170 : {
171 MIS 0 : *ec = {};
172 0 : *bytes_out = 0;
173 0 : return h;
174 : }
175 :
176 HIT 116 : auto* op = new raf_op();
177 116 : op->is_read = true;
178 116 : op->offset = offset;
179 :
180 116 : op->iovec_count = static_cast<int>(count);
181 232 : for (int i = 0; i < op->iovec_count; ++i)
182 : {
183 116 : op->iovecs[i].iov_base = bufs[i].data();
184 116 : op->iovecs[i].iov_len = bufs[i].size();
185 : }
186 :
187 116 : op->h = h;
188 116 : op->ex = ex;
189 116 : op->ec_out = ec;
190 116 : op->bytes_out = bytes_out;
191 116 : op->file_ = this;
192 116 : op->file_ref = this->shared_from_this();
193 116 : op->start(token);
194 :
195 116 : op->ex.on_work_started();
196 :
197 : {
198 116 : std::lock_guard<std::mutex> lock(ops_mutex_);
199 116 : outstanding_ops_.push_back(op);
200 116 : }
201 :
202 116 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
203 116 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
204 : {
205 MIS 0 : op->cancelled.store(true, std::memory_order_release);
206 0 : svc_.post(static_cast<scheduler_op*>(op));
207 : }
208 HIT 116 : return std::noop_coroutine();
209 : }
210 :
211 : inline std::coroutine_handle<>
212 10 : posix_random_access_file::write_some_at(
213 : std::uint64_t offset,
214 : std::coroutine_handle<> h,
215 : capy::executor_ref ex,
216 : buffer_param param,
217 : std::stop_token token,
218 : std::error_code* ec,
219 : std::size_t* bytes_out)
220 : {
221 10 : capy::mutable_buffer bufs[max_buffers];
222 10 : auto count = param.copy_to(bufs, max_buffers);
223 :
224 10 : if (count == 0)
225 : {
226 MIS 0 : *ec = {};
227 0 : *bytes_out = 0;
228 0 : return h;
229 : }
230 :
231 HIT 10 : auto* op = new raf_op();
232 10 : op->is_read = false;
233 10 : op->offset = offset;
234 :
235 10 : op->iovec_count = static_cast<int>(count);
236 20 : for (int i = 0; i < op->iovec_count; ++i)
237 : {
238 10 : op->iovecs[i].iov_base = bufs[i].data();
239 10 : op->iovecs[i].iov_len = bufs[i].size();
240 : }
241 :
242 10 : op->h = h;
243 10 : op->ex = ex;
244 10 : op->ec_out = ec;
245 10 : op->bytes_out = bytes_out;
246 10 : op->file_ = this;
247 10 : op->file_ref = this->shared_from_this();
248 10 : op->start(token);
249 :
250 10 : op->ex.on_work_started();
251 :
252 : {
253 10 : std::lock_guard<std::mutex> lock(ops_mutex_);
254 10 : outstanding_ops_.push_back(op);
255 10 : }
256 :
257 10 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
258 10 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
259 : {
260 MIS 0 : op->cancelled.store(true, std::memory_order_release);
261 0 : svc_.post(static_cast<scheduler_op*>(op));
262 : }
263 HIT 10 : return std::noop_coroutine();
264 : }
265 :
266 : // -- raf_op thread-pool work function --
267 :
268 : inline void
269 126 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
270 : {
271 126 : auto* op = static_cast<raf_op*>(w);
272 126 : auto* self = op->file_;
273 :
274 126 : if (op->cancelled.load(std::memory_order_acquire))
275 : {
276 1 : op->errn = ECANCELED;
277 1 : op->bytes_transferred = 0;
278 : }
279 250 : else if (op->offset >
280 125 : static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
281 : {
282 MIS 0 : op->errn = EOVERFLOW;
283 0 : op->bytes_transferred = 0;
284 : }
285 : else
286 : {
287 : ssize_t n;
288 HIT 125 : if (op->is_read)
289 : {
290 : do
291 : {
292 230 : n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
293 115 : static_cast<off_t>(op->offset));
294 : }
295 115 : while (n < 0 && errno == EINTR);
296 : }
297 : else
298 : {
299 : do
300 : {
301 20 : n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
302 10 : static_cast<off_t>(op->offset));
303 : }
304 10 : while (n < 0 && errno == EINTR);
305 : }
306 :
307 125 : if (n >= 0)
308 : {
309 125 : op->errn = 0;
310 125 : op->bytes_transferred = static_cast<std::size_t>(n);
311 : }
312 : else
313 : {
314 MIS 0 : op->errn = errno;
315 0 : op->bytes_transferred = 0;
316 : }
317 : }
318 :
319 HIT 126 : self->svc_.post(static_cast<scheduler_op*>(op));
320 126 : }
321 :
322 : } // namespace boost::corosio::detail
323 :
324 : #endif // BOOST_COROSIO_POSIX
325 :
326 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
|