include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

84.2% Lines (117/139) 100.0% List of functions (9/9)
epoll_scheduler.hpp
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <cstdint>
35 #include <mutex>
36
37 #include <errno.h>
38 #include <sys/epoll.h>
39 #include <sys/eventfd.h>
40 #include <sys/timerfd.h>
41 #include <unistd.h>
42
43 namespace boost::corosio::detail {
44
45 struct epoll_op;
46 struct descriptor_state;
47
/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing the epoll, event, and timer fds.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    // Reactor body: one epoll_wait pass, then splice ready ops into the
    // completion queue (overrides the reactor_scheduler_base hook).
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    // Wake the thread blocked in epoll_wait by writing to event_fd_.
    void interrupt_reactor() const override;
    // Reprogram timer_fd_ from the timer service's nearest expiry.
    void update_timerfd() const;

    int epoll_fd_;   // epoll instance fd (-1 until constructed)
    int event_fd_;   // eventfd used to interrupt the reactor
    int timer_fd_;   // timerfd for kernel-managed timer expiry

    // Edge-triggered eventfd state: true while a wakeup write is pending
    // and not yet drained by the reactor.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};
};
134
135 320x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
136 320x : epoll_fd_(-1)
137 320x , event_fd_(-1)
138 320x , timer_fd_(-1)
139 {
140 320x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
141 320x if (epoll_fd_ < 0)
142 detail::throw_system_error(make_err(errno), "epoll_create1");
143
144 320x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
145 320x if (event_fd_ < 0)
146 {
147 int errn = errno;
148 ::close(epoll_fd_);
149 detail::throw_system_error(make_err(errn), "eventfd");
150 }
151
152 320x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
153 320x if (timer_fd_ < 0)
154 {
155 int errn = errno;
156 ::close(event_fd_);
157 ::close(epoll_fd_);
158 detail::throw_system_error(make_err(errn), "timerfd_create");
159 }
160
161 320x epoll_event ev{};
162 320x ev.events = EPOLLIN | EPOLLET;
163 320x ev.data.ptr = nullptr;
164 320x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
165 {
166 int errn = errno;
167 ::close(timer_fd_);
168 ::close(event_fd_);
169 ::close(epoll_fd_);
170 detail::throw_system_error(make_err(errn), "epoll_ctl");
171 }
172
173 320x epoll_event timer_ev{};
174 320x timer_ev.events = EPOLLIN | EPOLLERR;
175 320x timer_ev.data.ptr = &timer_fd_;
176 320x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
177 {
178 int errn = errno;
179 ::close(timer_fd_);
180 ::close(event_fd_);
181 ::close(epoll_fd_);
182 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
183 }
184
185 320x timer_svc_ = &get_timer_service(ctx, *this);
186 320x timer_svc_->set_on_earliest_changed(
187 4919x timer_service::callback(this, [](void* p) {
188 4599x auto* self = static_cast<epoll_scheduler*>(p);
189 4599x self->timerfd_stale_.store(true, std::memory_order_release);
190 4599x self->interrupt_reactor();
191 4599x }));
192
193 320x get_resolver_service(ctx, *this);
194 320x get_signal_service(ctx, *this);
195 320x get_stream_file_service(ctx, *this);
196 320x get_random_access_file_service(ctx, *this);
197
198 320x completed_ops_.push(&task_op_);
199 320x }
200
201 640x inline epoll_scheduler::~epoll_scheduler()
202 {
203 320x if (timer_fd_ >= 0)
204 320x ::close(timer_fd_);
205 320x if (event_fd_ >= 0)
206 320x ::close(event_fd_);
207 320x if (epoll_fd_ >= 0)
208 320x ::close(epoll_fd_);
209 640x }
210
211 inline void
212 320x epoll_scheduler::shutdown()
213 {
214 320x shutdown_drain();
215
216 320x if (event_fd_ >= 0)
217 320x interrupt_reactor();
218 320x }
219
220 inline void
221 8873x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
222 {
223 8873x epoll_event ev{};
224 8873x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
225 8873x ev.data.ptr = desc;
226
227 8873x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
228 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
229
230 8873x desc->registered_events = ev.events;
231 8873x desc->fd = fd;
232 8873x desc->scheduler_ = this;
233 8873x desc->ready_events_.store(0, std::memory_order_relaxed);
234
235 8873x std::lock_guard lock(desc->mutex);
236 8873x desc->impl_ref_.reset();
237 8873x desc->read_ready = false;
238 8873x desc->write_ready = false;
239 8873x }
240
241 inline void
242 8873x epoll_scheduler::deregister_descriptor(int fd) const
243 {
244 8873x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
245 8873x }
246
247 inline void
248 5208x epoll_scheduler::interrupt_reactor() const
249 {
250 5208x bool expected = false;
251 5208x if (eventfd_armed_.compare_exchange_strong(
252 expected, true, std::memory_order_release,
253 std::memory_order_relaxed))
254 {
255 4998x std::uint64_t val = 1;
256 4998x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
257 }
258 5208x }
259
260 inline void
261 9158x epoll_scheduler::update_timerfd() const
262 {
263 9158x auto nearest = timer_svc_->nearest_expiry();
264
265 9158x itimerspec ts{};
266 9158x int flags = 0;
267
268 9158x if (nearest == timer_service::time_point::max())
269 {
270 // No timers — disarm by setting to 0 (relative)
271 }
272 else
273 {
274 9105x auto now = std::chrono::steady_clock::now();
275 9105x if (nearest <= now)
276 {
277 // Use 1ns instead of 0 — zero disarms the timerfd
278 282x ts.it_value.tv_nsec = 1;
279 }
280 else
281 {
282 8823x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
283 8823x nearest - now)
284 8823x .count();
285 8823x ts.it_value.tv_sec = nsec / 1000000000;
286 8823x ts.it_value.tv_nsec = nsec % 1000000000;
287 8823x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
288 ts.it_value.tv_nsec = 1;
289 }
290 }
291
292 9158x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
293 detail::throw_system_error(make_err(errno), "timerfd_settime");
294 9158x }
295
/// One reactor pass: flush deferred timerfd programming, block in
/// epoll_wait, then dispatch the three event sources (wakeup eventfd,
/// timerfd, socket descriptors) and splice ready descriptors into the
/// completion queue. Called with `lock` held; the lock is released
/// around the blocking wait and re-acquired before returning.
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // Poll (0) instead of block (-1) when an interrupt was requested.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // NOTE(review): task_cleanup presumably re-posts task_op_ / restores
    // scheduler state on scope exit — defined in reactor_scheduler_base.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is a benign wakeup; anything else is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr marks the wakeup eventfd: drain it and
        // disarm so interrupt_reactor() can write again.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ marks the timerfd: drain the expiration
        // count and defer timer processing until after the loop.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Everything else is a registered socket descriptor: accumulate
        // its ready events and enqueue it at most once (the CAS on
        // is_enqueued_ prevents double-insertion into the queue).
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    if (check_timers)
    {
        timer_svc_->process_expired();
        // Re-arm for the next-nearest timer (or disarm if none remain).
        update_timerfd();
    }

    lock.lock();

    // Publish ready descriptors to the shared queue under the lock.
    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
363
364 } // namespace boost::corosio::detail
365
366 #endif // BOOST_COROSIO_HAS_EPOLL
367
368 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
369