1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/select/select_op.hpp>
22  
#include <boost/corosio/native/detail/select/select_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
 
27 +
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
 
28 +
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
27  

29  

28  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
29  

31  

30  
#include <sys/select.h>
32  
#include <sys/select.h>
31  
#include <unistd.h>
33  
#include <unistd.h>
32  
#include <errno.h>
34  
#include <errno.h>
33  
#include <fcntl.h>
35  
#include <fcntl.h>
34  

36  

35  
#include <atomic>
37  
#include <atomic>
36  
#include <chrono>
38  
#include <chrono>
37  
#include <cstdint>
39  
#include <cstdint>
38  
#include <limits>
40  
#include <limits>
39  
#include <mutex>
41  
#include <mutex>
40  
#include <unordered_map>
42  
#include <unordered_map>
41  

43  

42  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
43  

45  

44  
struct select_op;
46  
struct select_op;
45  
struct select_descriptor_state;
47  
struct select_descriptor_state;
46  

48  

47  
/** POSIX scheduler using select() for I/O multiplexing.
49  
/** POSIX scheduler using select() for I/O multiplexing.
48  

50  

49  
    This scheduler implements the scheduler interface using the POSIX select()
51  
    This scheduler implements the scheduler interface using the POSIX select()
50  
    call for I/O event notification. It inherits the shared reactor threading
52  
    call for I/O event notification. It inherits the shared reactor threading
51  
    model from reactor_scheduler_base: signal state machine, inline completion
53  
    model from reactor_scheduler_base: signal state machine, inline completion
52  
    budget, work counting, and the do_one event loop.
54  
    budget, work counting, and the do_one event loop.
53  

55  

54  
    The design mirrors epoll_scheduler for behavioral consistency:
56  
    The design mirrors epoll_scheduler for behavioral consistency:
55  
    - Same single-reactor thread coordination model
57  
    - Same single-reactor thread coordination model
56  
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
58  
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
57  
    - Same timer integration pattern
59  
    - Same timer integration pattern
58  

60  

59  
    Known Limitations:
61  
    Known Limitations:
60  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
62  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
61  
    - O(n) scanning: rebuilds fd_sets each iteration
63  
    - O(n) scanning: rebuilds fd_sets each iteration
62  
    - Level-triggered only (no edge-triggered mode)
64  
    - Level-triggered only (no edge-triggered mode)
63  

65  

64  
    @par Thread Safety
66  
    @par Thread Safety
65  
    All public member functions are thread-safe.
67  
    All public member functions are thread-safe.
66  
*/
68  
*/
67  
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block not watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    // Reactor entry point invoked by the base-class event loop;
    // performs one select() pass (see definition below).
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    // Wakes a thread blocked in select() via the self-pipe.
    void interrupt_reactor() const override;
    // Clamps a requested timeout against the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // Highest registered fd seen; maintained under mutex_ by
    // register_descriptor/deregister_descriptor. -1 until first use.
    mutable int max_fd_ = -1;
};
139  

141  

140  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
    : pipe_fds_{-1, -1}
    , max_fd_(-1)
{
    // Create the self-pipe: writing to pipe_fds_[1] makes pipe_fds_[0]
    // readable, which wakes a reactor thread blocked in select().
    if (::pipe(pipe_fds_) < 0)
        detail::throw_system_error(make_err(errno), "pipe");

    // Make both ends non-blocking and close-on-exec. On any failure,
    // close both fds before throwing: the destructor will not run if
    // the constructor throws.
    for (int i = 0; i < 2; ++i)
    {
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
        if (flags == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
        }
    }

    // Wire the timer service so that a change of the earliest expiry
    // interrupts a blocked select() and the timeout is recomputed.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            static_cast<select_scheduler*>(p)->interrupt_reactor();
        }));

    // Eagerly instantiate the auxiliary services tied to this scheduler.
    get_resolver_service(ctx, *this);
    get_signal_service(ctx, *this);
    get_stream_file_service(ctx, *this);
    get_random_access_file_service(ctx, *this);

    // Seed the queue with the task op so the event loop runs the reactor.
    completed_ops_.push(&task_op_);
}
184  

188  

185  
inline select_scheduler::~select_scheduler()
{
    // Release whichever ends of the self-pipe were successfully opened.
    // A value of -1 means the fd was never created (constructor threw
    // paths never reach here, but the sentinel keeps this safe).
    for (int fd : pipe_fds_)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
192  

196  

193  
inline void
197  
inline void
194  
select_scheduler::shutdown()
198  
select_scheduler::shutdown()
195  
{
199  
{
196  
    shutdown_drain();
200  
    shutdown_drain();
197  

201  

198  
    if (pipe_fds_[1] >= 0)
202  
    if (pipe_fds_[1] >= 0)
199  
        interrupt_reactor();
203  
        interrupt_reactor();
200  
}
204  
}
201  

205  

202  
inline void
select_scheduler::register_descriptor(
    int fd, select_descriptor_state* desc) const
{
    // select() cannot monitor fds at or above FD_SETSIZE; reject early.
    if (fd < 0 || fd >= FD_SETSIZE)
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");

    // Initialize descriptor bookkeeping before publishing it to the map.
    desc->registered_events = reactor_event_read | reactor_event_write;
    desc->fd                = fd;
    desc->scheduler_        = this;
    desc->ready_events_.store(0, std::memory_order_relaxed);

    // Reset per-descriptor state under its own mutex.
    {
        std::lock_guard lock(desc->mutex);
        desc->impl_ref_.reset();
        desc->read_ready  = false;
        desc->write_ready = false;
    }

    // Publish the descriptor under the scheduler mutex and track the
    // highest registered fd. Note the desc mutex is released before
    // mutex_ is taken, keeping the two locks un-nested here.
    {
        std::lock_guard lock(mutex_);
        registered_descs_[fd] = desc;
        if (fd > max_fd_)
            max_fd_ = fd;
    }

    // Wake a blocked select() so its fd_sets include the new fd.
    interrupt_reactor();
}
230  

234  

231  
inline void
235  
inline void
232  
select_scheduler::deregister_descriptor(int fd) const
236  
select_scheduler::deregister_descriptor(int fd) const
233  
{
237  
{
234  
    std::lock_guard lock(mutex_);
238  
    std::lock_guard lock(mutex_);
235  

239  

236  
    auto it = registered_descs_.find(fd);
240  
    auto it = registered_descs_.find(fd);
237  
    if (it == registered_descs_.end())
241  
    if (it == registered_descs_.end())
238  
        return;
242  
        return;
239  

243  

240  
    registered_descs_.erase(it);
244  
    registered_descs_.erase(it);
241  

245  

242  
    if (fd == max_fd_)
246  
    if (fd == max_fd_)
243  
    {
247  
    {
244  
        max_fd_ = pipe_fds_[0];
248  
        max_fd_ = pipe_fds_[0];
245  
        for (auto& [registered_fd, state] : registered_descs_)
249  
        for (auto& [registered_fd, state] : registered_descs_)
246  
        {
250  
        {
247  
            if (registered_fd > max_fd_)
251  
            if (registered_fd > max_fd_)
248  
                max_fd_ = registered_fd;
252  
                max_fd_ = registered_fd;
249  
        }
253  
        }
250  
    }
254  
    }
251  
}
255  
}
252  

256  

253  
inline void
select_scheduler::notify_reactor() const
{
    // Waking the reactor is all a notification requires here: the
    // reactor rebuilds its fd_sets from scratch on every iteration.
    interrupt_reactor();
}
258  

262  

259  
inline void
263  
inline void
260  
select_scheduler::interrupt_reactor() const
264  
select_scheduler::interrupt_reactor() const
261  
{
265  
{
262  
    char byte               = 1;
266  
    char byte               = 1;
263  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
267  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
264  
}
268  
}
265  

269  

266  
// Compute the effective select() timeout in microseconds: the lesser
// of the requested timeout and the time until the nearest timer
// expiry. A negative request means "block indefinitely" (bounded only
// by the nearest timer); zero means poll.
inline long
select_scheduler::calculate_timeout(long requested_timeout_us) const
{
    // A zero request is an explicit poll; timers cannot shorten it.
    if (requested_timeout_us == 0)
        return 0;

    // No pending timers: the requested timeout stands as-is.
    auto nearest = timer_svc_->nearest_expiry();
    if (nearest == timer_service::time_point::max())
        return requested_timeout_us;

    // A timer already due means select() must not block at all.
    auto now = std::chrono::steady_clock::now();
    if (nearest <= now)
        return 0;

    auto timer_timeout_us =
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
            .count();

    // Clamp the timer delta into [0, LONG_MAX] using long long
    // arithmetic to avoid overflow before the cast back to long.
    constexpr auto long_max =
        static_cast<long long>((std::numeric_limits<long>::max)());
    auto capped_timer_us =
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
                              static_cast<long long>(0)),
                   long_max);

    // Negative request: block until the nearest timer only.
    if (requested_timeout_us < 0)
        return static_cast<long>(capped_timer_us);

    // Otherwise take whichever deadline comes first.
    return static_cast<long>(
        (std::min)(static_cast<long long>(requested_timeout_us),
                   capped_timer_us));
}
298  

302  

299  
// One reactor pass: snapshot registrations, block in select(), then
// convert readiness into enqueued descriptor ops. Entered with `lock`
// (the scheduler mutex) held; the lock is released around the blocking
// call and re-acquired before the results are spliced in.
inline void
select_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // If another thread asked for an interrupt, poll instead of block.
    long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            std::lock_guard desc_lock(desc->mutex);
            snapshot[snapshot_count].fd   = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Release the scheduler mutex before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard restoring scheduler state on every exit path,
    // including the early returns and thrown exceptions below.
    task_cleanup on_exit{this, &lock, ctx};

    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always monitored for wakeups.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // A null timeval pointer means block indefinitely.
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe so accumulated wakeup bytes don't keep
        // select() returning immediately (it is level-triggered).
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd                        = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event flags.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            // Publish readiness; a worker thread performs the actual I/O.
            desc->add_ready_events(flags);

            // Enqueue the descriptor at most once: the CAS guards
            // against double-enqueue if it is already pending.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Re-acquire the scheduler mutex before touching the shared queue.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
425  

429  

426  
} // namespace boost::corosio::detail
430  
} // namespace boost::corosio::detail
427  

431  

428  
#endif // BOOST_COROSIO_HAS_SELECT
432  
#endif // BOOST_COROSIO_HAS_SELECT
429  

433  

430  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
434  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP