//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
 
27 +
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
 
28 +
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
27  

29  

28  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
29  

31  

30  
#include <atomic>
32  
#include <atomic>
31  
#include <chrono>
33  
#include <chrono>
32  
#include <cstdint>
34  
#include <cstdint>
33  
#include <mutex>
35  
#include <mutex>
34  

36  

35  
#include <errno.h>
37  
#include <errno.h>
36  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
37  
#include <sys/eventfd.h>
39  
#include <sys/eventfd.h>
38  
#include <sys/timerfd.h>
40  
#include <sys/timerfd.h>
39  
#include <unistd.h>
41  
#include <unistd.h>
40  

42  

41  
namespace boost::corosio::detail {
43  
namespace boost::corosio::detail {
42  

44  

43  
struct epoll_op;
45  
struct epoll_op;
44  
struct descriptor_state;
46  
struct descriptor_state;
45  

47  

46  
/** Linux scheduler using epoll for I/O multiplexing.
48  
/** Linux scheduler using epoll for I/O multiplexing.
47  

49  

48  
    This scheduler implements the scheduler interface using Linux epoll
50  
    This scheduler implements the scheduler interface using Linux epoll
49  
    for efficient I/O event notification. It uses a single reactor model
51  
    for efficient I/O event notification. It uses a single reactor model
50  
    where one thread runs epoll_wait while other threads
52  
    where one thread runs epoll_wait while other threads
51  
    wait on a condition variable for handler work. This design provides:
53  
    wait on a condition variable for handler work. This design provides:
52  

54  

53  
    - Handler parallelism: N posted handlers can execute on N threads
55  
    - Handler parallelism: N posted handlers can execute on N threads
54  
    - No thundering herd: condition_variable wakes exactly one thread
56  
    - No thundering herd: condition_variable wakes exactly one thread
55  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
57  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
56  

58  

57  
    When threads call run(), they first try to execute queued handlers.
59  
    When threads call run(), they first try to execute queued handlers.
58  
    If the queue is empty and no reactor is running, one thread becomes
60  
    If the queue is empty and no reactor is running, one thread becomes
59  
    the reactor and runs epoll_wait. Other threads wait on a condition
61  
    the reactor and runs epoll_wait. Other threads wait on a condition
60  
    variable until handlers are available.
62  
    variable until handlers are available.
61  

63  

62  
    @par Thread Safety
64  
    @par Thread Safety
63  
    All public member functions are thread-safe.
65  
    All public member functions are thread-safe.
64  
*/
66  
*/
65  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
67  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
66  
{
68  
{
67  
public:
69  
public:
68  
    /** Construct the scheduler.
70  
    /** Construct the scheduler.
69  

71  

70  
        Creates an epoll instance, eventfd for reactor interruption,
72  
        Creates an epoll instance, eventfd for reactor interruption,
71  
        and timerfd for kernel-managed timer expiry.
73  
        and timerfd for kernel-managed timer expiry.
72  

74  

73  
        @param ctx Reference to the owning execution_context.
75  
        @param ctx Reference to the owning execution_context.
74  
        @param concurrency_hint Hint for expected thread count (unused).
76  
        @param concurrency_hint Hint for expected thread count (unused).
75  
    */
77  
    */
76  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
78  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77  

79  

78  
    /// Destroy the scheduler.
80  
    /// Destroy the scheduler.
79  
    ~epoll_scheduler() override;
81  
    ~epoll_scheduler() override;
80  

82  

81  
    epoll_scheduler(epoll_scheduler const&)            = delete;
83  
    epoll_scheduler(epoll_scheduler const&)            = delete;
82  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
84  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
83  

85  

84  
    /// Shut down the scheduler, draining pending operations.
86  
    /// Shut down the scheduler, draining pending operations.
85  
    void shutdown() override;
87  
    void shutdown() override;
86  

88  

87  
    /** Return the epoll file descriptor.
89  
    /** Return the epoll file descriptor.
88  

90  

89  
        Used by socket services to register file descriptors
91  
        Used by socket services to register file descriptors
90  
        for I/O event notification.
92  
        for I/O event notification.
91  

93  

92  
        @return The epoll file descriptor.
94  
        @return The epoll file descriptor.
93  
    */
95  
    */
94  
    int epoll_fd() const noexcept
96  
    int epoll_fd() const noexcept
95  
    {
97  
    {
96  
        return epoll_fd_;
98  
        return epoll_fd_;
97  
    }
99  
    }
98  

100  

99  
    /** Register a descriptor for persistent monitoring.
101  
    /** Register a descriptor for persistent monitoring.
100  

102  

101  
        The fd is registered once and stays registered until explicitly
103  
        The fd is registered once and stays registered until explicitly
102  
        deregistered. Events are dispatched via descriptor_state which
104  
        deregistered. Events are dispatched via descriptor_state which
103  
        tracks pending read/write/connect operations.
105  
        tracks pending read/write/connect operations.
104  

106  

105  
        @param fd The file descriptor to register.
107  
        @param fd The file descriptor to register.
106  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
108  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
107  
    */
109  
    */
108  
    void register_descriptor(int fd, descriptor_state* desc) const;
110  
    void register_descriptor(int fd, descriptor_state* desc) const;
109  

111  

110  
    /** Deregister a persistently registered descriptor.
112  
    /** Deregister a persistently registered descriptor.
111  

113  

112  
        @param fd The file descriptor to deregister.
114  
        @param fd The file descriptor to deregister.
113  
    */
115  
    */
114  
    void deregister_descriptor(int fd) const;
116  
    void deregister_descriptor(int fd) const;
115  

117  

116  
private:
118  
private:
117  
    void
119  
    void
118  
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
120  
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
119  
    void interrupt_reactor() const override;
121  
    void interrupt_reactor() const override;
120  
    void update_timerfd() const;
122  
    void update_timerfd() const;
121  

123  

122  
    int epoll_fd_;
124  
    int epoll_fd_;
123  
    int event_fd_;
125  
    int event_fd_;
124  
    int timer_fd_;
126  
    int timer_fd_;
125  

127  

126  
    // Edge-triggered eventfd state
128  
    // Edge-triggered eventfd state
127  
    mutable std::atomic<bool> eventfd_armed_{false};
129  
    mutable std::atomic<bool> eventfd_armed_{false};
128  

130  

129  
    // Set when the earliest timer changes; flushed before epoll_wait
131  
    // Set when the earliest timer changes; flushed before epoll_wait
130  
    mutable std::atomic<bool> timerfd_stale_{false};
132  
    mutable std::atomic<bool> timerfd_stale_{false};
131  
};
133  
};
132  

134  

133  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
135  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134  
    : epoll_fd_(-1)
136  
    : epoll_fd_(-1)
135  
    , event_fd_(-1)
137  
    , event_fd_(-1)
136  
    , timer_fd_(-1)
138  
    , timer_fd_(-1)
137  
{
139  
{
138  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
140  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139  
    if (epoll_fd_ < 0)
141  
    if (epoll_fd_ < 0)
140  
        detail::throw_system_error(make_err(errno), "epoll_create1");
142  
        detail::throw_system_error(make_err(errno), "epoll_create1");
141  

143  

142  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
144  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143  
    if (event_fd_ < 0)
145  
    if (event_fd_ < 0)
144  
    {
146  
    {
145  
        int errn = errno;
147  
        int errn = errno;
146  
        ::close(epoll_fd_);
148  
        ::close(epoll_fd_);
147  
        detail::throw_system_error(make_err(errn), "eventfd");
149  
        detail::throw_system_error(make_err(errn), "eventfd");
148  
    }
150  
    }
149  

151  

150  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
152  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151  
    if (timer_fd_ < 0)
153  
    if (timer_fd_ < 0)
152  
    {
154  
    {
153  
        int errn = errno;
155  
        int errn = errno;
154  
        ::close(event_fd_);
156  
        ::close(event_fd_);
155  
        ::close(epoll_fd_);
157  
        ::close(epoll_fd_);
156  
        detail::throw_system_error(make_err(errn), "timerfd_create");
158  
        detail::throw_system_error(make_err(errn), "timerfd_create");
157  
    }
159  
    }
158  

160  

159  
    epoll_event ev{};
161  
    epoll_event ev{};
160  
    ev.events   = EPOLLIN | EPOLLET;
162  
    ev.events   = EPOLLIN | EPOLLET;
161  
    ev.data.ptr = nullptr;
163  
    ev.data.ptr = nullptr;
162  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
164  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163  
    {
165  
    {
164  
        int errn = errno;
166  
        int errn = errno;
165  
        ::close(timer_fd_);
167  
        ::close(timer_fd_);
166  
        ::close(event_fd_);
168  
        ::close(event_fd_);
167  
        ::close(epoll_fd_);
169  
        ::close(epoll_fd_);
168  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
170  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
169  
    }
171  
    }
170  

172  

171  
    epoll_event timer_ev{};
173  
    epoll_event timer_ev{};
172  
    timer_ev.events   = EPOLLIN | EPOLLERR;
174  
    timer_ev.events   = EPOLLIN | EPOLLERR;
173  
    timer_ev.data.ptr = &timer_fd_;
175  
    timer_ev.data.ptr = &timer_fd_;
174  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
176  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175  
    {
177  
    {
176  
        int errn = errno;
178  
        int errn = errno;
177  
        ::close(timer_fd_);
179  
        ::close(timer_fd_);
178  
        ::close(event_fd_);
180  
        ::close(event_fd_);
179  
        ::close(epoll_fd_);
181  
        ::close(epoll_fd_);
180  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
182  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181  
    }
183  
    }
182  

184  

183  
    timer_svc_ = &get_timer_service(ctx, *this);
185  
    timer_svc_ = &get_timer_service(ctx, *this);
184  
    timer_svc_->set_on_earliest_changed(
186  
    timer_svc_->set_on_earliest_changed(
185  
        timer_service::callback(this, [](void* p) {
187  
        timer_service::callback(this, [](void* p) {
186  
            auto* self = static_cast<epoll_scheduler*>(p);
188  
            auto* self = static_cast<epoll_scheduler*>(p);
187  
            self->timerfd_stale_.store(true, std::memory_order_release);
189  
            self->timerfd_stale_.store(true, std::memory_order_release);
188  
            self->interrupt_reactor();
190  
            self->interrupt_reactor();
189  
        }));
191  
        }));
190  

192  

191  
    get_resolver_service(ctx, *this);
193  
    get_resolver_service(ctx, *this);
192  
    get_signal_service(ctx, *this);
194  
    get_signal_service(ctx, *this);
 
195 +
    get_stream_file_service(ctx, *this);
 
196 +
    get_random_access_file_service(ctx, *this);
193  

197  

194  
    completed_ops_.push(&task_op_);
198  
    completed_ops_.push(&task_op_);
195  
}
199  
}
196  

200  

197  
inline epoll_scheduler::~epoll_scheduler()
201  
inline epoll_scheduler::~epoll_scheduler()
198  
{
202  
{
199  
    if (timer_fd_ >= 0)
203  
    if (timer_fd_ >= 0)
200  
        ::close(timer_fd_);
204  
        ::close(timer_fd_);
201  
    if (event_fd_ >= 0)
205  
    if (event_fd_ >= 0)
202  
        ::close(event_fd_);
206  
        ::close(event_fd_);
203  
    if (epoll_fd_ >= 0)
207  
    if (epoll_fd_ >= 0)
204  
        ::close(epoll_fd_);
208  
        ::close(epoll_fd_);
205  
}
209  
}
206  

210  

207  
inline void
211  
inline void
208  
epoll_scheduler::shutdown()
212  
epoll_scheduler::shutdown()
209  
{
213  
{
210  
    shutdown_drain();
214  
    shutdown_drain();
211  

215  

212  
    if (event_fd_ >= 0)
216  
    if (event_fd_ >= 0)
213  
        interrupt_reactor();
217  
        interrupt_reactor();
214  
}
218  
}
215  

219  

216  
inline void
220  
inline void
217  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
221  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218  
{
222  
{
219  
    epoll_event ev{};
223  
    epoll_event ev{};
220  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
224  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221  
    ev.data.ptr = desc;
225  
    ev.data.ptr = desc;
222  

226  

223  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
227  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
228  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225  

229  

226  
    desc->registered_events = ev.events;
230  
    desc->registered_events = ev.events;
227  
    desc->fd                = fd;
231  
    desc->fd                = fd;
228  
    desc->scheduler_        = this;
232  
    desc->scheduler_        = this;
229  
    desc->ready_events_.store(0, std::memory_order_relaxed);
233  
    desc->ready_events_.store(0, std::memory_order_relaxed);
230  

234  

231  
    std::lock_guard lock(desc->mutex);
235  
    std::lock_guard lock(desc->mutex);
232  
    desc->impl_ref_.reset();
236  
    desc->impl_ref_.reset();
233  
    desc->read_ready  = false;
237  
    desc->read_ready  = false;
234  
    desc->write_ready = false;
238  
    desc->write_ready = false;
235  
}
239  
}
236  

240  

237  
inline void
241  
inline void
238  
epoll_scheduler::deregister_descriptor(int fd) const
242  
epoll_scheduler::deregister_descriptor(int fd) const
239  
{
243  
{
240  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
244  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
241  
}
245  
}
242  

246  

243  
inline void
247  
inline void
244  
epoll_scheduler::interrupt_reactor() const
248  
epoll_scheduler::interrupt_reactor() const
245  
{
249  
{
246  
    bool expected = false;
250  
    bool expected = false;
247  
    if (eventfd_armed_.compare_exchange_strong(
251  
    if (eventfd_armed_.compare_exchange_strong(
248  
            expected, true, std::memory_order_release,
252  
            expected, true, std::memory_order_release,
249  
            std::memory_order_relaxed))
253  
            std::memory_order_relaxed))
250  
    {
254  
    {
251  
        std::uint64_t val       = 1;
255  
        std::uint64_t val       = 1;
252  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
256  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253  
    }
257  
    }
254  
}
258  
}
255  

259  

256  
inline void
260  
inline void
257  
epoll_scheduler::update_timerfd() const
261  
epoll_scheduler::update_timerfd() const
258  
{
262  
{
259  
    auto nearest = timer_svc_->nearest_expiry();
263  
    auto nearest = timer_svc_->nearest_expiry();
260  

264  

261  
    itimerspec ts{};
265  
    itimerspec ts{};
262  
    int flags = 0;
266  
    int flags = 0;
263  

267  

264  
    if (nearest == timer_service::time_point::max())
268  
    if (nearest == timer_service::time_point::max())
265  
    {
269  
    {
266  
        // No timers — disarm by setting to 0 (relative)
270  
        // No timers — disarm by setting to 0 (relative)
267  
    }
271  
    }
268  
    else
272  
    else
269  
    {
273  
    {
270  
        auto now = std::chrono::steady_clock::now();
274  
        auto now = std::chrono::steady_clock::now();
271  
        if (nearest <= now)
275  
        if (nearest <= now)
272  
        {
276  
        {
273  
            // Use 1ns instead of 0 — zero disarms the timerfd
277  
            // Use 1ns instead of 0 — zero disarms the timerfd
274  
            ts.it_value.tv_nsec = 1;
278  
            ts.it_value.tv_nsec = 1;
275  
        }
279  
        }
276  
        else
280  
        else
277  
        {
281  
        {
278  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
282  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279  
                            nearest - now)
283  
                            nearest - now)
280  
                            .count();
284  
                            .count();
281  
            ts.it_value.tv_sec  = nsec / 1000000000;
285  
            ts.it_value.tv_sec  = nsec / 1000000000;
282  
            ts.it_value.tv_nsec = nsec % 1000000000;
286  
            ts.it_value.tv_nsec = nsec % 1000000000;
283  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
287  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284  
                ts.it_value.tv_nsec = 1;
288  
                ts.it_value.tv_nsec = 1;
285  
        }
289  
        }
286  
    }
290  
    }
287  

291  

288  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
292  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
293  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
290  
}
294  
}
291  

295  

292  
inline void
296  
inline void
293  
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
297  
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
294  
{
298  
{
295  
    int timeout_ms = task_interrupted_ ? 0 : -1;
299  
    int timeout_ms = task_interrupted_ ? 0 : -1;
296  

300  

297  
    if (lock.owns_lock())
301  
    if (lock.owns_lock())
298  
        lock.unlock();
302  
        lock.unlock();
299  

303  

300  
    task_cleanup on_exit{this, &lock, ctx};
304  
    task_cleanup on_exit{this, &lock, ctx};
301  

305  

302  
    // Flush deferred timerfd programming before blocking
306  
    // Flush deferred timerfd programming before blocking
303  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
307  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
304  
        update_timerfd();
308  
        update_timerfd();
305  

309  

306  
    epoll_event events[128];
310  
    epoll_event events[128];
307  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
311  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
308  

312  

309  
    if (nfds < 0 && errno != EINTR)
313  
    if (nfds < 0 && errno != EINTR)
310  
        detail::throw_system_error(make_err(errno), "epoll_wait");
314  
        detail::throw_system_error(make_err(errno), "epoll_wait");
311  

315  

312  
    bool check_timers = false;
316  
    bool check_timers = false;
313  
    op_queue local_ops;
317  
    op_queue local_ops;
314  

318  

315  
    for (int i = 0; i < nfds; ++i)
319  
    for (int i = 0; i < nfds; ++i)
316  
    {
320  
    {
317  
        if (events[i].data.ptr == nullptr)
321  
        if (events[i].data.ptr == nullptr)
318  
        {
322  
        {
319  
            std::uint64_t val;
323  
            std::uint64_t val;
320  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
324  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
321  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
325  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
322  
            eventfd_armed_.store(false, std::memory_order_relaxed);
326  
            eventfd_armed_.store(false, std::memory_order_relaxed);
323  
            continue;
327  
            continue;
324  
        }
328  
        }
325  

329  

326  
        if (events[i].data.ptr == &timer_fd_)
330  
        if (events[i].data.ptr == &timer_fd_)
327  
        {
331  
        {
328  
            std::uint64_t expirations;
332  
            std::uint64_t expirations;
329  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
333  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
330  
            [[maybe_unused]] auto r =
334  
            [[maybe_unused]] auto r =
331  
                ::read(timer_fd_, &expirations, sizeof(expirations));
335  
                ::read(timer_fd_, &expirations, sizeof(expirations));
332  
            check_timers = true;
336  
            check_timers = true;
333  
            continue;
337  
            continue;
334  
        }
338  
        }
335  

339  

336  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
340  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
337  
        desc->add_ready_events(events[i].events);
341  
        desc->add_ready_events(events[i].events);
338  

342  

339  
        bool expected = false;
343  
        bool expected = false;
340  
        if (desc->is_enqueued_.compare_exchange_strong(
344  
        if (desc->is_enqueued_.compare_exchange_strong(
341  
                expected, true, std::memory_order_release,
345  
                expected, true, std::memory_order_release,
342  
                std::memory_order_relaxed))
346  
                std::memory_order_relaxed))
343  
        {
347  
        {
344  
            local_ops.push(desc);
348  
            local_ops.push(desc);
345  
        }
349  
        }
346  
    }
350  
    }
347  

351  

348  
    if (check_timers)
352  
    if (check_timers)
349  
    {
353  
    {
350  
        timer_svc_->process_expired();
354  
        timer_svc_->process_expired();
351  
        update_timerfd();
355  
        update_timerfd();
352  
    }
356  
    }
353  

357  

354  
    lock.lock();
358  
    lock.lock();
355  

359  

356  
    if (!local_ops.empty())
360  
    if (!local_ops.empty())
357  
        completed_ops_.splice(local_ops);
361  
        completed_ops_.splice(local_ops);
358  
}
362  
}
359  

363  

360  
} // namespace boost::corosio::detail
364  
} // namespace boost::corosio::detail
361  

365  

362  
#endif // BOOST_COROSIO_HAS_EPOLL
366  
#endif // BOOST_COROSIO_HAS_EPOLL
363  

367  

364  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
368  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP