TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
32 : #include <atomic>
33 : #include <chrono>
34 : #include <cstdint>
35 : #include <mutex>
36 :
37 : #include <errno.h>
38 : #include <sys/epoll.h>
39 : #include <sys/eventfd.h>
40 : #include <sys/timerfd.h>
41 : #include <unistd.h>
42 :
43 : namespace boost::corosio::detail {
44 :
45 : struct epoll_op;
46 : struct descriptor_state;
47 :
48 : /** Linux scheduler using epoll for I/O multiplexing.
49 :
50 : This scheduler implements the scheduler interface using Linux epoll
51 : for efficient I/O event notification. It uses a single reactor model
52 : where one thread runs epoll_wait while other threads
53 : wait on a condition variable for handler work. This design provides:
54 :
55 : - Handler parallelism: N posted handlers can execute on N threads
56 : - No thundering herd: condition_variable wakes exactly one thread
57 : - IOCP parity: Behavior matches Windows I/O completion port semantics
58 :
59 : When threads call run(), they first try to execute queued handlers.
60 : If the queue is empty and no reactor is running, one thread becomes
61 : the reactor and runs epoll_wait. Other threads wait on a condition
62 : variable until handlers are available.
63 :
64 : @par Thread Safety
65 : All public member functions are thread-safe.
66 : */
67 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68 : {
69 : public:
70 : /** Construct the scheduler.
71 :
72 : Creates an epoll instance, eventfd for reactor interruption,
73 : and timerfd for kernel-managed timer expiry.
74 :
75 : @param ctx Reference to the owning execution_context.
76 : @param concurrency_hint Hint for expected thread count (unused).
77 : */
78 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79 :
80 : /// Destroy the scheduler.
81 : ~epoll_scheduler() override;
82 :
83 : epoll_scheduler(epoll_scheduler const&) = delete;
84 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85 :
86 : /// Shut down the scheduler, draining pending operations.
87 : void shutdown() override;
88 :
89 : /** Return the epoll file descriptor.
90 :
91 : Used by socket services to register file descriptors
92 : for I/O event notification.
93 :
94 : @return The epoll file descriptor.
95 : */
96 : int epoll_fd() const noexcept
97 : {
98 : return epoll_fd_;
99 : }
100 :
101 : /** Register a descriptor for persistent monitoring.
102 :
103 : The fd is registered once and stays registered until explicitly
104 : deregistered. Events are dispatched via descriptor_state which
105 : tracks pending read/write/connect operations.
106 :
107 : @param fd The file descriptor to register.
108 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109 : */
110 : void register_descriptor(int fd, descriptor_state* desc) const;
111 :
112 : /** Deregister a persistently registered descriptor.
113 :
114 : @param fd The file descriptor to deregister.
115 : */
116 : void deregister_descriptor(int fd) const;
117 :
118 : private:
119 : void
120 : run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
121 : void interrupt_reactor() const override;
122 : void update_timerfd() const;
123 :
124 : int epoll_fd_;
125 : int event_fd_;
126 : int timer_fd_;
127 :
128 : // Edge-triggered eventfd state
129 : mutable std::atomic<bool> eventfd_armed_{false};
130 :
131 : // Set when the earliest timer changes; flushed before epoll_wait
132 : mutable std::atomic<bool> timerfd_stale_{false};
133 : };
134 :
135 HIT 320 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
136 320 : : epoll_fd_(-1)
137 320 : , event_fd_(-1)
138 320 : , timer_fd_(-1)
139 : {
140 320 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
141 320 : if (epoll_fd_ < 0)
142 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
143 :
144 HIT 320 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
145 320 : if (event_fd_ < 0)
146 : {
147 MIS 0 : int errn = errno;
148 0 : ::close(epoll_fd_);
149 0 : detail::throw_system_error(make_err(errn), "eventfd");
150 : }
151 :
152 HIT 320 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
153 320 : if (timer_fd_ < 0)
154 : {
155 MIS 0 : int errn = errno;
156 0 : ::close(event_fd_);
157 0 : ::close(epoll_fd_);
158 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
159 : }
160 :
161 HIT 320 : epoll_event ev{};
162 320 : ev.events = EPOLLIN | EPOLLET;
163 320 : ev.data.ptr = nullptr;
164 320 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
165 : {
166 MIS 0 : int errn = errno;
167 0 : ::close(timer_fd_);
168 0 : ::close(event_fd_);
169 0 : ::close(epoll_fd_);
170 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
171 : }
172 :
173 HIT 320 : epoll_event timer_ev{};
174 320 : timer_ev.events = EPOLLIN | EPOLLERR;
175 320 : timer_ev.data.ptr = &timer_fd_;
176 320 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
177 : {
178 MIS 0 : int errn = errno;
179 0 : ::close(timer_fd_);
180 0 : ::close(event_fd_);
181 0 : ::close(epoll_fd_);
182 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
183 : }
184 :
185 HIT 320 : timer_svc_ = &get_timer_service(ctx, *this);
186 320 : timer_svc_->set_on_earliest_changed(
187 4919 : timer_service::callback(this, [](void* p) {
188 4599 : auto* self = static_cast<epoll_scheduler*>(p);
189 4599 : self->timerfd_stale_.store(true, std::memory_order_release);
190 4599 : self->interrupt_reactor();
191 4599 : }));
192 :
193 320 : get_resolver_service(ctx, *this);
194 320 : get_signal_service(ctx, *this);
195 320 : get_stream_file_service(ctx, *this);
196 320 : get_random_access_file_service(ctx, *this);
197 :
198 320 : completed_ops_.push(&task_op_);
199 320 : }
200 :
201 640 : inline epoll_scheduler::~epoll_scheduler()
202 : {
203 320 : if (timer_fd_ >= 0)
204 320 : ::close(timer_fd_);
205 320 : if (event_fd_ >= 0)
206 320 : ::close(event_fd_);
207 320 : if (epoll_fd_ >= 0)
208 320 : ::close(epoll_fd_);
209 640 : }
210 :
211 : inline void
212 320 : epoll_scheduler::shutdown()
213 : {
214 320 : shutdown_drain();
215 :
216 320 : if (event_fd_ >= 0)
217 320 : interrupt_reactor();
218 320 : }
219 :
220 : inline void
221 8873 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
222 : {
223 8873 : epoll_event ev{};
224 8873 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
225 8873 : ev.data.ptr = desc;
226 :
227 8873 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
228 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
229 :
230 HIT 8873 : desc->registered_events = ev.events;
231 8873 : desc->fd = fd;
232 8873 : desc->scheduler_ = this;
233 8873 : desc->ready_events_.store(0, std::memory_order_relaxed);
234 :
235 8873 : std::lock_guard lock(desc->mutex);
236 8873 : desc->impl_ref_.reset();
237 8873 : desc->read_ready = false;
238 8873 : desc->write_ready = false;
239 8873 : }
240 :
241 : inline void
242 8873 : epoll_scheduler::deregister_descriptor(int fd) const
243 : {
244 8873 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
245 8873 : }
246 :
247 : inline void
248 5208 : epoll_scheduler::interrupt_reactor() const
249 : {
250 5208 : bool expected = false;
251 5208 : if (eventfd_armed_.compare_exchange_strong(
252 : expected, true, std::memory_order_release,
253 : std::memory_order_relaxed))
254 : {
255 4998 : std::uint64_t val = 1;
256 4998 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
257 : }
258 5208 : }
259 :
260 : inline void
261 9158 : epoll_scheduler::update_timerfd() const
262 : {
263 9158 : auto nearest = timer_svc_->nearest_expiry();
264 :
265 9158 : itimerspec ts{};
266 9158 : int flags = 0;
267 :
268 9158 : if (nearest == timer_service::time_point::max())
269 : {
270 : // No timers — disarm by setting to 0 (relative)
271 : }
272 : else
273 : {
274 9105 : auto now = std::chrono::steady_clock::now();
275 9105 : if (nearest <= now)
276 : {
277 : // Use 1ns instead of 0 — zero disarms the timerfd
278 282 : ts.it_value.tv_nsec = 1;
279 : }
280 : else
281 : {
282 8823 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
283 8823 : nearest - now)
284 8823 : .count();
285 8823 : ts.it_value.tv_sec = nsec / 1000000000;
286 8823 : ts.it_value.tv_nsec = nsec % 1000000000;
287 8823 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
288 MIS 0 : ts.it_value.tv_nsec = 1;
289 : }
290 : }
291 :
292 HIT 9158 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
293 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
294 HIT 9158 : }
295 :
296 : inline void
297 36316 : epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
298 : {
299 36316 : int timeout_ms = task_interrupted_ ? 0 : -1;
300 :
301 36316 : if (lock.owns_lock())
302 13062 : lock.unlock();
303 :
304 36316 : task_cleanup on_exit{this, &lock, ctx};
305 :
306 : // Flush deferred timerfd programming before blocking
307 36316 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
308 4577 : update_timerfd();
309 :
310 : epoll_event events[128];
311 36316 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
312 :
313 36316 : if (nfds < 0 && errno != EINTR)
314 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
315 :
316 HIT 36316 : bool check_timers = false;
317 36316 : op_queue local_ops;
318 :
319 81177 : for (int i = 0; i < nfds; ++i)
320 : {
321 44861 : if (events[i].data.ptr == nullptr)
322 : {
323 : std::uint64_t val;
324 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
325 4678 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
326 4678 : eventfd_armed_.store(false, std::memory_order_relaxed);
327 4678 : continue;
328 4678 : }
329 :
330 40183 : if (events[i].data.ptr == &timer_fd_)
331 : {
332 : std::uint64_t expirations;
333 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
334 : [[maybe_unused]] auto r =
335 4581 : ::read(timer_fd_, &expirations, sizeof(expirations));
336 4581 : check_timers = true;
337 4581 : continue;
338 4581 : }
339 :
340 35602 : auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
341 35602 : desc->add_ready_events(events[i].events);
342 :
343 35602 : bool expected = false;
344 35602 : if (desc->is_enqueued_.compare_exchange_strong(
345 : expected, true, std::memory_order_release,
346 : std::memory_order_relaxed))
347 : {
348 35602 : local_ops.push(desc);
349 : }
350 : }
351 :
352 36316 : if (check_timers)
353 : {
354 4581 : timer_svc_->process_expired();
355 4581 : update_timerfd();
356 : }
357 :
358 36316 : lock.lock();
359 :
360 36316 : if (!local_ops.empty())
361 22717 : completed_ops_.splice(local_ops);
362 36316 : }
363 :
364 : } // namespace boost::corosio::detail
365 :
366 : #endif // BOOST_COROSIO_HAS_EPOLL
367 :
368 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|