TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/native/native_scheduler.hpp>
18 : #include <boost/corosio/detail/intrusive.hpp>
19 : #include <boost/corosio/detail/thread_local_ptr.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <system_error>
24 :
25 : #include <atomic>
26 : #include <chrono>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <limits>
30 : #include <mutex>
31 : #include <optional>
32 : #include <stop_token>
33 : #include <utility>
34 : #include <vector>
35 :
36 : namespace boost::corosio::detail {
37 :
38 : struct scheduler;
39 :
40 : /*
41 : Timer Service
42 : =============
43 :
44 : Data Structures
45 : ---------------
46 : waiter_node holds per-waiter state: coroutine handle, executor,
47 : error output, stop_token, embedded completion_op. Each concurrent
48 : co_await t.wait() allocates one waiter_node.
49 :
50 : timer_service::implementation holds per-timer state: expiry,
51 : heap index, and an intrusive_list of waiter_nodes. Multiple
52 : coroutines can wait on the same timer simultaneously.
53 :
54 : timer_service owns a min-heap of active timers, a free list
55 : of recycled impls, and a free list of recycled waiter_nodes. The
56 : heap is ordered by expiry time; the scheduler queries
57 : nearest_expiry() to set the epoll/timerfd timeout.
58 :
59 : Optimization Strategy
60 : ---------------------
61 : 1. Deferred heap insertion — expires_after() stores the expiry
62 : but does not insert into the heap. Insertion happens in wait().
63 : 2. Thread-local impl cache — single-slot per-thread cache.
64 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 : 6. Thread-local waiter cache — single-slot per-thread cache.
68 :
69 : Concurrency
70 : -----------
71 : stop_token callbacks can fire from any thread. The impl_
72 : pointer on waiter_node is used as a "still in list" marker.
73 : */
74 :
75 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76 :
77 : inline void timer_service_invalidate_cache() noexcept;
78 :
79 : // timer_service class body — member function definitions are
80 : // out-of-class (after implementation and waiter_node are complete)
class BOOST_COROSIO_DECL timer_service final
    : public capy::execution_context::service
    , public io_object::io_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;

    /// Type-erased callback for earliest-expiry-changed notifications.
    ///
    /// Holds a raw context pointer plus a function pointer so that the
    /// scheduler can be notified without any allocation (no std::function).
    class callback
    {
        void* ctx_ = nullptr;
        void (*fn_)(void*) = nullptr;

    public:
        /// Construct an empty callback.
        callback() = default;

        /// Construct a callback with the given context and function.
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}

        /// Return true if the callback is non-empty.
        explicit operator bool() const noexcept
        {
            return fn_ != nullptr;
        }

        /// Invoke the callback. No-op when empty.
        void operator()() const
        {
            if (fn_)
                fn_(ctx_);
        }
    };

    struct implementation;

private:
    // One min-heap slot. The expiry is duplicated here (instead of read
    // through timer_) so heap comparisons touch only this vector.
    struct heap_entry
    {
        time_point time_;
        implementation* timer_;
    };

    scheduler* sched_ = nullptr;
    // Guards heap_, both free lists, and every impl's waiters_ list.
    mutable std::mutex mutex_;
    std::vector<heap_entry> heap_;               // min-heap ordered by time_
    implementation* free_list_ = nullptr;        // recycled impls (next_free_ chain)
    waiter_node* waiter_free_list_ = nullptr;    // recycled waiters (next_free_ chain)
    callback on_earliest_changed_;
    bool shutting_down_ = false;
    // Cached heap_[0].time_ in ns-since-epoch, int64 max when the heap is
    // empty. Avoids mutex in nearest_expiry() and empty(); refreshed under
    // mutex_ by refresh_cached_nearest().
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    /// Construct the timer service bound to a scheduler.
    inline timer_service(capy::execution_context&, scheduler& sched)
        : sched_(&sched)
    {
    }

    /// Return the associated scheduler.
    inline scheduler& get_scheduler() noexcept
    {
        return *sched_;
    }

    /// Destroy the timer service.
    ~timer_service() override = default;

    timer_service(timer_service const&) = delete;
    timer_service& operator=(timer_service const&) = delete;

    /// Register a callback invoked when the earliest expiry changes.
    inline void set_on_earliest_changed(callback cb)
    {
        on_earliest_changed_ = cb;
    }

    /// Return true if no timers are in the heap (lock-free; reads the
    /// cached nearest-expiry sentinel).
    inline bool empty() const noexcept
    {
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
            (std::numeric_limits<std::int64_t>::max)();
    }

    /// Return the nearest timer expiry without acquiring the mutex.
    /// When the heap is empty this is time_point(int64 max ns).
    inline time_point nearest_expiry() const noexcept
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    /// Cancel all pending timers and free cached resources.
    inline void shutdown() override;

    /// Construct a new timer implementation.
    inline io_object::implementation* construct() override;

    /// Destroy a timer implementation, cancelling pending waiters.
    inline void destroy(io_object::implementation* p) override;

    /// Cancel and recycle a timer implementation.
    inline void destroy_impl(implementation& impl);

    /// Create or recycle a waiter node.
    inline waiter_node* create_waiter();

    /// Return a waiter node to the cache or free list.
    inline void destroy_waiter(waiter_node* w);

    /// Update the timer expiry, cancelling existing waiters.
    inline std::size_t update_timer(implementation& impl, time_point new_time);

    /// Insert a waiter into the timer's waiter list and the heap.
    inline void insert_waiter(implementation& impl, waiter_node* w);

    /// Cancel all waiters on a timer.
    inline std::size_t cancel_timer(implementation& impl);

    /// Cancel a single waiter ( stop_token callback path ).
    inline void cancel_waiter(waiter_node* w);

    /// Cancel one waiter on a timer.
    inline std::size_t cancel_one_waiter(implementation& impl);

    /// Complete all waiters whose timers have expired.
    inline std::size_t process_expired();

private:
    // Re-publish the heap root's expiry to the lock-free cache.
    // Must be called with mutex_ held after any heap mutation.
    inline void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
                                : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    inline void remove_timer_impl(implementation& impl);
    inline void up_heap(std::size_t index);
    inline void down_heap(std::size_t index);
    inline void swap_heap(std::size_t i1, std::size_t i2);
};
224 :
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel.
    // One instance lives inside each waiter_node and is posted to the
    // scheduler when the wait completes or is cancelled.
    struct completion_op final : scheduler_op
    {
        waiter_node* waiter_ = nullptr;

        // Trampoline installed as scheduler_op's function pointer;
        // forwards to operator()() below.
        static void do_complete(
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);

        completion_op() noexcept : scheduler_op(&do_complete) {}

        void operator()() override;
        void destroy() override;
    };

    // Per-waiter stop_token cancellation: invoked by std::stop_callback
    // (possibly from another thread) to cancel just this waiter.
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_service::implementation* impl_ = nullptr;
    timer_service* svc_ = nullptr;      // owning service, used for recycling
    std::coroutine_handle<> h_;         // suspended coroutine frame
    capy::continuation* cont_ = nullptr; // continuation posted on completion
    capy::executor_ref d_;              // executor used to post cont_
    std::error_code* ec_out_ = nullptr; // caller's error-code output slot
    std::stop_token token_;
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;                  // the embedded completion op
    std::error_code ec_value_;          // result copied into *ec_out_
    waiter_node* next_free_ = nullptr;  // free-list linkage when recycled

    waiter_node() noexcept
    {
        // Back-pointer so the posted op can locate its owning waiter.
        op_.waiter_ = this;
    }
};
267 :
struct timer_service::implementation final : timer::implementation
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    timer_service* svc_ = nullptr;        // owning service
    intrusive_list<waiter_node> waiters_; // all coroutines waiting on this timer

    // Free list linkage (reused when impl is on free_list)
    implementation* next_free_ = nullptr;

    inline explicit implementation(timer_service& svc) noexcept;

    // Suspend the awaiting coroutine until expiry or cancellation;
    // defined out-of-class after timer_service is complete.
    inline std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*,
        capy::continuation*) override;
};
289 :
290 : // Thread-local caches avoid hot-path mutex acquisitions:
291 : // 1. Impl cache — single-slot, validated by comparing svc_
292 : // 2. Waiter cache — single-slot, no service affinity
293 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
294 :
// Single-slot per-thread caches; empty slot is nullptr.
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
inline thread_local_ptr<waiter_node> tl_cached_waiter;
297 :
298 : inline timer_service::implementation*
299 8625 : try_pop_tl_cache(timer_service* svc) noexcept
300 : {
301 8625 : auto* impl = tl_cached_impl.get();
302 8625 : if (impl)
303 : {
304 8376 : tl_cached_impl.set(nullptr);
305 8376 : if (impl->svc_ == svc)
306 8376 : return impl;
307 : // Stale impl from a destroyed service
308 MIS 0 : delete impl;
309 : }
310 HIT 249 : return nullptr;
311 : }
312 :
313 : inline bool
314 8617 : try_push_tl_cache(timer_service::implementation* impl) noexcept
315 : {
316 8617 : if (!tl_cached_impl.get())
317 : {
318 8537 : tl_cached_impl.set(impl);
319 8537 : return true;
320 : }
321 80 : return false;
322 : }
323 :
324 : inline waiter_node*
325 8419 : try_pop_waiter_tl_cache() noexcept
326 : {
327 8419 : auto* w = tl_cached_waiter.get();
328 8419 : if (w)
329 : {
330 8207 : tl_cached_waiter.set(nullptr);
331 8207 : return w;
332 : }
333 212 : return nullptr;
334 : }
335 :
336 : inline bool
337 8403 : try_push_waiter_tl_cache(waiter_node* w) noexcept
338 : {
339 8403 : if (!tl_cached_waiter.get())
340 : {
341 8323 : tl_cached_waiter.set(w);
342 8323 : return true;
343 : }
344 80 : return false;
345 : }
346 :
// Drop both thread-local caches. Called from shutdown() so that a
// later service on this thread cannot observe stale cached objects.
// NOTE(review): this clears only the calling thread's slots; caches on
// other threads rely on the svc_ check in try_pop_tl_cache() — confirm.
inline void
timer_service_invalidate_cache() noexcept
{
    delete tl_cached_impl.get();
    tl_cached_impl.set(nullptr);

    delete tl_cached_waiter.get();
    tl_cached_waiter.set(nullptr);
}
356 :
357 : // timer_service out-of-class member function definitions
358 :
359 249 : inline timer_service::implementation::implementation(
360 249 : timer_service& svc) noexcept
361 249 : : svc_(&svc)
362 : {
363 249 : }
364 :
// Tear down all timer state. Runs during execution-context service
// shutdown. NOTE(review): mutex_ is not taken here — presumably no
// other threads touch the service at this point; confirm.
inline void
timer_service::shutdown()
{
    timer_service_invalidate_cache();
    shutting_down_ = true;

    // Snapshot impls and detach them from the heap so that
    // coroutine-owned timer destructors (triggered by h.destroy()
    // below) cannot re-enter remove_timer_impl() and mutate the
    // vector during iteration.
    std::vector<implementation*> impls;
    impls.reserve(heap_.size());
    for (auto& entry : heap_)
    {
        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impls.push_back(entry.timer_);
    }
    heap_.clear();
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Cancel waiting timers. Each waiter called work_started()
    // in implementation::wait(). On IOCP the scheduler shutdown
    // loop exits when outstanding_work_ reaches zero, so we must
    // call work_finished() here to balance it. On other backends
    // this is harmless.
    for (auto* impl : impls)
    {
        while (auto* w = impl->waiters_.pop_front())
        {
            w->stop_cb_.reset();
            auto h = std::exchange(w->h_, {});
            sched_->work_finished();
            if (h)
                h.destroy();
            delete w;
        }
        delete impl;
    }

    // Delete free-listed impls
    while (free_list_)
    {
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}
421 :
// Produce a timer impl: thread-local cache first (lock-free), then
// the shared free list under the mutex, then a fresh allocation.
// Recycled impls are reset to "not in heap, no pending waits".
inline io_object::implementation*
timer_service::construct()
{
    implementation* impl = try_pop_tl_cache(this);
    if (impl)
    {
        impl->svc_ = this;
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impl->might_have_pending_waits_ = false;
        return impl;
    }

    std::lock_guard lock(mutex_);
    if (free_list_)
    {
        impl = free_list_;
        free_list_ = impl->next_free_;
        impl->next_free_ = nullptr;
        impl->svc_ = this;
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impl->might_have_pending_waits_ = false;
    }
    else
    {
        impl = new implementation(*this);
    }
    return impl;
}
450 :
451 : inline void
452 8623 : timer_service::destroy(io_object::implementation* p)
453 : {
454 8623 : destroy_impl(static_cast<implementation&>(*p));
455 8623 : }
456 :
// Cancel any waiters, remove the timer from the heap, and recycle the
// impl (thread-local slot first, then the shared free list).
inline void
timer_service::destroy_impl(implementation& impl)
{
    // During shutdown the impl is owned by the shutdown loop.
    // Re-entering here (from a coroutine-owned timer destructor
    // triggered by h.destroy()) must not modify the heap or
    // recycle the impl — shutdown deletes it directly.
    if (shutting_down_)
        return;

    cancel_timer(impl);

    // cancel_timer() already removed the impl when it had waiters;
    // this handles a timer still in the heap with no waiters.
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        refresh_cached_nearest();
    }

    if (try_push_tl_cache(&impl))
        return;

    std::lock_guard lock(mutex_);
    impl.next_free_ = free_list_;
    free_list_ = &impl;
}
483 :
484 : inline waiter_node*
485 8419 : timer_service::create_waiter()
486 : {
487 8419 : if (auto* w = try_pop_waiter_tl_cache())
488 8207 : return w;
489 :
490 212 : std::lock_guard lock(mutex_);
491 212 : if (waiter_free_list_)
492 : {
493 2 : auto* w = waiter_free_list_;
494 2 : waiter_free_list_ = w->next_free_;
495 2 : w->next_free_ = nullptr;
496 2 : return w;
497 : }
498 :
499 210 : return new waiter_node();
500 212 : }
501 :
502 : inline void
503 8403 : timer_service::destroy_waiter(waiter_node* w)
504 : {
505 8403 : if (try_push_waiter_tl_cache(w))
506 8323 : return;
507 :
508 80 : std::lock_guard lock(mutex_);
509 80 : w->next_free_ = waiter_free_list_;
510 80 : waiter_free_list_ = w;
511 80 : }
512 :
// Change a timer's expiry. All current waiters are cancelled (posted
// with error::canceled) and the heap entry, if any, is re-keyed.
// Returns the number of waiters cancelled.
inline std::size_t
timer_service::update_timer(implementation& impl, time_point new_time)
{
    bool in_heap =
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
    // Nothing to cancel or re-key — avoid the mutex entirely.
    if (!in_heap && impl.waiters_.empty())
        return 0;

    bool notify = false;
    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);

        // Detach all waiters; impl_ = nullptr marks each as removed so
        // a concurrent stop_token canceller will not touch it.
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }

        if (impl.heap_index_ < heap_.size())
        {
            time_point old_time = heap_[impl.heap_index_].time_;
            heap_[impl.heap_index_].time_ = new_time;

            // Restore the heap invariant in the direction the key moved.
            if (new_time < old_time)
                up_heap(impl.heap_index_);
            else
                down_heap(impl.heap_index_);

            // Earliest expiry changed only if this timer is now the root.
            notify = (impl.heap_index_ == 0);
        }

        refresh_cached_nearest();
    }

    // Post completions outside the lock.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    if (notify)
        on_earliest_changed_();

    return count;
}
562 :
// Add a waiter to the timer and, on the first wait, perform the
// deferred heap insertion (expires_after() only records the expiry).
// Notifies the scheduler when this timer becomes the new earliest.
inline void
timer_service::insert_waiter(implementation& impl, waiter_node* w)
{
    bool notify = false;
    {
        std::lock_guard lock(mutex_);
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
        {
            impl.heap_index_ = heap_.size();
            heap_.push_back({impl.expiry_, &impl});
            up_heap(heap_.size() - 1);
            notify = (impl.heap_index_ == 0);
            refresh_cached_nearest();
        }
        impl.waiters_.push_back(w);
    }
    // Invoke the callback outside the lock.
    if (notify)
        on_earliest_changed_();
}
582 :
// Cancel every waiter on a timer and remove it from the heap.
// Returns the number of waiters cancelled.
inline std::size_t
timer_service::cancel_timer(implementation& impl)
{
    // Lock-free fast path: no wait() has been issued since the last
    // cancel/expiry, so there is nothing to do.
    if (!impl.might_have_pending_waits_)
        return 0;

    // Not in heap and no waiters — just clear the flag
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
        impl.waiters_.empty())
    {
        impl.might_have_pending_waits_ = false;
        return 0;
    }

    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        // impl_ = nullptr marks each waiter as removed so a concurrent
        // stop_token canceller will leave it alone.
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }
        refresh_cached_nearest();
    }

    impl.might_have_pending_waits_ = false;

    // Post completions outside the lock.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
622 :
// Cancel a single waiter. This is the stop_token callback path and
// may run on any thread; w->impl_ (checked under the mutex) tells us
// whether the waiter is still in its timer's list.
inline void
timer_service::cancel_waiter(waiter_node* w)
{
    {
        std::lock_guard lock(mutex_);
        // Already removed by cancel_timer or process_expired
        if (!w->impl_)
            return;
        auto* impl = w->impl_;
        w->impl_ = nullptr;
        impl->waiters_.remove(w);
        // Last waiter gone — take the timer out of the heap too.
        if (impl->waiters_.empty())
        {
            remove_timer_impl(*impl);
            impl->might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Post the completion outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
}
645 :
// Cancel the oldest waiter on a timer (FIFO order via pop_front).
// Returns 1 if a waiter was cancelled, 0 otherwise.
inline std::size_t
timer_service::cancel_one_waiter(implementation& impl)
{
    // Lock-free fast path: nothing pending.
    if (!impl.might_have_pending_waits_)
        return 0;

    waiter_node* w = nullptr;

    {
        std::lock_guard lock(mutex_);
        w = impl.waiters_.pop_front();
        if (!w)
            return 0;
        // Mark as removed so a concurrent stop_token canceller skips it.
        w->impl_ = nullptr;
        // Last waiter gone — take the timer out of the heap too.
        if (impl.waiters_.empty())
        {
            remove_timer_impl(impl);
            impl.might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Post the completion outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
    return 1;
}
672 :
// Pop every timer whose expiry is <= now from the heap and post the
// successful completion (empty error_code) of each of its waiters.
// Called by the scheduler; returns the number of waiters completed.
inline std::size_t
timer_service::process_expired()
{
    intrusive_list<waiter_node> expired;

    {
        std::lock_guard lock(mutex_);
        auto now = clock_type::now();

        // The heap root is always the earliest expiry, so we can stop
        // at the first non-expired root.
        while (!heap_.empty() && heap_[0].time_ <= now)
        {
            implementation* t = heap_[0].timer_;
            remove_timer_impl(*t);
            while (auto* w = t->waiters_.pop_front())
            {
                w->impl_ = nullptr;   // concurrency marker: no longer listed
                w->ec_value_ = {};    // success
                expired.push_back(w);
            }
            t->might_have_pending_waits_ = false;
        }

        refresh_cached_nearest();
    }

    // Post completions outside the lock.
    std::size_t count = 0;
    while (auto* w = expired.pop_front())
    {
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
707 :
// Remove a timer's heap entry and reset its heap_index_ to the
// "not in heap" sentinel. Caller holds mutex_. Uses the standard
// swap-with-last-then-reheapify technique.
inline void
timer_service::remove_timer_impl(implementation& impl)
{
    std::size_t index = impl.heap_index_;
    if (index >= heap_.size())
        return; // Not in heap

    if (index == heap_.size() - 1)
    {
        // Last element, just pop
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();
    }
    else
    {
        // Swap with last and reheapify
        swap_heap(index, heap_.size() - 1);
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();

        // The moved-in entry may violate the invariant in either
        // direction: sift up if it beats its parent, else sift down.
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
            up_heap(index);
        else
            down_heap(index);
    }
}
734 :
735 : inline void
736 8403 : timer_service::up_heap(std::size_t index)
737 : {
738 16619 : while (index > 0)
739 : {
740 8247 : std::size_t parent = (index - 1) / 2;
741 8247 : if (!(heap_[index].time_ < heap_[parent].time_))
742 31 : break;
743 8216 : swap_heap(index, parent);
744 8216 : index = parent;
745 : }
746 8403 : }
747 :
748 : inline void
749 8237 : timer_service::down_heap(std::size_t index)
750 : {
751 8237 : std::size_t child = index * 2 + 1;
752 8237 : while (child < heap_.size())
753 : {
754 6 : std::size_t min_child = (child + 1 == heap_.size() ||
755 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
756 HIT 6 : ? child
757 6 : : child + 1;
758 :
759 6 : if (heap_[index].time_ < heap_[min_child].time_)
760 6 : break;
761 :
762 MIS 0 : swap_heap(index, min_child);
763 0 : index = min_child;
764 0 : child = index * 2 + 1;
765 : }
766 HIT 8237 : }
767 :
768 : inline void
769 16453 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
770 : {
771 16453 : heap_entry tmp = heap_[i1];
772 16453 : heap_[i1] = heap_[i2];
773 16453 : heap_[i2] = tmp;
774 16453 : heap_[i1].timer_->heap_index_ = i1;
775 16453 : heap_[i2].timer_->heap_index_ = i2;
776 16453 : }
777 :
778 : // waiter_node out-of-class member function definitions
779 :
// stop_callback entry point — may run on any thread; the service
// sorts out whether the waiter is still live under its mutex.
inline void
waiter_node::canceller::operator()() const
{
    waiter_->svc_->cancel_waiter(waiter_);
}
785 :
// scheduler_op trampoline: forwards to the virtual call operator.
inline void
waiter_node::completion_op::do_complete(
    [[maybe_unused]] void* owner,
    scheduler_op* base,
    std::uint32_t,
    std::uint32_t)
{
    // owner is always non-null here. The destroy path (owner == nullptr)
    // is unreachable because completion_op overrides destroy() directly,
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
    BOOST_COROSIO_ASSERT(owner);
    static_cast<completion_op*>(base)->operator()();
}
799 :
// Deliver the result of a completed/cancelled wait: publish the
// error code, recycle the waiter, then resume the continuation.
inline void
waiter_node::completion_op::operator()()
{
    auto* w = waiter_;
    w->stop_cb_.reset();
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy out everything needed after the waiter is recycled —
    // destroy_waiter() below may hand w to another wait immediately.
    auto* cont = w->cont_;
    auto d = w->d_;
    auto* svc = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    // Balance the work_started() made in implementation::wait().
    d.post(*cont);
    sched.work_finished();
}
818 :
// GCC 14 false-positive: inlining ~optional<stop_callback> through
// delete loses track that stop_cb_ was already .reset() above.
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif
inline void
waiter_node::completion_op::destroy()
{
    // Called during scheduler shutdown drain when this completion_op is
    // in the scheduler's ready queue (posted by cancel_timer() or
    // process_expired()). Balances the work_started() from
    // implementation::wait(). The scheduler drain loop separately
    // balances the work_started() from post(). On IOCP both decrements
    // are required for outstanding_work_ to reach zero; on other
    // backends this is harmless.
    //
    // This override also prevents scheduler_op::destroy() from calling
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
    // which drains waiters still in the timer heap (the other path).
    auto* w = waiter_;
    w->stop_cb_.reset();
    auto h = std::exchange(w->h_, {});
    // Grab the scheduler before deleting w; h.destroy() last so the
    // coroutine frame teardown cannot touch the dead waiter.
    auto& sched = w->svc_->get_scheduler();
    delete w;
    sched.work_finished();
    if (h)
        h.destroy();
}
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
851 :
// Begin an asynchronous wait on this timer. Allocates (or recycles) a
// waiter_node, registers it with the service, and suspends the caller;
// completion is delivered later via waiter_node::completion_op.
inline std::coroutine_handle<>
timer_service::implementation::wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec,
    capy::continuation* cont)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    // Only valid when the timer is not in the heap (heap_index_ is
    // the sentinel), i.e. no other waiter has armed it yet.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {};
            d.post(*cont);
            return std::noop_coroutine();
        }
    }

    auto* w = svc_->create_waiter();
    w->impl_ = this;
    w->svc_ = svc_;
    w->h_ = h;
    w->cont_ = cont;
    w->d_ = d;
    w->token_ = std::move(token);
    w->ec_out_ = ec;

    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    // Balanced by work_finished() in completion_op::operator()/destroy()
    // or timer_service::shutdown().
    svc_->get_scheduler().work_started();

    // Arm cancellation last: the canceller may fire immediately from
    // another thread, and the waiter is fully initialized by now.
    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}
892 :
893 : // Free functions
894 :
// Friend-style accessor granting this header access to the
// io_context's private scheduler pointer.
struct timer_service_access
{
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
    {
        return static_cast<native_scheduler&>(*ctx.sched_);
    }
};
902 :
903 : // Bypass find_service() mutex by reading the scheduler's cached pointer
904 : inline io_object::io_service&
905 8625 : timer_service_direct(capy::execution_context& ctx) noexcept
906 : {
907 8625 : return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
908 8625 : .timer_svc_;
909 : }
910 :
911 : inline std::size_t
912 6 : timer_service_update_expiry(timer::implementation& base)
913 : {
914 6 : auto& impl = static_cast<timer_service::implementation&>(base);
915 6 : return impl.svc_->update_timer(impl, impl.expiry_);
916 : }
917 :
918 : inline std::size_t
919 8 : timer_service_cancel(timer::implementation& base) noexcept
920 : {
921 8 : auto& impl = static_cast<timer_service::implementation&>(base);
922 8 : return impl.svc_->cancel_timer(impl);
923 : }
924 :
925 : inline std::size_t
926 2 : timer_service_cancel_one(timer::implementation& base) noexcept
927 : {
928 2 : auto& impl = static_cast<timer_service::implementation&>(base);
929 2 : return impl.svc_->cancel_one_waiter(impl);
930 : }
931 :
// Create (or fetch the existing) timer_service for this context.
inline timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    return ctx.make_service<timer_service>(sched);
}
937 :
938 : } // namespace boost::corosio::detail
939 :
940 : #endif
|