include/boost/corosio/native/detail/select/select_scheduler.hpp

Coverage summary: lines 86.6% (142/164); functions 100.0% (10/10)
select_scheduler.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/select/select_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <sys/select.h>
33 #include <unistd.h>
34 #include <errno.h>
35 #include <fcntl.h>
36
37 #include <atomic>
38 #include <chrono>
39 #include <cstdint>
40 #include <limits>
41 #include <mutex>
42 #include <unordered_map>
43
44 namespace boost::corosio::detail {
45
46 struct select_op;
47 struct select_descriptor_state;
48
/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler_base: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates a self-pipe (non-blocking, close-on-exec) used to wake a
        reactor thread blocked in select().

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing the self-pipe.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after the
        reactor's snapshot was taken. Without this, select() could
        remain blocked without watching the fd for writability.
    */
    void notify_reactor() const;

private:
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    void interrupt_reactor() const override;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building; guarded by the base class
    // mutex_ (see register_descriptor / deregister_descriptor).
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    mutable int max_fd_ = -1;
};
141
142 195x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
143 195x : pipe_fds_{-1, -1}
144 195x , max_fd_(-1)
145 {
146 195x if (::pipe(pipe_fds_) < 0)
147 detail::throw_system_error(make_err(errno), "pipe");
148
149 585x for (int i = 0; i < 2; ++i)
150 {
151 390x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
152 390x if (flags == -1)
153 {
154 int errn = errno;
155 ::close(pipe_fds_[0]);
156 ::close(pipe_fds_[1]);
157 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158 }
159 390x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160 {
161 int errn = errno;
162 ::close(pipe_fds_[0]);
163 ::close(pipe_fds_[1]);
164 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165 }
166 390x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167 {
168 int errn = errno;
169 ::close(pipe_fds_[0]);
170 ::close(pipe_fds_[1]);
171 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172 }
173 }
174
175 195x timer_svc_ = &get_timer_service(ctx, *this);
176 195x timer_svc_->set_on_earliest_changed(
177 3968x timer_service::callback(this, [](void* p) {
178 3773x static_cast<select_scheduler*>(p)->interrupt_reactor();
179 3773x }));
180
181 195x get_resolver_service(ctx, *this);
182 195x get_signal_service(ctx, *this);
183 195x get_stream_file_service(ctx, *this);
184 195x get_random_access_file_service(ctx, *this);
185
186 195x completed_ops_.push(&task_op_);
187 195x }
188
189 390x inline select_scheduler::~select_scheduler()
190 {
191 195x if (pipe_fds_[0] >= 0)
192 195x ::close(pipe_fds_[0]);
193 195x if (pipe_fds_[1] >= 0)
194 195x ::close(pipe_fds_[1]);
195 390x }
196
197 inline void
198 195x select_scheduler::shutdown()
199 {
200 195x shutdown_drain();
201
202 195x if (pipe_fds_[1] >= 0)
203 195x interrupt_reactor();
204 195x }
205
// Register fd for persistent monitoring. Throws EINVAL for fds select()
// cannot represent (fd >= FD_SETSIZE), resets the per-descriptor state,
// publishes the entry in registered_descs_, and wakes the reactor so a
// blocked select() rebuilds its fd_sets to include the new fd.
inline void
select_scheduler::register_descriptor(
    int fd, select_descriptor_state* desc) const
{
    // select() cannot watch fds at or beyond FD_SETSIZE; fail fast at
    // registration rather than silently dropping the fd in run_task.
    if (fd < 0 || fd >= FD_SETSIZE)
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");

    // NOTE(review): these fields are written without desc->mutex —
    // presumably the descriptor state is not yet visible to other threads
    // at registration time; confirm against callers.
    desc->registered_events = reactor_event_read | reactor_event_write;
    desc->fd = fd;
    desc->scheduler_ = this;
    desc->ready_events_.store(0, std::memory_order_relaxed);

    // Clear any state left over from a previous use of this descriptor
    // object, under its own mutex.
    {
        std::lock_guard lock(desc->mutex);
        desc->impl_ref_.reset();
        desc->read_ready = false;
        desc->write_ready = false;
    }

    // Publish in the fd map and maintain the cached maximum, under the
    // scheduler mutex. Note: desc->mutex is released before mutex_ is
    // taken, so the two locks are never held together here.
    {
        std::lock_guard lock(mutex_);
        registered_descs_[fd] = desc;
        if (fd > max_fd_)
            max_fd_ = fd;
    }

    // Wake a blocked select() so the new fd is included in the next
    // fd_set snapshot.
    interrupt_reactor();
}
234
235 inline void
236 7200x select_scheduler::deregister_descriptor(int fd) const
237 {
238 7200x std::lock_guard lock(mutex_);
239
240 7200x auto it = registered_descs_.find(fd);
241 7200x if (it == registered_descs_.end())
242 return;
243
244 7200x registered_descs_.erase(it);
245
246 7200x if (fd == max_fd_)
247 {
248 7142x max_fd_ = pipe_fds_[0];
249 14199x for (auto& [registered_fd, state] : registered_descs_)
250 {
251 7057x if (registered_fd > max_fd_)
252 7048x max_fd_ = registered_fd;
253 }
254 }
255 7200x }
256
// Public wake-up entry point: forwards to interrupt_reactor() so a
// blocked select() returns and rebuilds its fd_sets (e.g. after a write
// or connect op is registered past the current snapshot).
inline void
select_scheduler::notify_reactor() const
{
    interrupt_reactor();
}
262
// Wake a reactor thread blocked in select() by writing one byte to the
// self-pipe (the read end is always in the select() read set). The result
// is deliberately ignored: the pipe is non-blocking, and a short/failed
// write with data already pending still leaves the reactor wakeable.
inline void
select_scheduler::interrupt_reactor() const
{
    char byte = 1;
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
}
269
270 inline long
271 135508x select_scheduler::calculate_timeout(long requested_timeout_us) const
272 {
273 135508x if (requested_timeout_us == 0)
274 return 0;
275
276 135508x auto nearest = timer_svc_->nearest_expiry();
277 135508x if (nearest == timer_service::time_point::max())
278 46x return requested_timeout_us;
279
280 135462x auto now = std::chrono::steady_clock::now();
281 135462x if (nearest <= now)
282 318x return 0;
283
284 auto timer_timeout_us =
285 135144x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
286 135144x .count();
287
288 135144x constexpr auto long_max =
289 static_cast<long long>((std::numeric_limits<long>::max)());
290 auto capped_timer_us =
291 135144x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
292 135144x static_cast<long long>(0)),
293 135144x long_max);
294
295 135144x if (requested_timeout_us < 0)
296 135144x return static_cast<long>(capped_timer_us);
297
298 return static_cast<long>(
299 (std::min)(static_cast<long long>(requested_timeout_us),
300 capped_timer_us));
301 }
302
// One reactor iteration: snapshot the registered fds, block in select(),
// then mark ready descriptors and hand them to worker threads.
//
// Lock protocol: called with the scheduler lock held (the owns_lock()
// check suggests it may occasionally arrive unlocked — NOTE(review):
// confirm against the base-class do_one loop). The lock is released
// around select() and re-taken before splicing results; task_cleanup
// re-queues the task marker on every exit path, including early returns.
inline void
select_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // If an interrupt is already pending, poll (timeout 0); otherwise
    // block until the nearest timer deadline (-1 = no caller bound).
    long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    // Per-descriptor mutex taken briefly to read write_op/connect_op;
    // fds beyond FD_SETSIZE entries are silently dropped this iteration
    // (registration already rejects fd values >= FD_SETSIZE).
    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            std::lock_guard desc_lock(desc->mutex);
            snapshot[snapshot_count].fd = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Drop the scheduler lock for the blocking select() call.
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard: on any exit (including the EINTR/EBADF early returns
    // and exceptions) restores scheduler state and re-queues the task op.
    task_cleanup on_exit{this, &lock, ctx};

    // Build fresh fd_sets from the snapshot each iteration (select()
    // mutates its arguments, so they cannot be reused).
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always watched so interrupt_reactor()
    // can wake this thread.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Translate the microsecond timeout; a negative value means block
    // indefinitely (tv_ptr stays null).
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe completely (non-blocking read until empty)
        // so coalesced wake-ups do not cause spurious iterations.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Collect the readiness flags reported for this fd.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Enqueue the descriptor exactly once: the CAS on
            // is_enqueued_ loses if a previous ready event is still
            // queued, avoiding duplicate entries in the run queue.
            // (release pairs with the consumer's acquire when it
            // clears the flag.)
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Re-take the scheduler lock and publish ready descriptors to the
    // shared completion queue in one splice.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
429
430 } // namespace boost::corosio::detail
431
432 #endif // BOOST_COROSIO_HAS_SELECT
433
434 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
435