include/boost/corosio/detail/timer_service.hpp

91.7% Lines (341/372) 97.7% List of functions (43/44)
timer_service.hpp
f(x) Functions (44)
Function Calls Lines Blocks
boost::corosio::detail::timer_service::callback::callback() :97 515x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :100 515x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :109 8372x 100.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :138 515x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :144 16830x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :150 1030x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :156 515x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :169 144666x 100.0% 73.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :212 173127x 100.0% 70.0% boost::corosio::detail::waiter_node::completion_op::completion_op() :236 210x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :262 210x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :299 8625x 87.5% 78.0% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :314 8617x 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :325 8419x 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :337 8403x 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :348 515x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :359 249x 100.0% 100.0% boost::corosio::detail::timer_service::shutdown() :366 515x 100.0% 82.0% boost::corosio::detail::timer_service::construct() :423 8625x 66.7% 76.0% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :452 8623x 100.0% 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :458 8623x 73.3% 64.0% boost::corosio::detail::timer_service::create_waiter() :485 8419x 100.0% 87.0% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :503 8403x 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >) :514 6x 93.1% 79.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :564 8419x 100.0% 82.0% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :584 8625x 87.5% 78.0% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :624 30x 92.9% 80.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :647 2x 76.5% 70.0% boost::corosio::detail::timer_service::process_expired() :674 164668x 100.0% 91.0% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :709 8389x 84.6% 65.0% boost::corosio::detail::timer_service::up_heap(unsigned long) :736 8403x 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long) :749 8237x 69.2% 62.0% boost::corosio::detail::timer_service::swap_heap(unsigned long, unsigned long) :769 16453x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :781 30x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :787 0 0.0% 0.0% boost::corosio::detail::waiter_node::completion_op::operator()() :801 8403x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :826 8x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::capy::continuation*) :853 8420x 100.0% 97.0% boost::corosio::detail::timer_service_access::get_scheduler(boost::corosio::io_context&) :897 8625x 100.0% 100.0% boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :905 8625x 100.0% 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :912 6x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :919 8x 100.0% 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :926 2x 100.0% 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :933 515x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
75 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76
77 inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
81 class BOOST_COROSIO_DECL timer_service final
82 : public capy::execution_context::service
83 , public io_object::io_service
84 {
85 public:
86 using clock_type = std::chrono::steady_clock;
87 using time_point = clock_type::time_point;
88
89 /// Type-erased callback for earliest-expiry-changed notifications.
90 class callback
91 {
92 void* ctx_ = nullptr;
93 void (*fn_)(void*) = nullptr;
94
95 public:
96 /// Construct an empty callback.
97 515x callback() = default;
98
99 /// Construct a callback with the given context and function.
100 515x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101
102 /// Return true if the callback is non-empty.
103 explicit operator bool() const noexcept
104 {
105 return fn_ != nullptr;
106 }
107
108 /// Invoke the callback.
109 8372x void operator()() const
110 {
111 8372x if (fn_)
112 8372x fn_(ctx_);
113 8372x }
114 };
115
116 struct implementation;
117
118 private:
119 struct heap_entry
120 {
121 time_point time_;
122 implementation* timer_;
123 };
124
125 scheduler* sched_ = nullptr;
126 mutable std::mutex mutex_;
127 std::vector<heap_entry> heap_;
128 implementation* free_list_ = nullptr;
129 waiter_node* waiter_free_list_ = nullptr;
130 callback on_earliest_changed_;
131 bool shutting_down_ = false;
132 // Avoids mutex in nearest_expiry() and empty()
133 mutable std::atomic<std::int64_t> cached_nearest_ns_{
134 (std::numeric_limits<std::int64_t>::max)()};
135
136 public:
137 /// Construct the timer service bound to a scheduler.
138 515x inline timer_service(capy::execution_context&, scheduler& sched)
139 515x : sched_(&sched)
140 {
141 515x }
142
143 /// Return the associated scheduler.
144 16830x inline scheduler& get_scheduler() noexcept
145 {
146 16830x return *sched_;
147 }
148
149 /// Destroy the timer service.
150 1030x ~timer_service() override = default;
151
152 timer_service(timer_service const&) = delete;
153 timer_service& operator=(timer_service const&) = delete;
154
155 /// Register a callback invoked when the earliest expiry changes.
156 515x inline void set_on_earliest_changed(callback cb)
157 {
158 515x on_earliest_changed_ = cb;
159 515x }
160
161 /// Return true if no timers are in the heap.
162 inline bool empty() const noexcept
163 {
164 return cached_nearest_ns_.load(std::memory_order_acquire) ==
165 (std::numeric_limits<std::int64_t>::max)();
166 }
167
168 /// Return the nearest timer expiry without acquiring the mutex.
169 144666x inline time_point nearest_expiry() const noexcept
170 {
171 144666x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
172 144666x return time_point(time_point::duration(ns));
173 }
174
175 /// Cancel all pending timers and free cached resources.
176 inline void shutdown() override;
177
178 /// Construct a new timer implementation.
179 inline io_object::implementation* construct() override;
180
181 /// Destroy a timer implementation, cancelling pending waiters.
182 inline void destroy(io_object::implementation* p) override;
183
184 /// Cancel and recycle a timer implementation.
185 inline void destroy_impl(implementation& impl);
186
187 /// Create or recycle a waiter node.
188 inline waiter_node* create_waiter();
189
190 /// Return a waiter node to the cache or free list.
191 inline void destroy_waiter(waiter_node* w);
192
193 /// Update the timer expiry, cancelling existing waiters.
194 inline std::size_t update_timer(implementation& impl, time_point new_time);
195
196 /// Insert a waiter into the timer's waiter list and the heap.
197 inline void insert_waiter(implementation& impl, waiter_node* w);
198
199 /// Cancel all waiters on a timer.
200 inline std::size_t cancel_timer(implementation& impl);
201
202 /// Cancel a single waiter ( stop_token callback path ).
203 inline void cancel_waiter(waiter_node* w);
204
205 /// Cancel one waiter on a timer.
206 inline std::size_t cancel_one_waiter(implementation& impl);
207
208 /// Complete all waiters whose timers have expired.
209 inline std::size_t process_expired();
210
211 private:
212 173127x inline void refresh_cached_nearest() noexcept
213 {
214 173127x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
215 172640x : heap_[0].time_.time_since_epoch().count();
216 173127x cached_nearest_ns_.store(ns, std::memory_order_release);
217 173127x }
218
219 inline void remove_timer_impl(implementation& impl);
220 inline void up_heap(std::size_t index);
221 inline void down_heap(std::size_t index);
222 inline void swap_heap(std::size_t i1, std::size_t i2);
223 };
224
225 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
226 : intrusive_list<waiter_node>::node
227 {
228 // Embedded completion op — avoids heap allocation per fire/cancel
229 struct completion_op final : scheduler_op
230 {
231 waiter_node* waiter_ = nullptr;
232
233 static void do_complete(
234 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
235
236 210x completion_op() noexcept : scheduler_op(&do_complete) {}
237
238 void operator()() override;
239 void destroy() override;
240 };
241
242 // Per-waiter stop_token cancellation
243 struct canceller
244 {
245 waiter_node* waiter_;
246 void operator()() const;
247 };
248
249 // nullptr once removed from timer's waiter list (concurrency marker)
250 timer_service::implementation* impl_ = nullptr;
251 timer_service* svc_ = nullptr;
252 std::coroutine_handle<> h_;
253 capy::continuation* cont_ = nullptr;
254 capy::executor_ref d_;
255 std::error_code* ec_out_ = nullptr;
256 std::stop_token token_;
257 std::optional<std::stop_callback<canceller>> stop_cb_;
258 completion_op op_;
259 std::error_code ec_value_;
260 waiter_node* next_free_ = nullptr;
261
262 210x waiter_node() noexcept
263 210x {
264 210x op_.waiter_ = this;
265 210x }
266 };
267
268 struct timer_service::implementation final : timer::implementation
269 {
270 using clock_type = std::chrono::steady_clock;
271 using time_point = clock_type::time_point;
272 using duration = clock_type::duration;
273
274 timer_service* svc_ = nullptr;
275 intrusive_list<waiter_node> waiters_;
276
277 // Free list linkage (reused when impl is on free_list)
278 implementation* next_free_ = nullptr;
279
280 inline explicit implementation(timer_service& svc) noexcept;
281
282 inline std::coroutine_handle<> wait(
283 std::coroutine_handle<>,
284 capy::executor_ref,
285 std::stop_token,
286 std::error_code*,
287 capy::continuation*) override;
288 };
289
290 // Thread-local caches avoid hot-path mutex acquisitions:
291 // 1. Impl cache — single-slot, validated by comparing svc_
292 // 2. Waiter cache — single-slot, no service affinity
293 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
294
295 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
296 inline thread_local_ptr<waiter_node> tl_cached_waiter;
297
298 inline timer_service::implementation*
299 8625x try_pop_tl_cache(timer_service* svc) noexcept
300 {
301 8625x auto* impl = tl_cached_impl.get();
302 8625x if (impl)
303 {
304 8376x tl_cached_impl.set(nullptr);
305 8376x if (impl->svc_ == svc)
306 8376x return impl;
307 // Stale impl from a destroyed service
308 delete impl;
309 }
310 249x return nullptr;
311 }
312
313 inline bool
314 8617x try_push_tl_cache(timer_service::implementation* impl) noexcept
315 {
316 8617x if (!tl_cached_impl.get())
317 {
318 8537x tl_cached_impl.set(impl);
319 8537x return true;
320 }
321 80x return false;
322 }
323
324 inline waiter_node*
325 8419x try_pop_waiter_tl_cache() noexcept
326 {
327 8419x auto* w = tl_cached_waiter.get();
328 8419x if (w)
329 {
330 8207x tl_cached_waiter.set(nullptr);
331 8207x return w;
332 }
333 212x return nullptr;
334 }
335
336 inline bool
337 8403x try_push_waiter_tl_cache(waiter_node* w) noexcept
338 {
339 8403x if (!tl_cached_waiter.get())
340 {
341 8323x tl_cached_waiter.set(w);
342 8323x return true;
343 }
344 80x return false;
345 }
346
347 inline void
348 515x timer_service_invalidate_cache() noexcept
349 {
350 515x delete tl_cached_impl.get();
351 515x tl_cached_impl.set(nullptr);
352
353 515x delete tl_cached_waiter.get();
354 515x tl_cached_waiter.set(nullptr);
355 515x }
356
357 // timer_service out-of-class member function definitions
358
359 249x inline timer_service::implementation::implementation(
360 249x timer_service& svc) noexcept
361 249x : svc_(&svc)
362 {
363 249x }
364
365 inline void
366 515x timer_service::shutdown()
367 {
368 515x timer_service_invalidate_cache();
369 515x shutting_down_ = true;
370
371 // Snapshot impls and detach them from the heap so that
372 // coroutine-owned timer destructors (triggered by h.destroy()
373 // below) cannot re-enter remove_timer_impl() and mutate the
374 // vector during iteration.
375 515x std::vector<implementation*> impls;
376 515x impls.reserve(heap_.size());
377 523x for (auto& entry : heap_)
378 {
379 8x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
380 8x impls.push_back(entry.timer_);
381 }
382 515x heap_.clear();
383 515x cached_nearest_ns_.store(
384 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
385
386 // Cancel waiting timers. Each waiter called work_started()
387 // in implementation::wait(). On IOCP the scheduler shutdown
388 // loop exits when outstanding_work_ reaches zero, so we must
389 // call work_finished() here to balance it. On other backends
390 // this is harmless.
391 523x for (auto* impl : impls)
392 {
393 16x while (auto* w = impl->waiters_.pop_front())
394 {
395 8x w->stop_cb_.reset();
396 8x auto h = std::exchange(w->h_, {});
397 8x sched_->work_finished();
398 8x if (h)
399 8x h.destroy();
400 8x delete w;
401 8x }
402 8x delete impl;
403 }
404
405 // Delete free-listed impls
406 595x while (free_list_)
407 {
408 80x auto* next = free_list_->next_free_;
409 80x delete free_list_;
410 80x free_list_ = next;
411 }
412
413 // Delete free-listed waiters
414 593x while (waiter_free_list_)
415 {
416 78x auto* next = waiter_free_list_->next_free_;
417 78x delete waiter_free_list_;
418 78x waiter_free_list_ = next;
419 }
420 515x }
421
422 inline io_object::implementation*
423 8625x timer_service::construct()
424 {
425 8625x implementation* impl = try_pop_tl_cache(this);
426 8625x if (impl)
427 {
428 8376x impl->svc_ = this;
429 8376x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
430 8376x impl->might_have_pending_waits_ = false;
431 8376x return impl;
432 }
433
434 249x std::lock_guard lock(mutex_);
435 249x if (free_list_)
436 {
437 impl = free_list_;
438 free_list_ = impl->next_free_;
439 impl->next_free_ = nullptr;
440 impl->svc_ = this;
441 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
442 impl->might_have_pending_waits_ = false;
443 }
444 else
445 {
446 249x impl = new implementation(*this);
447 }
448 249x return impl;
449 249x }
450
451 inline void
452 8623x timer_service::destroy(io_object::implementation* p)
453 {
454 8623x destroy_impl(static_cast<implementation&>(*p));
455 8623x }
456
457 inline void
458 8623x timer_service::destroy_impl(implementation& impl)
459 {
460 // During shutdown the impl is owned by the shutdown loop.
461 // Re-entering here (from a coroutine-owned timer destructor
462 // triggered by h.destroy()) must not modify the heap or
463 // recycle the impl — shutdown deletes it directly.
464 8623x if (shutting_down_)
465 8543x return;
466
467 8617x cancel_timer(impl);
468
469 8617x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
470 {
471 std::lock_guard lock(mutex_);
472 remove_timer_impl(impl);
473 refresh_cached_nearest();
474 }
475
476 8617x if (try_push_tl_cache(&impl))
477 8537x return;
478
479 80x std::lock_guard lock(mutex_);
480 80x impl.next_free_ = free_list_;
481 80x free_list_ = &impl;
482 80x }
483
484 inline waiter_node*
485 8419x timer_service::create_waiter()
486 {
487 8419x if (auto* w = try_pop_waiter_tl_cache())
488 8207x return w;
489
490 212x std::lock_guard lock(mutex_);
491 212x if (waiter_free_list_)
492 {
493 2x auto* w = waiter_free_list_;
494 2x waiter_free_list_ = w->next_free_;
495 2x w->next_free_ = nullptr;
496 2x return w;
497 }
498
499 210x return new waiter_node();
500 212x }
501
502 inline void
503 8403x timer_service::destroy_waiter(waiter_node* w)
504 {
505 8403x if (try_push_waiter_tl_cache(w))
506 8323x return;
507
508 80x std::lock_guard lock(mutex_);
509 80x w->next_free_ = waiter_free_list_;
510 80x waiter_free_list_ = w;
511 80x }
512
513 inline std::size_t
514 6x timer_service::update_timer(implementation& impl, time_point new_time)
515 {
516 bool in_heap =
517 6x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
518 6x if (!in_heap && impl.waiters_.empty())
519 return 0;
520
521 6x bool notify = false;
522 6x intrusive_list<waiter_node> canceled;
523
524 {
525 6x std::lock_guard lock(mutex_);
526
527 16x while (auto* w = impl.waiters_.pop_front())
528 {
529 10x w->impl_ = nullptr;
530 10x canceled.push_back(w);
531 10x }
532
533 6x if (impl.heap_index_ < heap_.size())
534 {
535 6x time_point old_time = heap_[impl.heap_index_].time_;
536 6x heap_[impl.heap_index_].time_ = new_time;
537
538 6x if (new_time < old_time)
539 6x up_heap(impl.heap_index_);
540 else
541 down_heap(impl.heap_index_);
542
543 6x notify = (impl.heap_index_ == 0);
544 }
545
546 6x refresh_cached_nearest();
547 6x }
548
549 6x std::size_t count = 0;
550 16x while (auto* w = canceled.pop_front())
551 {
552 10x w->ec_value_ = make_error_code(capy::error::canceled);
553 10x sched_->post(&w->op_);
554 10x ++count;
555 10x }
556
557 6x if (notify)
558 6x on_earliest_changed_();
559
560 6x return count;
561 }
562
563 inline void
564 8419x timer_service::insert_waiter(implementation& impl, waiter_node* w)
565 {
566 8419x bool notify = false;
567 {
568 8419x std::lock_guard lock(mutex_);
569 8419x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
570 {
571 8397x impl.heap_index_ = heap_.size();
572 8397x heap_.push_back({impl.expiry_, &impl});
573 8397x up_heap(heap_.size() - 1);
574 8397x notify = (impl.heap_index_ == 0);
575 8397x refresh_cached_nearest();
576 }
577 8419x impl.waiters_.push_back(w);
578 8419x }
579 8419x if (notify)
580 8366x on_earliest_changed_();
581 8419x }
582
583 inline std::size_t
584 8625x timer_service::cancel_timer(implementation& impl)
585 {
586 8625x if (!impl.might_have_pending_waits_)
587 8601x return 0;
588
589 // Not in heap and no waiters — just clear the flag
590 24x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
591 impl.waiters_.empty())
592 {
593 impl.might_have_pending_waits_ = false;
594 return 0;
595 }
596
597 24x intrusive_list<waiter_node> canceled;
598
599 {
600 24x std::lock_guard lock(mutex_);
601 24x remove_timer_impl(impl);
602 52x while (auto* w = impl.waiters_.pop_front())
603 {
604 28x w->impl_ = nullptr;
605 28x canceled.push_back(w);
606 28x }
607 24x refresh_cached_nearest();
608 24x }
609
610 24x impl.might_have_pending_waits_ = false;
611
612 24x std::size_t count = 0;
613 52x while (auto* w = canceled.pop_front())
614 {
615 28x w->ec_value_ = make_error_code(capy::error::canceled);
616 28x sched_->post(&w->op_);
617 28x ++count;
618 28x }
619
620 24x return count;
621 }
622
623 inline void
624 30x timer_service::cancel_waiter(waiter_node* w)
625 {
626 {
627 30x std::lock_guard lock(mutex_);
628 // Already removed by cancel_timer or process_expired
629 30x if (!w->impl_)
630 return;
631 30x auto* impl = w->impl_;
632 30x w->impl_ = nullptr;
633 30x impl->waiters_.remove(w);
634 30x if (impl->waiters_.empty())
635 {
636 28x remove_timer_impl(*impl);
637 28x impl->might_have_pending_waits_ = false;
638 }
639 30x refresh_cached_nearest();
640 30x }
641
642 30x w->ec_value_ = make_error_code(capy::error::canceled);
643 30x sched_->post(&w->op_);
644 }
645
646 inline std::size_t
647 2x timer_service::cancel_one_waiter(implementation& impl)
648 {
649 2x if (!impl.might_have_pending_waits_)
650 return 0;
651
652 2x waiter_node* w = nullptr;
653
654 {
655 2x std::lock_guard lock(mutex_);
656 2x w = impl.waiters_.pop_front();
657 2x if (!w)
658 return 0;
659 2x w->impl_ = nullptr;
660 2x if (impl.waiters_.empty())
661 {
662 remove_timer_impl(impl);
663 impl.might_have_pending_waits_ = false;
664 }
665 2x refresh_cached_nearest();
666 2x }
667
668 2x w->ec_value_ = make_error_code(capy::error::canceled);
669 2x sched_->post(&w->op_);
670 2x return 1;
671 }
672
673 inline std::size_t
674 164668x timer_service::process_expired()
675 {
676 164668x intrusive_list<waiter_node> expired;
677
678 {
679 164668x std::lock_guard lock(mutex_);
680 164668x auto now = clock_type::now();
681
682 173005x while (!heap_.empty() && heap_[0].time_ <= now)
683 {
684 8337x implementation* t = heap_[0].timer_;
685 8337x remove_timer_impl(*t);
686 16678x while (auto* w = t->waiters_.pop_front())
687 {
688 8341x w->impl_ = nullptr;
689 8341x w->ec_value_ = {};
690 8341x expired.push_back(w);
691 8341x }
692 8337x t->might_have_pending_waits_ = false;
693 }
694
695 164668x refresh_cached_nearest();
696 164668x }
697
698 164668x std::size_t count = 0;
699 173009x while (auto* w = expired.pop_front())
700 {
701 8341x sched_->post(&w->op_);
702 8341x ++count;
703 8341x }
704
705 164668x return count;
706 }
707
708 inline void
709 8389x timer_service::remove_timer_impl(implementation& impl)
710 {
711 8389x std::size_t index = impl.heap_index_;
712 8389x if (index >= heap_.size())
713 return; // Not in heap
714
715 8389x if (index == heap_.size() - 1)
716 {
717 // Last element, just pop
718 152x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
719 152x heap_.pop_back();
720 }
721 else
722 {
723 // Swap with last and reheapify
724 8237x swap_heap(index, heap_.size() - 1);
725 8237x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
726 8237x heap_.pop_back();
727
728 8237x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
729 up_heap(index);
730 else
731 8237x down_heap(index);
732 }
733 }
734
735 inline void
736 8403x timer_service::up_heap(std::size_t index)
737 {
738 16619x while (index > 0)
739 {
740 8247x std::size_t parent = (index - 1) / 2;
741 8247x if (!(heap_[index].time_ < heap_[parent].time_))
742 31x break;
743 8216x swap_heap(index, parent);
744 8216x index = parent;
745 }
746 8403x }
747
748 inline void
749 8237x timer_service::down_heap(std::size_t index)
750 {
751 8237x std::size_t child = index * 2 + 1;
752 8237x while (child < heap_.size())
753 {
754 6x std::size_t min_child = (child + 1 == heap_.size() ||
755 heap_[child].time_ < heap_[child + 1].time_)
756 6x ? child
757 6x : child + 1;
758
759 6x if (heap_[index].time_ < heap_[min_child].time_)
760 6x break;
761
762 swap_heap(index, min_child);
763 index = min_child;
764 child = index * 2 + 1;
765 }
766 8237x }
767
768 inline void
769 16453x timer_service::swap_heap(std::size_t i1, std::size_t i2)
770 {
771 16453x heap_entry tmp = heap_[i1];
772 16453x heap_[i1] = heap_[i2];
773 16453x heap_[i2] = tmp;
774 16453x heap_[i1].timer_->heap_index_ = i1;
775 16453x heap_[i2].timer_->heap_index_ = i2;
776 16453x }
777
778 // waiter_node out-of-class member function definitions
779
780 inline void
781 30x waiter_node::canceller::operator()() const
782 {
783 30x waiter_->svc_->cancel_waiter(waiter_);
784 30x }
785
786 inline void
787 waiter_node::completion_op::do_complete(
788 [[maybe_unused]] void* owner,
789 scheduler_op* base,
790 std::uint32_t,
791 std::uint32_t)
792 {
793 // owner is always non-null here. The destroy path (owner == nullptr)
794 // is unreachable because completion_op overrides destroy() directly,
795 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
796 BOOST_COROSIO_ASSERT(owner);
797 static_cast<completion_op*>(base)->operator()();
798 }
799
800 inline void
801 8403x waiter_node::completion_op::operator()()
802 {
803 8403x auto* w = waiter_;
804 8403x w->stop_cb_.reset();
805 8403x if (w->ec_out_)
806 8403x *w->ec_out_ = w->ec_value_;
807
808 8403x auto* cont = w->cont_;
809 8403x auto d = w->d_;
810 8403x auto* svc = w->svc_;
811 8403x auto& sched = svc->get_scheduler();
812
813 8403x svc->destroy_waiter(w);
814
815 8403x d.post(*cont);
816 8403x sched.work_finished();
817 8403x }
818
819 // GCC 14 false-positive: inlining ~optional<stop_callback> through
820 // delete loses track that stop_cb_ was already .reset() above.
821 #if defined(__GNUC__) && !defined(__clang__)
822 #pragma GCC diagnostic push
823 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
824 #endif
825 inline void
826 8x waiter_node::completion_op::destroy()
827 {
828 // Called during scheduler shutdown drain when this completion_op is
829 // in the scheduler's ready queue (posted by cancel_timer() or
830 // process_expired()). Balances the work_started() from
831 // implementation::wait(). The scheduler drain loop separately
832 // balances the work_started() from post(). On IOCP both decrements
833 // are required for outstanding_work_ to reach zero; on other
834 // backends this is harmless.
835 //
836 // This override also prevents scheduler_op::destroy() from calling
837 // do_complete(nullptr, ...). See also: timer_service::shutdown()
838 // which drains waiters still in the timer heap (the other path).
839 8x auto* w = waiter_;
840 8x w->stop_cb_.reset();
841 8x auto h = std::exchange(w->h_, {});
842 8x auto& sched = w->svc_->get_scheduler();
843 8x delete w;
844 8x sched.work_finished();
845 8x if (h)
846 8x h.destroy();
847 8x }
848 #if defined(__GNUC__) && !defined(__clang__)
849 #pragma GCC diagnostic pop
850 #endif
851
852 inline std::coroutine_handle<>
853 8420x timer_service::implementation::wait(
854 std::coroutine_handle<> h,
855 capy::executor_ref d,
856 std::stop_token token,
857 std::error_code* ec,
858 capy::continuation* cont)
859 {
860 // Already-expired fast path — no waiter_node, no mutex.
861 // Post instead of dispatch so the coroutine yields to the
862 // scheduler, allowing other queued work to run.
863 8420x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
864 {
865 8398x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
866 {
867 1x if (ec)
868 1x *ec = {};
869 1x d.post(*cont);
870 1x return std::noop_coroutine();
871 }
872 }
873
874 8419x auto* w = svc_->create_waiter();
875 8419x w->impl_ = this;
876 8419x w->svc_ = svc_;
877 8419x w->h_ = h;
878 8419x w->cont_ = cont;
879 8419x w->d_ = d;
880 8419x w->token_ = std::move(token);
881 8419x w->ec_out_ = ec;
882
883 8419x svc_->insert_waiter(*this, w);
884 8419x might_have_pending_waits_ = true;
885 8419x svc_->get_scheduler().work_started();
886
887 8419x if (w->token_.stop_possible())
888 48x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
889
890 8419x return std::noop_coroutine();
891 }
892
893 // Free functions
894
895 struct timer_service_access
896 {
897 8625x static native_scheduler& get_scheduler(io_context& ctx) noexcept
898 {
899 8625x return static_cast<native_scheduler&>(*ctx.sched_);
900 }
901 };
902
903 // Bypass find_service() mutex by reading the scheduler's cached pointer
904 inline io_object::io_service&
905 8625x timer_service_direct(capy::execution_context& ctx) noexcept
906 {
907 8625x return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
908 8625x .timer_svc_;
909 }
910
911 inline std::size_t
912 6x timer_service_update_expiry(timer::implementation& base)
913 {
914 6x auto& impl = static_cast<timer_service::implementation&>(base);
915 6x return impl.svc_->update_timer(impl, impl.expiry_);
916 }
917
918 inline std::size_t
919 8x timer_service_cancel(timer::implementation& base) noexcept
920 {
921 8x auto& impl = static_cast<timer_service::implementation&>(base);
922 8x return impl.svc_->cancel_timer(impl);
923 }
924
925 inline std::size_t
926 2x timer_service_cancel_one(timer::implementation& base) noexcept
927 {
928 2x auto& impl = static_cast<timer_service::implementation&>(base);
929 2x return impl.svc_->cancel_one_waiter(impl);
930 }
931
932 inline timer_service&
933 515x get_timer_service(capy::execution_context& ctx, scheduler& sched)
934 {
935 515x return ctx.make_service<timer_service>(sched);
936 }
937
938 } // namespace boost::corosio::detail
939
940 #endif
941