From 1ab1f71dba7b5b2f543f444f678db5bd46406bb1 Mon Sep 17 00:00:00 2001
From: Nate Brown
Date: Mon, 27 Apr 2026 12:25:24 -0500
Subject: [PATCH] Make stats a server we can reconfigure and start/stop (#1670)

---
 examples/config.yml |   5 +
 main.go             |   4 +-
 stats.go            | 424 ++++++++++++++++++++++++++++++++++----------
 stats_test.go       | 410 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 744 insertions(+), 99 deletions(-)
 create mode 100644 stats_test.go

diff --git a/examples/config.yml b/examples/config.yml
index b02b3d58..f5752ae4 100644
--- a/examples/config.yml
+++ b/examples/config.yml
@@ -304,6 +304,9 @@ logging:
   #disable_timestamp: true
   # Timestamps use RFC3339Nano ("2006-01-02T15:04:05.999999999Z07:00") and are not configurable.

+# The stats section is reloadable. A HUP may change the backend, toggle stats
+# on or off, switch the listen/host address, or pick up new DNS for the
+# configured graphite host.
 #stats:
   #type: graphite
   #prefix: nebula
@@ -321,10 +324,12 @@
   # enables counter metrics for meta packets
   # e.g.: `messages.tx.handshake`
   # NOTE: `message.{tx,rx}.recv_error` is always emitted
+  # Not reloadable.
   #message_metrics: false

   # enables detailed counter metrics for lighthouse packets
   # e.g.: `lighthouse.rx.HostQuery`
+  # Not reloadable.
   #lighthouse_metrics: false

 # Handshake Manager Settings
diff --git a/main.go b/main.go
index f692f317..eef13c97 100644
--- a/main.go
+++ b/main.go
@@ -246,7 +246,7 @@ func Main(c *config.C, configTest bool, buildVersion string, l *slog.Logger, dev
 		go handshakeManager.Run(ctx)
 	}

-	statsStart, err := startStats(l, c, buildVersion, configTest)
+	stats, err := newStatsServerFromConfig(ctx, l, c, buildVersion, configTest)
 	if err != nil {
 		return nil, util.ContextualizeIfNeeded("Failed to start stats emitter", err)
 	}
@@ -266,7 +266,7 @@ func Main(c *config.C, configTest bool, buildVersion string, l *slog.Logger, dev
 		ctx:                    ctx,
 		cancel:                 cancel,
 		sshStart:               sshStart,
-		statsStart:             statsStart,
+		statsStart:             stats.Start,
 		dnsStart:               ds.Start,
 		lighthouseStart:        lightHouse.StartUpdateWorker,
 		connectionManagerStart: connManager.Start,
diff --git a/stats.go b/stats.go
index c7bf3a06..97ce7cf5 100644
--- a/stats.go
+++ b/stats.go
@@ -1,14 +1,16 @@
 package nebula

 import (
+	"context"
 	"errors"
 	"fmt"
-	"log"
 	"log/slog"
 	"net"
 	"net/http"
 	"runtime"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"time"

 	graphite "github.com/cyberdelia/go-metrics-graphite"
@@ -19,119 +21,347 @@ import (
 	"github.com/slackhq/nebula/config"
 )

-// startStats initializes stats from config. On success, if any further work
-// is needed to serve stats, it returns a func to handle that work. If no
-// work is needed, it'll return nil. On failure, it returns nil, error.
-func startStats(l *slog.Logger, c *config.C, buildVersion string, configTest bool) (func(), error) {
-	mType := c.GetString("stats.type", "")
-	if mType == "" || mType == "none" {
-		return nil, nil
-	}
+// statsServer owns nebula's stats subsystem: the periodic metric capture
+// goroutine and (for prometheus) an HTTP listener. It mirrors the lifecycle
+// shape of dnsServer: constructor wires the reload callback, reload records
+// config, Start builds and runs the runtime, Stop tears it down.
+type statsServer struct {
+	l            *slog.Logger
+	ctx          context.Context
+	buildVersion string
+	configTest   bool

-	interval := c.GetDuration("stats.interval", 0)
-	if interval == 0 {
-		return nil, fmt.Errorf("stats.interval was an invalid duration: %s", c.GetString("stats.interval", ""))
-	}
+	// enabled mirrors "stats configured to a real backend". Start consults
+	// it so callers don't need to know the gating rules.
+	enabled atomic.Bool

-	var startFn func()
-	switch mType {
-	case "graphite":
-		err := startGraphiteStats(l, interval, c, configTest)
-		if err != nil {
-			return nil, err
-		}
-	case "prometheus":
-		var err error
-		startFn, err = startPrometheusStats(l, interval, c, buildVersion, configTest)
-		if err != nil {
-			return nil, err
-		}
-	default:
-		return nil, fmt.Errorf("stats.type was not understood: %s", mType)
-	}
-
-	metrics.RegisterDebugGCStats(metrics.DefaultRegistry)
-	metrics.RegisterRuntimeMemStats(metrics.DefaultRegistry)
-
-	go metrics.CaptureDebugGCStats(metrics.DefaultRegistry, interval)
-	go metrics.CaptureRuntimeMemStats(metrics.DefaultRegistry, interval)
-
-	return startFn, nil
+	runMu  sync.Mutex
+	runCfg *statsConfig
+	run    *statsRuntime // non-nil while a runtime is live
 }

-func startGraphiteStats(l *slog.Logger, i time.Duration, c *config.C, configTest bool) error {
-	proto := c.GetString("stats.protocol", "tcp")
-	host := c.GetString("stats.host", "")
-	if host == "" {
-		return errors.New("stats.host can not be empty")
+// statsRuntime is the live state owned by a single Start invocation. Start
+// stashes a pointer under runMu; Stop and Start's own exit path use pointer
+// equality to tell "my runtime" apart from one that replaced it after a
+// reload.
+type statsRuntime struct {
+	cancel   context.CancelFunc
+	listener *http.Server // nil for graphite
+}
+
+// statsConfig is the snapshot of stats-related config that drives the runtime.
+// It is comparable with == so reload can detect "no change" cheaply.
+type statsConfig struct {
+	typ      string
+	interval time.Duration
+	graphite graphiteConfig
+	prom     promConfig
+}
+
+type graphiteConfig struct {
+	protocol string
+	host     string
+	// resolvedAddr is the string form of host resolved at config-load time.
+	// Including it in the struct means a SIGHUP picks up DNS changes even
+	// when stats.host hasn't been edited.
+	resolvedAddr string
+	prefix       string
+}
+
+type promConfig struct {
+	listen    string
+	path      string
+	namespace string
+	subsystem string
+}
+
+// newStatsServerFromConfig builds a statsServer, applies the initial config,
+// and registers a reload callback. The reload callback is registered before
+// the initial config is applied so a SIGHUP can later enable, fix, or disable
+// stats even if the initial application failed.
+//
+// Start is safe to call unconditionally: it no-ops when stats are disabled.
+// The returned pointer is always non-nil, even on error.
+func newStatsServerFromConfig(ctx context.Context, l *slog.Logger, c *config.C, buildVersion string, configTest bool) (*statsServer, error) {
+	s := &statsServer{
+		l:            l,
+		ctx:          ctx,
+		buildVersion: buildVersion,
+		configTest:   configTest,
 	}

-	prefix := c.GetString("stats.prefix", "nebula")
-	addr, err := net.ResolveTCPAddr(proto, host)
+	c.RegisterReloadCallback(func(c *config.C) {
+		if err := s.reload(c, false); err != nil {
+			s.l.Error("Failed to reload stats from config", "error", err)
+		}
+	})
+
+	if err := s.reload(c, true); err != nil {
+		return s, err
+	}
+	return s, nil
+}
+
+// reload records the latest config. On the initial call it only records it;
+// Control.Start is what launches the first runtime via statsStart. On later
+// calls it reconciles the running runtime with the new config:
+//
+//   - newly enabled -> spawn Start
+//   - newly disabled -> Stop the runtime
+//   - config changed (still enabled) -> Stop the old, Start the new
+//   - no change -> no-op
+func (s *statsServer) reload(c *config.C, initial bool) error {
+	newCfg, err := loadStatsConfig(c)
 	if err != nil {
-		return fmt.Errorf("error while setting up graphite sink: %s", err)
+		return err
+	}
+	enabled := newCfg.typ != "" && newCfg.typ != "none"
+
+	s.runMu.Lock()
+	sameCfg := s.runCfg != nil && *s.runCfg == newCfg
+	s.runCfg = &newCfg
+	running := s.run != nil
+	s.runMu.Unlock()
+
+	s.enabled.Store(enabled)
+
+	if initial || sameCfg {
+		return nil
 	}

-	if !configTest {
-		l.Info("Starting graphite",
-			"interval", i,
-			"prefix", prefix,
-			"addr", addr.String(),
-		)
-		go graphite.Graphite(metrics.DefaultRegistry, i, prefix, addr)
+	if running {
+		s.Stop()
+	}
+	if enabled && !s.configTest {
+		go s.Start()
 	}
 	return nil
 }

-func startPrometheusStats(l *slog.Logger, i time.Duration, c *config.C, buildVersion string, configTest bool) (func(), error) {
-	namespace := c.GetString("stats.namespace", "")
-	subsystem := c.GetString("stats.subsystem", "")
-
-	listen := c.GetString("stats.listen", "")
-	if listen == "" {
-		return nil, fmt.Errorf("stats.listen should not be empty")
+// Start builds the runtime from the latest config, spawns the capture loop,
+// and blocks until Stop is called or ctx fires. For prometheus it also serves
+// the HTTP listener. For graphite it blocks on the capture loop's context.
+// Safe to call when stats are disabled or already running (both no-op).
+func (s *statsServer) Start() {
+	if !s.enabled.Load() || s.configTest {
+		return
 	}

-	path := c.GetString("stats.path", "")
-	if path == "" {
-		return nil, fmt.Errorf("stats.path should not be empty")
+	s.runMu.Lock()
+	if s.ctx.Err() != nil || s.run != nil || s.runCfg == nil {
+		s.runMu.Unlock()
+		return
+	}
+	cfg := *s.runCfg
+	captureFns, listener := s.buildRuntime(cfg)
+	runCtx, cancel := context.WithCancel(s.ctx)
+	rt := &statsRuntime{cancel: cancel, listener: listener}
+	s.run = rt
+	s.runMu.Unlock()
+
+	go captureStatsLoop(runCtx, cfg.interval, captureFns)
+
+	cleanExit := true
+	if listener == nil {
+		// Graphite: no HTTP listener to serve; block until teardown.
+		<-runCtx.Done()
+	} else {
+		cleanExit = s.serveListener(listener)
 	}

-	pr := prometheus.NewRegistry()
-	pClient := mp.NewPrometheusProvider(metrics.DefaultRegistry, namespace, subsystem, pr, i)
-	if !configTest {
-		go pClient.UpdatePrometheusMetrics()
-	}
-
-	// Export our version information as labels on a static gauge
-	g := prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "info",
-		Help:      "Version information for the Nebula binary",
-		ConstLabels: prometheus.Labels{
-			"version":      buildVersion,
-			"goversion":    runtime.Version(),
-			"boringcrypto": strconv.FormatBool(boringEnabled()),
-		},
-	})
-	pr.MustRegister(g)
-	g.Set(1)
-
-	var startFn func()
-	if !configTest {
-		// promhttp.HandlerOpts.ErrorLog needs a stdlib-shaped Println logger,
-		// so bridge our slog.Logger back to a *log.Logger that emits at Error.
-		errLog := slog.NewLogLogger(l.Handler(), slog.LevelError)
-		startFn = func() {
-			l.Info("Prometheus stats listening",
-				"listen", listen,
-				"path", path,
-			)
-			http.Handle(path, promhttp.HandlerFor(pr, promhttp.HandlerOpts{ErrorLog: errLog}))
-			log.Fatal(http.ListenAndServe(listen, nil))
+	// Clear our runtime only if nothing has replaced it. Stop races through
+	// here too but leaves s.run == nil, so the pointer check skips this cleanup.
+	s.runMu.Lock()
+	if s.run == rt {
+		rt.cancel()
+		s.run = nil
+		// A listener that exited with an error (e.g., bind conflict) leaves
+		// runCfg cached as if it were applied. Drop it so a SIGHUP with the
+		// same config re-triggers Start once the user fixes the underlying
+		// problem.
+		if !cleanExit {
+			s.runCfg = nil
 		}
 	}
-
-	return startFn, nil
+	s.runMu.Unlock()
+}
+
+// serveListener runs ListenAndServe and ensures ctx cancellation unblocks it.
+// Returns true if the listener exited cleanly (Stop, ctx cancellation, or any
+// other http.ErrServerClosed path), false on an unexpected error.
+func (s *statsServer) serveListener(listener *http.Server) bool {
+	// Per-invocation watcher: ctx cancellation triggers a listener shutdown
+	// which in turn unblocks ListenAndServe. Closing `done` on exit keeps
+	// the watcher from outliving this call.
+	done := make(chan struct{})
+	go func() {
+		select {
+		case <-s.ctx.Done():
+			shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			if err := listener.Shutdown(shutdownCtx); err != nil {
+				s.l.Warn("Failed to shut down prometheus stats listener", "error", err)
+			}
+		case <-done:
+		}
+	}()
+	defer close(done)
+
+	s.l.Info("Starting prometheus stats listener", "addr", listener.Addr)
+	err := listener.ListenAndServe()
+	if err == nil || errors.Is(err, http.ErrServerClosed) {
+		return true
+	}
+	s.l.Error("Prometheus stats listener exited", "error", err)
+	return false
+}
+
+// Stop tears down the active runtime, if any. Idempotent.
+func (s *statsServer) Stop() {
+	s.runMu.Lock()
+	rt := s.run
+	s.run = nil
+	s.runMu.Unlock()
+	if rt == nil {
+		return
+	}
+	rt.cancel()
+	if rt.listener != nil {
+		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		if err := rt.listener.Shutdown(shutdownCtx); err != nil {
+			s.l.Warn("Failed to shut down prometheus stats listener", "error", err)
+		}
+		cancel()
+	}
+}
+
+// buildRuntime produces the capture functions and, for prometheus, an un-served
+// http.Server from cfg. cfg has already been validated by loadStatsConfig.
+func (s *statsServer) buildRuntime(cfg statsConfig) ([]func(), *http.Server) {
+	// rcrowley/go-metrics guards these registrations with a private sync.Once,
+	// so subsequent reloads are no-ops.
+	metrics.RegisterDebugGCStats(metrics.DefaultRegistry)
+	metrics.RegisterRuntimeMemStats(metrics.DefaultRegistry)
+
+	captureFns := []func(){
+		func() { metrics.CaptureDebugGCStatsOnce(metrics.DefaultRegistry) },
+		func() { metrics.CaptureRuntimeMemStatsOnce(metrics.DefaultRegistry) },
+	}
+
+	switch cfg.typ {
+	case "graphite":
+		// loadStatsConfig already resolved and validated the address; re-parse
+		// the resolved form (no DNS lookup) to get a *net.TCPAddr.
+		addr, _ := net.ResolveTCPAddr(cfg.graphite.protocol, cfg.graphite.resolvedAddr)
+		gcfg := graphite.Config{
+			Addr:          addr,
+			Registry:      metrics.DefaultRegistry,
+			FlushInterval: cfg.interval,
+			DurationUnit:  time.Nanosecond,
+			Prefix:        cfg.graphite.prefix,
+			Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
+		}
+		captureFns = append(captureFns, func() {
+			if err := graphite.Once(gcfg); err != nil {
+				s.l.Error("Graphite export failed", "error", err)
+			}
+		})
+		s.l.Info("Starting graphite stats",
+			"interval", cfg.interval,
+			"prefix", cfg.graphite.prefix,
+			"addr", addr,
+		)
+		return captureFns, nil
+
+	case "prometheus":
+		pr := prometheus.NewRegistry()
+		pClient := mp.NewPrometheusProvider(metrics.DefaultRegistry, cfg.prom.namespace, cfg.prom.subsystem, pr, cfg.interval)
+		captureFns = append(captureFns, func() {
+			if err := pClient.UpdatePrometheusMetricsOnce(); err != nil {
+				s.l.Error("Prometheus metrics update failed", "error", err)
+			}
+		})
+
+		g := prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: cfg.prom.namespace,
+			Subsystem: cfg.prom.subsystem,
+			Name:      "info",
+			Help:      "Version information for the Nebula binary",
+			ConstLabels: prometheus.Labels{
+				"version":      s.buildVersion,
+				"goversion":    runtime.Version(),
+				"boringcrypto": strconv.FormatBool(boringEnabled()),
+			},
+		})
+		pr.MustRegister(g)
+		g.Set(1)
+
+		// promhttp.HandlerOpts.ErrorLog needs a stdlib-shaped Println logger,
+		// so bridge our slog.Logger back to a *log.Logger that emits at Error.
+		errLog := slog.NewLogLogger(s.l.Handler(), slog.LevelError)
+		mux := http.NewServeMux()
+		mux.Handle(cfg.prom.path, promhttp.HandlerFor(pr, promhttp.HandlerOpts{ErrorLog: errLog}))
+		return captureFns, &http.Server{Addr: cfg.prom.listen, Handler: mux}
+	}
+	return captureFns, nil
+}
+
+// captureStatsLoop runs each fn on every tick of d until ctx is cancelled.
+func captureStatsLoop(ctx context.Context, d time.Duration, fns []func()) {
+	t := time.NewTicker(d)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			for _, fn := range fns {
+				fn()
+			}
+		}
+	}
+}
+
+func loadStatsConfig(c *config.C) (statsConfig, error) {
+	cfg := statsConfig{
+		typ: c.GetString("stats.type", ""),
+	}
+	if cfg.typ == "" || cfg.typ == "none" {
+		return cfg, nil
+	}
+
+	cfg.interval = c.GetDuration("stats.interval", 0)
+	if cfg.interval == 0 {
+		return cfg, fmt.Errorf("stats.interval was an invalid duration: %s", c.GetString("stats.interval", ""))
+	}
+
+	switch cfg.typ {
+	case "graphite":
+		cfg.graphite.protocol = c.GetString("stats.protocol", "tcp")
+		cfg.graphite.host = c.GetString("stats.host", "")
+		if cfg.graphite.host == "" {
+			return cfg, errors.New("stats.host can not be empty")
+		}
+		addr, err := net.ResolveTCPAddr(cfg.graphite.protocol, cfg.graphite.host)
+		if err != nil {
+			return cfg, fmt.Errorf("error while setting up graphite sink: %s", err)
+		}
+		cfg.graphite.resolvedAddr = addr.String()
+		cfg.graphite.prefix = c.GetString("stats.prefix", "nebula")
+	case "prometheus":
+		cfg.prom.listen = c.GetString("stats.listen", "")
+		if cfg.prom.listen == "" {
+			return cfg, errors.New("stats.listen should not be empty")
+		}
+		cfg.prom.path = c.GetString("stats.path", "")
+		if cfg.prom.path == "" {
+			return cfg, errors.New("stats.path should not be empty")
+		}
+		cfg.prom.namespace = c.GetString("stats.namespace", "")
+		cfg.prom.subsystem = c.GetString("stats.subsystem", "")
+	default:
+		return cfg, fmt.Errorf("stats.type was not understood: %s", cfg.typ)
+	}
+
+	return cfg, nil
 }
diff --git a/stats_test.go b/stats_test.go
new file mode 100644
index 00000000..20b17c0e
--- /dev/null
+++ b/stats_test.go
@@ -0,0 +1,410 @@
+package nebula
+
+import (
+	"context"
+	"io"
+	"log/slog"
+	"net"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/slackhq/nebula/config"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func newTestStatsServer(t *testing.T) (*statsServer, *config.C) {
+	t.Helper()
+	l := slog.New(slog.DiscardHandler)
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	return &statsServer{
+		l:   l,
+		ctx: ctx,
+	}, config.NewC(l)
+}
+
+func setStatsConfig(c *config.C, m map[string]any) {
+	c.Settings["stats"] = m
+}
+
+func currentRuntime(s *statsServer) *statsRuntime {
+	s.runMu.Lock()
+	defer s.runMu.Unlock()
+	return s.run
+}
+
+func TestStatsServer_reload_initial_disabled(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{"type": "none"})
+
+	require.NoError(t, s.reload(c, true))
+	assert.False(t, s.enabled.Load())
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_reload_initial_invalidInterval(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":   "graphite",
+		"host":   "127.0.0.1:0",
+		"prefix": "test",
+	})
+
+	err := s.reload(c, true)
+	require.Error(t, err)
+	assert.False(t, s.enabled.Load())
+}
+
+func TestStatsServer_reload_initial_unknownType(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "carbon",
+		"interval": "1s",
+	})
+
+	err := s.reload(c, true)
+	require.Error(t, err)
+	assert.False(t, s.enabled.Load())
+}
+
+func TestStatsServer_reload_unchanged_noOp(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{"type": "none"})
+
+	require.NoError(t, s.reload(c, true))
+	require.NoError(t, s.reload(c, false))
+	assert.False(t, s.enabled.Load())
+}
+
+func TestStatsServer_reload_initial_graphite(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "graphite",
+		"interval": "1s",
+		"protocol": "tcp",
+		"host":     "127.0.0.1:2003",
+		"prefix":   "test",
+	})
+
+	require.NoError(t, s.reload(c, true))
+	assert.True(t, s.enabled.Load())
+	// reload only records config; Start builds the runtime.
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_reload_initial_prometheus(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:0",
+		"path":     "/metrics",
+	})
+
+	require.NoError(t, s.reload(c, true))
+	assert.True(t, s.enabled.Load())
+	// reload only records config; Start builds the runtime.
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_Start_graphite_blocksUntilStop(t *testing.T) {
+	sink := newGraphiteSink(t)
+	defer sink.Close()
+
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "graphite",
+		"interval": "1s",
+		"protocol": "tcp",
+		"host":     sink.Addr(),
+		"prefix":   "test",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	done := make(chan struct{})
+	go func() {
+		s.Start()
+		close(done)
+	}()
+
+	// Wait for Start to publish runtime state.
+	waitFor(t, func() bool { return currentRuntime(s) != nil })
+	rt := currentRuntime(s)
+	require.NotNil(t, rt)
+	assert.Nil(t, rt.listener, "graphite has no listener")
+
+	s.Stop()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("graphite Start did not return after Stop")
+	}
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_StartStop_lifecycle(t *testing.T) {
+	port := freeTCPPort(t)
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	done := make(chan struct{})
+	go func() {
+		s.Start()
+		close(done)
+	}()
+
+	waitForListening(t, "127.0.0.1:"+port)
+	rt := currentRuntime(s)
+	require.NotNil(t, rt)
+	require.NotNil(t, rt.listener)
+
+	s.Stop()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Start did not return after Stop")
+	}
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_reload_disable_stopsRunningRuntime(t *testing.T) {
+	port := freeTCPPort(t)
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	done := make(chan struct{})
+	go func() {
+		s.Start()
+		close(done)
+	}()
+	waitForListening(t, "127.0.0.1:"+port)
+
+	setStatsConfig(c, map[string]any{"type": "none"})
+	require.NoError(t, s.reload(c, false))
+
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Start did not return after reload disabled stats")
+	}
+	assert.False(t, s.enabled.Load())
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_reload_changeListener_restartsListener(t *testing.T) {
+	port1 := freeTCPPort(t)
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port1,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	firstDone := make(chan struct{})
+	go func() {
+		s.Start()
+		close(firstDone)
+	}()
+	waitForListening(t, "127.0.0.1:"+port1)
+	first := currentRuntime(s)
+	require.NotNil(t, first)
+
+	port2 := freeTCPPort(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port2,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, false))
+
+	select {
+	case <-firstDone:
+	case <-time.After(5 * time.Second):
+		t.Fatal("old Start did not return after reload")
+	}
+
+	waitForListening(t, "127.0.0.1:"+port2)
+	second := currentRuntime(s)
+	require.NotNil(t, second)
+	assert.NotSame(t, first, second, "expected a new runtime after listen address change")
+
+	s.Stop()
+}
+
+func TestStatsServer_Stop_beforeStart_doesNotBlock(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:0",
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	stopped := make(chan struct{})
+	go func() {
+		s.Stop()
+		close(stopped)
+	}()
+	select {
+	case <-stopped:
+	case <-time.After(time.Second):
+		t.Fatal("Stop hung with no runtime started")
+	}
+}
+
+func TestStatsServer_configTest_validatesWithoutSpawning(t *testing.T) {
+	s, c := newTestStatsServer(t)
+	s.configTest = true
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:0",
+		"path":     "/metrics",
+	})
+
+	require.NoError(t, s.reload(c, true))
+	s.Start()
+	assert.Nil(t, currentRuntime(s))
+}
+
+func TestStatsServer_ctxCancel_unblocksStart(t *testing.T) {
+	// Ensures ctx cancellation alone (no explicit Stop) tears down both
+	// graphite and prom Start invocations.
+	port := freeTCPPort(t)
+	l := slog.New(slog.DiscardHandler)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	s := &statsServer{l: l, ctx: ctx}
+	c := config.NewC(l)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	done := make(chan struct{})
+	go func() {
+		s.Start()
+		close(done)
+	}()
+	waitForListening(t, "127.0.0.1:"+port)
+
+	cancel()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Start did not return after ctx cancel")
+	}
+}
+
+func TestStatsServer_listenerBindFailure_sameCfgReloadRetries(t *testing.T) {
+	// Hold the port so ListenAndServe will fail on first Start.
+	blocker, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	port := strconv.Itoa(blocker.Addr().(*net.TCPAddr).Port)
+
+	s, c := newTestStatsServer(t)
+	setStatsConfig(c, map[string]any{
+		"type":     "prometheus",
+		"interval": "1s",
+		"listen":   "127.0.0.1:" + port,
+		"path":     "/metrics",
+	})
+	require.NoError(t, s.reload(c, true))
+
+	done := make(chan struct{})
+	go func() {
+		s.Start()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Start did not return after bind failure")
+	}
+	// Bind failure should have dropped the cached config so a same-cfg
+	// SIGHUP can retry.
+	s.runMu.Lock()
+	cfgAfterFailure := s.runCfg
+	s.runMu.Unlock()
+	assert.Nil(t, cfgAfterFailure)
+
+	// Free the port and reload with the same config; Start should fire again.
+	require.NoError(t, blocker.Close())
+	require.NoError(t, s.reload(c, false))
+
+	waitForListening(t, "127.0.0.1:"+port)
+	require.NotNil(t, currentRuntime(s))
+
+	s.Stop()
+}
+
+func waitForListening(t *testing.T, addr string) {
+	t.Helper()
+	waitFor(t, func() bool {
+		conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
+		if err != nil {
+			return false
+		}
+		_ = conn.Close()
+		return true
+	})
+}
+
+// graphiteSink is a minimal TCP accept-and-discard server so graphite.Once
+// calls in tests don't spam error logs or wedge on connection refused.
+type graphiteSink struct {
+	ln net.Listener
+}
+
+func newGraphiteSink(t *testing.T) *graphiteSink {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	g := &graphiteSink{ln: ln}
+	go func() {
+		for {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+			go func(c net.Conn) {
+				_, _ = io.Copy(io.Discard, c)
+				_ = c.Close()
+			}(conn)
+		}
+	}()
+	return g
+}
+
+func (g *graphiteSink) Addr() string { return g.ln.Addr().String() }
+func (g *graphiteSink) Close()       { _ = g.ln.Close() }
+
+func freeTCPPort(t *testing.T) string {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	port := ln.Addr().(*net.TCPAddr).Port
+	require.NoError(t, ln.Close())
+	return strconv.Itoa(port)
+}
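
For reference when exercising the reload behavior this patch adds, a minimal stats section for each backend is sketched below. The snippet is illustrative only (addresses, interval, and prefix are placeholder values, not part of the patch); editing either block and sending a HUP is what drives the reload path in stats.go.

    # Prometheus backend: serves metrics over HTTP at listen/path.
    stats:
      type: prometheus
      listen: 127.0.0.1:8080
      path: /metrics
      namespace: prometheusns
      subsystem: nebula
      interval: 10s

    # Graphite backend: pushes metrics to host over protocol every interval.
    #stats:
    #  type: graphite
    #  prefix: nebula
    #  protocol: tcp
    #  host: 127.0.0.1:9999
    #  interval: 10s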