mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-15 20:37:36 +02:00
Stop leaking goroutines past Control.Stop, consolidate punching in Punchy (#1708)
This commit is contained in:
84
scheduler.go
Normal file
84
scheduler.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package nebula
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Scheduler is an allocation-conscious dispatch primitive for delayed work.
|
||||
// Pending items are handed to time.AfterFunc, and ready items land on a worker
|
||||
// channel for centralized dispatch in fire-time order.
|
||||
//
|
||||
// Pick a Scheduler when fire timing matters (exact deadlines, no bucketing) or when the scheduling
|
||||
// rate is uneven enough that idle CPU matters. Each fire is a runtime-spawned goroutine running the callback before
|
||||
// delivering to the worker, which is fine at sparse rates but adds up at line rate.
|
||||
//
|
||||
// Pick a TimerWheel when scheduling is high-rate and uniform: its O(1) insert, internal item cache,
|
||||
// and bucket-batched dispatch are cheaper at scale.
|
||||
// The caller drives the tick loop (Advance/Purge) and pays for fires at bucket boundaries rather than exact deadlines.
|
||||
type Scheduler[T any] struct {
|
||||
queue chan T
|
||||
pool sync.Pool
|
||||
}
|
||||
|
||||
type schedItem[T any] struct {
|
||||
val T
|
||||
ctx context.Context
|
||||
s *Scheduler[T]
|
||||
timer *time.Timer
|
||||
fire func()
|
||||
}
|
||||
|
||||
// NewScheduler builds a Scheduler whose worker channel is sized to queueSize.
|
||||
// The buffer absorbs bursts of timers firing close together without
|
||||
// blocking the runtime's callback goroutines on the worker.
|
||||
func NewScheduler[T any](queueSize int) *Scheduler[T] {
|
||||
s := &Scheduler[T]{
|
||||
queue: make(chan T, queueSize),
|
||||
}
|
||||
s.pool.New = func() any {
|
||||
si := &schedItem[T]{s: s}
|
||||
// fire is allocated exactly once per pool-resident item.
|
||||
// The closure captures only `si`, which stays stable for the item's lifetime.
|
||||
si.fire = func() {
|
||||
select {
|
||||
case si.s.queue <- si.val:
|
||||
case <-si.ctx.Done():
|
||||
}
|
||||
var zero T
|
||||
si.val = zero
|
||||
si.ctx = nil
|
||||
si.s.pool.Put(si)
|
||||
}
|
||||
return si
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Schedule arranges item to be delivered to the worker after delay.
|
||||
// The runtime's timer heap handles the wait, so the scheduler itself burns no CPU while idle.
|
||||
// The callback observes ctx: if ctx is cancelled before the timer fires, the item is dropped instead of queued.
|
||||
func (s *Scheduler[T]) Schedule(ctx context.Context, item T, delay time.Duration) {
|
||||
si := s.pool.Get().(*schedItem[T])
|
||||
si.val = item
|
||||
si.ctx = ctx
|
||||
if si.timer == nil {
|
||||
si.timer = time.AfterFunc(delay, si.fire)
|
||||
} else {
|
||||
si.timer.Reset(delay)
|
||||
}
|
||||
}
|
||||
|
||||
// Run drains the worker queue, calling fn for each item. Returns when ctx is cancelled.
|
||||
// Tests that want deterministic timing should drive the queue directly rather than going through Schedule + Run.
|
||||
func (s *Scheduler[T]) Run(ctx context.Context, fn func(T)) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case item := <-s.queue:
|
||||
fn(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user