LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 84.2 % 139 117 22
Test Date: 2026-03-26 16:40:44 Functions: 100.0 % 10 10

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      21                 : 
      22                 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
      23                 : #include <boost/corosio/detail/timer_service.hpp>
      24                 : #include <boost/corosio/native/detail/make_err.hpp>
      25                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
      28                 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
      29                 : 
      30                 : #include <boost/corosio/detail/except.hpp>
      31                 : 
      32                 : #include <atomic>
      33                 : #include <chrono>
      34                 : #include <cstdint>
      35                 : #include <mutex>
      36                 : 
      37                 : #include <errno.h>
      38                 : #include <sys/epoll.h>
      39                 : #include <sys/eventfd.h>
      40                 : #include <sys/timerfd.h>
      41                 : #include <unistd.h>
      42                 : 
      43                 : namespace boost::corosio::detail {
      44                 : 
      45                 : struct epoll_op;
      46                 : struct descriptor_state;
      47                 : 
      48                 : /** Linux scheduler using epoll for I/O multiplexing.
      49                 : 
      50                 :     This scheduler implements the scheduler interface using Linux epoll
      51                 :     for efficient I/O event notification. It uses a single reactor model
      52                 :     where one thread runs epoll_wait while other threads
      53                 :     wait on a condition variable for handler work. This design provides:
      54                 : 
      55                 :     - Handler parallelism: N posted handlers can execute on N threads
      56                 :     - No thundering herd: condition_variable wakes exactly one thread
      57                 :     - IOCP parity: Behavior matches Windows I/O completion port semantics
      58                 : 
      59                 :     When threads call run(), they first try to execute queued handlers.
      60                 :     If the queue is empty and no reactor is running, one thread becomes
      61                 :     the reactor and runs epoll_wait. Other threads wait on a condition
      62                 :     variable until handlers are available.
      63                 : 
      64                 :     @par Thread Safety
      65                 :     All public member functions are thread-safe.
      66                 : */
      67                 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
      68                 : {
      69                 : public:
      70                 :     /** Construct the scheduler.
      71                 : 
      72                 :         Creates an epoll instance, eventfd for reactor interruption,
      73                 :         and timerfd for kernel-managed timer expiry.
      74                 : 
      75                 :         @param ctx Reference to the owning execution_context.
      76                 :         @param concurrency_hint Hint for expected thread count (unused).
      77                 :     */
      78                 :     epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      79                 : 
      80                 :     /// Destroy the scheduler.
      81                 :     ~epoll_scheduler() override;
      82                 : 
      83                 :     epoll_scheduler(epoll_scheduler const&)            = delete;
      84                 :     epoll_scheduler& operator=(epoll_scheduler const&) = delete;
      85                 : 
      86                 :     /// Shut down the scheduler, draining pending operations.
      87                 :     void shutdown() override;
      88                 : 
      89                 :     /** Return the epoll file descriptor.
      90                 : 
      91                 :         Used by socket services to register file descriptors
      92                 :         for I/O event notification.
      93                 : 
      94                 :         @return The epoll file descriptor.
      95                 :     */
      96                 :     int epoll_fd() const noexcept
      97                 :     {
      98                 :         return epoll_fd_;
      99                 :     }
     100                 : 
     101                 :     /** Register a descriptor for persistent monitoring.
     102                 : 
     103                 :         The fd is registered once and stays registered until explicitly
     104                 :         deregistered. Events are dispatched via descriptor_state which
     105                 :         tracks pending read/write/connect operations.
     106                 : 
     107                 :         @param fd The file descriptor to register.
     108                 :         @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
     109                 :     */
     110                 :     void register_descriptor(int fd, descriptor_state* desc) const;
     111                 : 
     112                 :     /** Deregister a persistently registered descriptor.
     113                 : 
     114                 :         @param fd The file descriptor to deregister.
     115                 :     */
     116                 :     void deregister_descriptor(int fd) const;
     117                 : 
     118                 : private:
     119                 :     void
     120                 :     run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
     121                 :     void interrupt_reactor() const override;
     122                 :     void update_timerfd() const;
     123                 : 
     124                 :     int epoll_fd_;
     125                 :     int event_fd_;
     126                 :     int timer_fd_;
     127                 : 
     128                 :     // Edge-triggered eventfd state
     129                 :     mutable std::atomic<bool> eventfd_armed_{false};
     130                 : 
     131                 :     // Set when the earliest timer changes; flushed before epoll_wait
     132                 :     mutable std::atomic<bool> timerfd_stale_{false};
     133                 : };
     134                 : 
     135 HIT         320 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
     136             320 :     : epoll_fd_(-1)
     137             320 :     , event_fd_(-1)
     138             320 :     , timer_fd_(-1)
     139                 : {
     140             320 :     epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
     141             320 :     if (epoll_fd_ < 0)
     142 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_create1");
     143                 : 
     144 HIT         320 :     event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
     145             320 :     if (event_fd_ < 0)
     146                 :     {
     147 MIS           0 :         int errn = errno;
     148               0 :         ::close(epoll_fd_);
     149               0 :         detail::throw_system_error(make_err(errn), "eventfd");
     150                 :     }
     151                 : 
     152 HIT         320 :     timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     153             320 :     if (timer_fd_ < 0)
     154                 :     {
     155 MIS           0 :         int errn = errno;
     156               0 :         ::close(event_fd_);
     157               0 :         ::close(epoll_fd_);
     158               0 :         detail::throw_system_error(make_err(errn), "timerfd_create");
     159                 :     }
     160                 : 
     161 HIT         320 :     epoll_event ev{};
     162             320 :     ev.events   = EPOLLIN | EPOLLET;
     163             320 :     ev.data.ptr = nullptr;
     164             320 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
     165                 :     {
     166 MIS           0 :         int errn = errno;
     167               0 :         ::close(timer_fd_);
     168               0 :         ::close(event_fd_);
     169               0 :         ::close(epoll_fd_);
     170               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl");
     171                 :     }
     172                 : 
     173 HIT         320 :     epoll_event timer_ev{};
     174             320 :     timer_ev.events   = EPOLLIN | EPOLLERR;
     175             320 :     timer_ev.data.ptr = &timer_fd_;
     176             320 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
     177                 :     {
     178 MIS           0 :         int errn = errno;
     179               0 :         ::close(timer_fd_);
     180               0 :         ::close(event_fd_);
     181               0 :         ::close(epoll_fd_);
     182               0 :         detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
     183                 :     }
     184                 : 
     185 HIT         320 :     timer_svc_ = &get_timer_service(ctx, *this);
     186             320 :     timer_svc_->set_on_earliest_changed(
     187            4919 :         timer_service::callback(this, [](void* p) {
     188            4599 :             auto* self = static_cast<epoll_scheduler*>(p);
     189            4599 :             self->timerfd_stale_.store(true, std::memory_order_release);
     190            4599 :             self->interrupt_reactor();
     191            4599 :         }));
     192                 : 
     193             320 :     get_resolver_service(ctx, *this);
     194             320 :     get_signal_service(ctx, *this);
     195             320 :     get_stream_file_service(ctx, *this);
     196             320 :     get_random_access_file_service(ctx, *this);
     197                 : 
     198             320 :     completed_ops_.push(&task_op_);
     199             320 : }
     200                 : 
     201             640 : inline epoll_scheduler::~epoll_scheduler()
     202                 : {
     203             320 :     if (timer_fd_ >= 0)
     204             320 :         ::close(timer_fd_);
     205             320 :     if (event_fd_ >= 0)
     206             320 :         ::close(event_fd_);
     207             320 :     if (epoll_fd_ >= 0)
     208             320 :         ::close(epoll_fd_);
     209             640 : }
     210                 : 
     211                 : inline void
     212             320 : epoll_scheduler::shutdown()
     213                 : {
     214             320 :     shutdown_drain();
     215                 : 
     216             320 :     if (event_fd_ >= 0)
     217             320 :         interrupt_reactor();
     218             320 : }
     219                 : 
     220                 : inline void
     221            8873 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
     222                 : {
     223            8873 :     epoll_event ev{};
     224            8873 :     ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
     225            8873 :     ev.data.ptr = desc;
     226                 : 
     227            8873 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     228 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     229                 : 
     230 HIT        8873 :     desc->registered_events = ev.events;
     231            8873 :     desc->fd                = fd;
     232            8873 :     desc->scheduler_        = this;
     233            8873 :     desc->ready_events_.store(0, std::memory_order_relaxed);
     234                 : 
     235            8873 :     std::lock_guard lock(desc->mutex);
     236            8873 :     desc->impl_ref_.reset();
     237            8873 :     desc->read_ready  = false;
     238            8873 :     desc->write_ready = false;
     239            8873 : }
     240                 : 
     241                 : inline void
     242            8873 : epoll_scheduler::deregister_descriptor(int fd) const
     243                 : {
     244            8873 :     ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     245            8873 : }
     246                 : 
     247                 : inline void
     248            5208 : epoll_scheduler::interrupt_reactor() const
     249                 : {
     250            5208 :     bool expected = false;
     251            5208 :     if (eventfd_armed_.compare_exchange_strong(
     252                 :             expected, true, std::memory_order_release,
     253                 :             std::memory_order_relaxed))
     254                 :     {
     255            4998 :         std::uint64_t val       = 1;
     256            4998 :         [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
     257                 :     }
     258            5208 : }
     259                 : 
     260                 : inline void
     261            9158 : epoll_scheduler::update_timerfd() const
     262                 : {
     263            9158 :     auto nearest = timer_svc_->nearest_expiry();
     264                 : 
     265            9158 :     itimerspec ts{};
     266            9158 :     int flags = 0;
     267                 : 
     268            9158 :     if (nearest == timer_service::time_point::max())
     269                 :     {
     270                 :         // No timers — disarm by setting to 0 (relative)
     271                 :     }
     272                 :     else
     273                 :     {
     274            9105 :         auto now = std::chrono::steady_clock::now();
     275            9105 :         if (nearest <= now)
     276                 :         {
     277                 :             // Use 1ns instead of 0 — zero disarms the timerfd
     278             282 :             ts.it_value.tv_nsec = 1;
     279                 :         }
     280                 :         else
     281                 :         {
     282            8823 :             auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
     283            8823 :                             nearest - now)
     284            8823 :                             .count();
     285            8823 :             ts.it_value.tv_sec  = nsec / 1000000000;
     286            8823 :             ts.it_value.tv_nsec = nsec % 1000000000;
     287            8823 :             if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
     288 MIS           0 :                 ts.it_value.tv_nsec = 1;
     289                 :         }
     290                 :     }
     291                 : 
     292 HIT        9158 :     if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
     293 MIS           0 :         detail::throw_system_error(make_err(errno), "timerfd_settime");
     294 HIT        9158 : }
     295                 : 
     296                 : inline void
     297           36316 : epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
     298                 : {
     299           36316 :     int timeout_ms = task_interrupted_ ? 0 : -1;
     300                 : 
     301           36316 :     if (lock.owns_lock())
     302           13062 :         lock.unlock();
     303                 : 
     304           36316 :     task_cleanup on_exit{this, &lock, ctx};
     305                 : 
     306                 :     // Flush deferred timerfd programming before blocking
     307           36316 :     if (timerfd_stale_.exchange(false, std::memory_order_acquire))
     308            4577 :         update_timerfd();
     309                 : 
     310                 :     epoll_event events[128];
     311           36316 :     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     312                 : 
     313           36316 :     if (nfds < 0 && errno != EINTR)
     314 MIS           0 :         detail::throw_system_error(make_err(errno), "epoll_wait");
     315                 : 
     316 HIT       36316 :     bool check_timers = false;
     317           36316 :     op_queue local_ops;
     318                 : 
     319           81177 :     for (int i = 0; i < nfds; ++i)
     320                 :     {
     321           44861 :         if (events[i].data.ptr == nullptr)
     322                 :         {
     323                 :             std::uint64_t val;
     324                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
     325            4678 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
     326            4678 :             eventfd_armed_.store(false, std::memory_order_relaxed);
     327            4678 :             continue;
     328            4678 :         }
     329                 : 
     330           40183 :         if (events[i].data.ptr == &timer_fd_)
     331                 :         {
     332                 :             std::uint64_t expirations;
     333                 :             // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
     334                 :             [[maybe_unused]] auto r =
     335            4581 :                 ::read(timer_fd_, &expirations, sizeof(expirations));
     336            4581 :             check_timers = true;
     337            4581 :             continue;
     338            4581 :         }
     339                 : 
     340           35602 :         auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
     341           35602 :         desc->add_ready_events(events[i].events);
     342                 : 
     343           35602 :         bool expected = false;
     344           35602 :         if (desc->is_enqueued_.compare_exchange_strong(
     345                 :                 expected, true, std::memory_order_release,
     346                 :                 std::memory_order_relaxed))
     347                 :         {
     348           35602 :             local_ops.push(desc);
     349                 :         }
     350                 :     }
     351                 : 
     352           36316 :     if (check_timers)
     353                 :     {
     354            4581 :         timer_svc_->process_expired();
     355            4581 :         update_timerfd();
     356                 :     }
     357                 : 
     358           36316 :     lock.lock();
     359                 : 
     360           36316 :     if (!local_ops.empty())
     361           22717 :         completed_ops_.splice(local_ops);
     362           36316 : }
     363                 : 
     364                 : } // namespace boost::corosio::detail
     365                 : 
     366                 : #endif // BOOST_COROSIO_HAS_EPOLL
     367                 : 
     368                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
        

Generated by: LCOV version 2.3