mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-15 20:37:36 +02:00
Make stats a server we can reconfigure and start/stop (#1670)
Some checks failed
gofmt / Run gofmt (push) Failing after 2s
smoke-extra / Run extra smoke tests (push) Failing after 2s
smoke / Run multi node smoke test (push) Failing after 3s
Build and test / Build all and test on ubuntu-linux (push) Failing after 2s
Build and test / Build and test on linux with boringcrypto (push) Failing after 3s
Build and test / Build and test on linux with pkcs11 (push) Failing after 2s
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled
Some checks failed
gofmt / Run gofmt (push) Failing after 2s
smoke-extra / Run extra smoke tests (push) Failing after 2s
smoke / Run multi node smoke test (push) Failing after 3s
Build and test / Build all and test on ubuntu-linux (push) Failing after 2s
Build and test / Build and test on linux with boringcrypto (push) Failing after 3s
Build and test / Build and test on linux with pkcs11 (push) Failing after 2s
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled
This commit is contained in:
@@ -304,6 +304,9 @@ logging:
|
|||||||
#disable_timestamp: true
|
#disable_timestamp: true
|
||||||
# Timestamps use RFC3339Nano ("2006-01-02T15:04:05.999999999Z07:00") and are not configurable.
|
# Timestamps use RFC3339Nano ("2006-01-02T15:04:05.999999999Z07:00") and are not configurable.
|
||||||
|
|
||||||
|
# The stats section is reloadable. A HUP may change the backend, toggle stats
|
||||||
|
# on or off, switch the listen/host address, or pick up new DNS for the
|
||||||
|
# configured graphite host.
|
||||||
#stats:
|
#stats:
|
||||||
#type: graphite
|
#type: graphite
|
||||||
#prefix: nebula
|
#prefix: nebula
|
||||||
@@ -321,10 +324,12 @@ logging:
|
|||||||
# enables counter metrics for meta packets
|
# enables counter metrics for meta packets
|
||||||
# e.g.: `messages.tx.handshake`
|
# e.g.: `messages.tx.handshake`
|
||||||
# NOTE: `message.{tx,rx}.recv_error` is always emitted
|
# NOTE: `message.{tx,rx}.recv_error` is always emitted
|
||||||
|
# Not reloadable.
|
||||||
#message_metrics: false
|
#message_metrics: false
|
||||||
|
|
||||||
# enables detailed counter metrics for lighthouse packets
|
# enables detailed counter metrics for lighthouse packets
|
||||||
# e.g.: `lighthouse.rx.HostQuery`
|
# e.g.: `lighthouse.rx.HostQuery`
|
||||||
|
# Not reloadable.
|
||||||
#lighthouse_metrics: false
|
#lighthouse_metrics: false
|
||||||
|
|
||||||
# Handshake Manager Settings
|
# Handshake Manager Settings
|
||||||
|
|||||||
4
main.go
4
main.go
@@ -246,7 +246,7 @@ func Main(c *config.C, configTest bool, buildVersion string, l *slog.Logger, dev
|
|||||||
go handshakeManager.Run(ctx)
|
go handshakeManager.Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
statsStart, err := startStats(l, c, buildVersion, configTest)
|
stats, err := newStatsServerFromConfig(ctx, l, c, buildVersion, configTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, util.ContextualizeIfNeeded("Failed to start stats emitter", err)
|
return nil, util.ContextualizeIfNeeded("Failed to start stats emitter", err)
|
||||||
}
|
}
|
||||||
@@ -266,7 +266,7 @@ func Main(c *config.C, configTest bool, buildVersion string, l *slog.Logger, dev
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
sshStart: sshStart,
|
sshStart: sshStart,
|
||||||
statsStart: statsStart,
|
statsStart: stats.Start,
|
||||||
dnsStart: ds.Start,
|
dnsStart: ds.Start,
|
||||||
lighthouseStart: lightHouse.StartUpdateWorker,
|
lighthouseStart: lightHouse.StartUpdateWorker,
|
||||||
connectionManagerStart: connManager.Start,
|
connectionManagerStart: connManager.Start,
|
||||||
|
|||||||
424
stats.go
424
stats.go
@@ -1,14 +1,16 @@
|
|||||||
package nebula
|
package nebula
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
graphite "github.com/cyberdelia/go-metrics-graphite"
|
graphite "github.com/cyberdelia/go-metrics-graphite"
|
||||||
@@ -19,119 +21,347 @@ import (
|
|||||||
"github.com/slackhq/nebula/config"
|
"github.com/slackhq/nebula/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// startStats initializes stats from config. On success, if any further work
|
// statsServer owns nebula's stats subsystem: the periodic metric capture
|
||||||
// is needed to serve stats, it returns a func to handle that work. If no
|
// goroutine and (for prometheus) an HTTP listener. It mirrors the lifecycle
|
||||||
// work is needed, it'll return nil. On failure, it returns nil, error.
|
// shape of dnsServer: constructor wires the reload callback, reload records
|
||||||
func startStats(l *slog.Logger, c *config.C, buildVersion string, configTest bool) (func(), error) {
|
// config, Start builds and runs the runtime, Stop tears it down.
|
||||||
mType := c.GetString("stats.type", "")
|
type statsServer struct {
|
||||||
if mType == "" || mType == "none" {
|
l *slog.Logger
|
||||||
return nil, nil
|
ctx context.Context
|
||||||
}
|
buildVersion string
|
||||||
|
configTest bool
|
||||||
|
|
||||||
interval := c.GetDuration("stats.interval", 0)
|
// enabled mirrors "stats configured to a real backend". Start consults
|
||||||
if interval == 0 {
|
// it so callers don't need to know the gating rules.
|
||||||
return nil, fmt.Errorf("stats.interval was an invalid duration: %s", c.GetString("stats.interval", ""))
|
enabled atomic.Bool
|
||||||
}
|
|
||||||
|
|
||||||
var startFn func()
|
runMu sync.Mutex
|
||||||
switch mType {
|
runCfg *statsConfig
|
||||||
case "graphite":
|
run *statsRuntime // non-nil while a runtime is live
|
||||||
err := startGraphiteStats(l, interval, c, configTest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
case "prometheus":
|
|
||||||
var err error
|
|
||||||
startFn, err = startPrometheusStats(l, interval, c, buildVersion, configTest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("stats.type was not understood: %s", mType)
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics.RegisterDebugGCStats(metrics.DefaultRegistry)
|
|
||||||
metrics.RegisterRuntimeMemStats(metrics.DefaultRegistry)
|
|
||||||
|
|
||||||
go metrics.CaptureDebugGCStats(metrics.DefaultRegistry, interval)
|
|
||||||
go metrics.CaptureRuntimeMemStats(metrics.DefaultRegistry, interval)
|
|
||||||
|
|
||||||
return startFn, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func startGraphiteStats(l *slog.Logger, i time.Duration, c *config.C, configTest bool) error {
|
// statsRuntime is the live state owned by a single Start invocation. Start
|
||||||
proto := c.GetString("stats.protocol", "tcp")
|
// stashes a pointer under runMu; Stop and Start's own exit path use pointer
|
||||||
host := c.GetString("stats.host", "")
|
// equality to tell "my runtime" apart from one that replaced it after a
|
||||||
if host == "" {
|
// reload.
|
||||||
return errors.New("stats.host can not be empty")
|
type statsRuntime struct {
|
||||||
|
cancel context.CancelFunc
|
||||||
|
listener *http.Server // nil for graphite
|
||||||
|
}
|
||||||
|
|
||||||
|
// statsConfig is the snapshot of stats-related config that drives the runtime.
|
||||||
|
// It is comparable with == so reload can detect "no change" cheaply.
|
||||||
|
type statsConfig struct {
|
||||||
|
typ string
|
||||||
|
interval time.Duration
|
||||||
|
graphite graphiteConfig
|
||||||
|
prom promConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
type graphiteConfig struct {
|
||||||
|
protocol string
|
||||||
|
host string
|
||||||
|
// resolvedAddr is the string form of host resolved at config-load time.
|
||||||
|
// Including it in the struct means a SIGHUP picks up DNS changes even
|
||||||
|
// when stats.host hasn't been edited.
|
||||||
|
resolvedAddr string
|
||||||
|
prefix string
|
||||||
|
}
|
||||||
|
|
||||||
|
type promConfig struct {
|
||||||
|
listen string
|
||||||
|
path string
|
||||||
|
namespace string
|
||||||
|
subsystem string
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStatsServerFromConfig builds a statsServer, applies the initial config,
|
||||||
|
// and registers a reload callback. The reload callback is registered before
|
||||||
|
// the initial config is applied so a SIGHUP can later enable, fix, or disable
|
||||||
|
// stats even if the initial application failed.
|
||||||
|
//
|
||||||
|
// Start is safe to call unconditionally: it no-ops when stats are disabled.
|
||||||
|
// The returned pointer is always non-nil, even on error.
|
||||||
|
func newStatsServerFromConfig(ctx context.Context, l *slog.Logger, c *config.C, buildVersion string, configTest bool) (*statsServer, error) {
|
||||||
|
s := &statsServer{
|
||||||
|
l: l,
|
||||||
|
ctx: ctx,
|
||||||
|
buildVersion: buildVersion,
|
||||||
|
configTest: configTest,
|
||||||
}
|
}
|
||||||
|
|
||||||
prefix := c.GetString("stats.prefix", "nebula")
|
c.RegisterReloadCallback(func(c *config.C) {
|
||||||
addr, err := net.ResolveTCPAddr(proto, host)
|
if err := s.reload(c, false); err != nil {
|
||||||
|
s.l.Error("Failed to reload stats from config", "error", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := s.reload(c, true); err != nil {
|
||||||
|
return s, err
|
||||||
|
}
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// reload records the latest config. On the initial call it only records it;
|
||||||
|
// Control.Start is what launches the first runtime via statsStart. On later
|
||||||
|
// calls it reconciles the running runtime with the new config:
|
||||||
|
//
|
||||||
|
// - newly enabled -> spawn Start
|
||||||
|
// - newly disabled -> Stop the runtime
|
||||||
|
// - config changed (still enabled) -> Stop the old, Start the new
|
||||||
|
// - no change -> no-op
|
||||||
|
func (s *statsServer) reload(c *config.C, initial bool) error {
|
||||||
|
newCfg, err := loadStatsConfig(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while setting up graphite sink: %s", err)
|
return err
|
||||||
|
}
|
||||||
|
enabled := newCfg.typ != "" && newCfg.typ != "none"
|
||||||
|
|
||||||
|
s.runMu.Lock()
|
||||||
|
sameCfg := s.runCfg != nil && *s.runCfg == newCfg
|
||||||
|
s.runCfg = &newCfg
|
||||||
|
running := s.run != nil
|
||||||
|
s.runMu.Unlock()
|
||||||
|
|
||||||
|
s.enabled.Store(enabled)
|
||||||
|
|
||||||
|
if initial || sameCfg {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !configTest {
|
if running {
|
||||||
l.Info("Starting graphite",
|
s.Stop()
|
||||||
"interval", i,
|
}
|
||||||
"prefix", prefix,
|
if enabled && !s.configTest {
|
||||||
"addr", addr.String(),
|
go s.Start()
|
||||||
)
|
|
||||||
go graphite.Graphite(metrics.DefaultRegistry, i, prefix, addr)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startPrometheusStats(l *slog.Logger, i time.Duration, c *config.C, buildVersion string, configTest bool) (func(), error) {
|
// Start builds the runtime from the latest config, spawns the capture loop,
|
||||||
namespace := c.GetString("stats.namespace", "")
|
// and blocks until Stop is called or ctx fires. For prometheus it also serves
|
||||||
subsystem := c.GetString("stats.subsystem", "")
|
// the HTTP listener. For graphite it blocks on the capture loop's context.
|
||||||
|
// Safe to call when stats are disabled or already running (both no-op).
|
||||||
listen := c.GetString("stats.listen", "")
|
func (s *statsServer) Start() {
|
||||||
if listen == "" {
|
if !s.enabled.Load() || s.configTest {
|
||||||
return nil, fmt.Errorf("stats.listen should not be empty")
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
path := c.GetString("stats.path", "")
|
s.runMu.Lock()
|
||||||
if path == "" {
|
if s.ctx.Err() != nil || s.run != nil || s.runCfg == nil {
|
||||||
return nil, fmt.Errorf("stats.path should not be empty")
|
s.runMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cfg := *s.runCfg
|
||||||
|
captureFns, listener := s.buildRuntime(cfg)
|
||||||
|
runCtx, cancel := context.WithCancel(s.ctx)
|
||||||
|
rt := &statsRuntime{cancel: cancel, listener: listener}
|
||||||
|
s.run = rt
|
||||||
|
s.runMu.Unlock()
|
||||||
|
|
||||||
|
go captureStatsLoop(runCtx, cfg.interval, captureFns)
|
||||||
|
|
||||||
|
cleanExit := true
|
||||||
|
if listener == nil {
|
||||||
|
// Graphite: no HTTP listener to serve; block until teardown.
|
||||||
|
<-runCtx.Done()
|
||||||
|
} else {
|
||||||
|
cleanExit = s.serveListener(listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
pr := prometheus.NewRegistry()
|
// Clear our runtime only if nothing has replaced it. Stop races through
|
||||||
pClient := mp.NewPrometheusProvider(metrics.DefaultRegistry, namespace, subsystem, pr, i)
|
// here too but leaves s.run == nil, so the pointer check skips.
|
||||||
if !configTest {
|
s.runMu.Lock()
|
||||||
go pClient.UpdatePrometheusMetrics()
|
if s.run == rt {
|
||||||
}
|
rt.cancel()
|
||||||
|
s.run = nil
|
||||||
// Export our version information as labels on a static gauge
|
// A listener that exited with an error (e.g., bind conflict) leaves
|
||||||
g := prometheus.NewGauge(prometheus.GaugeOpts{
|
// runCfg cached as if it were applied. Drop it so a SIGHUP with the
|
||||||
Namespace: namespace,
|
// same config re-triggers Start once the user fixes the underlying
|
||||||
Subsystem: subsystem,
|
// problem.
|
||||||
Name: "info",
|
if !cleanExit {
|
||||||
Help: "Version information for the Nebula binary",
|
s.runCfg = nil
|
||||||
ConstLabels: prometheus.Labels{
|
|
||||||
"version": buildVersion,
|
|
||||||
"goversion": runtime.Version(),
|
|
||||||
"boringcrypto": strconv.FormatBool(boringEnabled()),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
pr.MustRegister(g)
|
|
||||||
g.Set(1)
|
|
||||||
|
|
||||||
var startFn func()
|
|
||||||
if !configTest {
|
|
||||||
// promhttp.HandlerOpts.ErrorLog needs a stdlib-shaped Println logger,
|
|
||||||
// so bridge our slog.Logger back to a *log.Logger that emits at Error.
|
|
||||||
errLog := slog.NewLogLogger(l.Handler(), slog.LevelError)
|
|
||||||
startFn = func() {
|
|
||||||
l.Info("Prometheus stats listening",
|
|
||||||
"listen", listen,
|
|
||||||
"path", path,
|
|
||||||
)
|
|
||||||
http.Handle(path, promhttp.HandlerFor(pr, promhttp.HandlerOpts{ErrorLog: errLog}))
|
|
||||||
log.Fatal(http.ListenAndServe(listen, nil))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.runMu.Unlock()
|
||||||
return startFn, nil
|
}
|
||||||
|
|
||||||
|
// serveListener runs ListenAndServe and ensures ctx cancellation unblocks it.
|
||||||
|
// Returns true if the listener exited cleanly (Stop, ctx cancellation, or any
|
||||||
|
// other http.ErrServerClosed path), false on an unexpected error.
|
||||||
|
func (s *statsServer) serveListener(listener *http.Server) bool {
|
||||||
|
// Per-invocation watcher: ctx cancellation triggers a listener shutdown
|
||||||
|
// which in turn unblocks ListenAndServe. Closing `done` on exit keeps
|
||||||
|
// the watcher from outliving this call.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := listener.Shutdown(shutdownCtx); err != nil {
|
||||||
|
s.l.Warn("Failed to shut down prometheus stats listener", "error", err)
|
||||||
|
}
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
s.l.Info("Starting prometheus stats listener", "addr", listener.Addr)
|
||||||
|
err := listener.ListenAndServe()
|
||||||
|
if err == nil || errors.Is(err, http.ErrServerClosed) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
s.l.Error("Prometheus stats listener exited", "error", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop tears down the active runtime, if any. Idempotent.
|
||||||
|
func (s *statsServer) Stop() {
|
||||||
|
s.runMu.Lock()
|
||||||
|
rt := s.run
|
||||||
|
s.run = nil
|
||||||
|
s.runMu.Unlock()
|
||||||
|
if rt == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rt.cancel()
|
||||||
|
if rt.listener != nil {
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
if err := rt.listener.Shutdown(shutdownCtx); err != nil {
|
||||||
|
s.l.Warn("Failed to shut down prometheus stats listener", "error", err)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildRuntime produces the capture functions and, for prometheus, an un-served
|
||||||
|
// http.Server from cfg. cfg has already been validated by loadStatsConfig.
|
||||||
|
func (s *statsServer) buildRuntime(cfg statsConfig) ([]func(), *http.Server) {
|
||||||
|
// rcrowley/go-metrics guards these registrations with a private sync.Once,
|
||||||
|
// so subsequent reloads are no-ops.
|
||||||
|
metrics.RegisterDebugGCStats(metrics.DefaultRegistry)
|
||||||
|
metrics.RegisterRuntimeMemStats(metrics.DefaultRegistry)
|
||||||
|
|
||||||
|
captureFns := []func(){
|
||||||
|
func() { metrics.CaptureDebugGCStatsOnce(metrics.DefaultRegistry) },
|
||||||
|
func() { metrics.CaptureRuntimeMemStatsOnce(metrics.DefaultRegistry) },
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cfg.typ {
|
||||||
|
case "graphite":
|
||||||
|
// loadStatsConfig already resolved and validated the address; re-parse
|
||||||
|
// the resolved form (no DNS lookup) to get a *net.TCPAddr.
|
||||||
|
addr, _ := net.ResolveTCPAddr(cfg.graphite.protocol, cfg.graphite.resolvedAddr)
|
||||||
|
gcfg := graphite.Config{
|
||||||
|
Addr: addr,
|
||||||
|
Registry: metrics.DefaultRegistry,
|
||||||
|
FlushInterval: cfg.interval,
|
||||||
|
DurationUnit: time.Nanosecond,
|
||||||
|
Prefix: cfg.graphite.prefix,
|
||||||
|
Percentiles: []float64{0.5, 0.75, 0.95, 0.99, 0.999},
|
||||||
|
}
|
||||||
|
captureFns = append(captureFns, func() {
|
||||||
|
if err := graphite.Once(gcfg); err != nil {
|
||||||
|
s.l.Error("Graphite export failed", "error", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
s.l.Info("Starting graphite stats",
|
||||||
|
"interval", cfg.interval,
|
||||||
|
"prefix", cfg.graphite.prefix,
|
||||||
|
"addr", addr,
|
||||||
|
)
|
||||||
|
return captureFns, nil
|
||||||
|
|
||||||
|
case "prometheus":
|
||||||
|
pr := prometheus.NewRegistry()
|
||||||
|
pClient := mp.NewPrometheusProvider(metrics.DefaultRegistry, cfg.prom.namespace, cfg.prom.subsystem, pr, cfg.interval)
|
||||||
|
captureFns = append(captureFns, func() {
|
||||||
|
if err := pClient.UpdatePrometheusMetricsOnce(); err != nil {
|
||||||
|
s.l.Error("Prometheus metrics update failed", "error", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
g := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: cfg.prom.namespace,
|
||||||
|
Subsystem: cfg.prom.subsystem,
|
||||||
|
Name: "info",
|
||||||
|
Help: "Version information for the Nebula binary",
|
||||||
|
ConstLabels: prometheus.Labels{
|
||||||
|
"version": s.buildVersion,
|
||||||
|
"goversion": runtime.Version(),
|
||||||
|
"boringcrypto": strconv.FormatBool(boringEnabled()),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
pr.MustRegister(g)
|
||||||
|
g.Set(1)
|
||||||
|
|
||||||
|
// promhttp.HandlerOpts.ErrorLog needs a stdlib-shaped Println logger,
|
||||||
|
// so bridge our slog.Logger back to a *log.Logger that emits at Error.
|
||||||
|
errLog := slog.NewLogLogger(s.l.Handler(), slog.LevelError)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle(cfg.prom.path, promhttp.HandlerFor(pr, promhttp.HandlerOpts{ErrorLog: errLog}))
|
||||||
|
return captureFns, &http.Server{Addr: cfg.prom.listen, Handler: mux}
|
||||||
|
}
|
||||||
|
return captureFns, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// captureStatsLoop runs each fn on every tick of d until ctx is cancelled.
|
||||||
|
func captureStatsLoop(ctx context.Context, d time.Duration, fns []func()) {
|
||||||
|
t := time.NewTicker(d)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
for _, fn := range fns {
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadStatsConfig(c *config.C) (statsConfig, error) {
|
||||||
|
cfg := statsConfig{
|
||||||
|
typ: c.GetString("stats.type", ""),
|
||||||
|
}
|
||||||
|
if cfg.typ == "" || cfg.typ == "none" {
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.interval = c.GetDuration("stats.interval", 0)
|
||||||
|
if cfg.interval == 0 {
|
||||||
|
return cfg, fmt.Errorf("stats.interval was an invalid duration: %s", c.GetString("stats.interval", ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cfg.typ {
|
||||||
|
case "graphite":
|
||||||
|
cfg.graphite.protocol = c.GetString("stats.protocol", "tcp")
|
||||||
|
cfg.graphite.host = c.GetString("stats.host", "")
|
||||||
|
if cfg.graphite.host == "" {
|
||||||
|
return cfg, errors.New("stats.host can not be empty")
|
||||||
|
}
|
||||||
|
addr, err := net.ResolveTCPAddr(cfg.graphite.protocol, cfg.graphite.host)
|
||||||
|
if err != nil {
|
||||||
|
return cfg, fmt.Errorf("error while setting up graphite sink: %s", err)
|
||||||
|
}
|
||||||
|
cfg.graphite.resolvedAddr = addr.String()
|
||||||
|
cfg.graphite.prefix = c.GetString("stats.prefix", "nebula")
|
||||||
|
case "prometheus":
|
||||||
|
cfg.prom.listen = c.GetString("stats.listen", "")
|
||||||
|
if cfg.prom.listen == "" {
|
||||||
|
return cfg, errors.New("stats.listen should not be empty")
|
||||||
|
}
|
||||||
|
cfg.prom.path = c.GetString("stats.path", "")
|
||||||
|
if cfg.prom.path == "" {
|
||||||
|
return cfg, errors.New("stats.path should not be empty")
|
||||||
|
}
|
||||||
|
cfg.prom.namespace = c.GetString("stats.namespace", "")
|
||||||
|
cfg.prom.subsystem = c.GetString("stats.subsystem", "")
|
||||||
|
default:
|
||||||
|
return cfg, fmt.Errorf("stats.type was not understood: %s", cfg.typ)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|||||||
410
stats_test.go
Normal file
410
stats_test.go
Normal file
@@ -0,0 +1,410 @@
|
|||||||
|
package nebula
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/slackhq/nebula/config"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestStatsServer(t *testing.T) (*statsServer, *config.C) {
|
||||||
|
t.Helper()
|
||||||
|
l := slog.New(slog.DiscardHandler)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
return &statsServer{
|
||||||
|
l: l,
|
||||||
|
ctx: ctx,
|
||||||
|
}, config.NewC(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setStatsConfig(c *config.C, m map[string]any) {
|
||||||
|
c.Settings["stats"] = m
|
||||||
|
}
|
||||||
|
|
||||||
|
func currentRuntime(s *statsServer) *statsRuntime {
|
||||||
|
s.runMu.Lock()
|
||||||
|
defer s.runMu.Unlock()
|
||||||
|
return s.run
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_initial_disabled(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{"type": "none"})
|
||||||
|
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
assert.False(t, s.enabled.Load())
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_initial_invalidInterval(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "graphite",
|
||||||
|
"host": "127.0.0.1:0",
|
||||||
|
"prefix": "test",
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.reload(c, true)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.False(t, s.enabled.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_initial_unknownType(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "carbon",
|
||||||
|
"interval": "1s",
|
||||||
|
})
|
||||||
|
|
||||||
|
err := s.reload(c, true)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.False(t, s.enabled.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_unchanged_noOp(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{"type": "none"})
|
||||||
|
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
require.NoError(t, s.reload(c, false))
|
||||||
|
assert.False(t, s.enabled.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_initial_graphite(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "graphite",
|
||||||
|
"interval": "1s",
|
||||||
|
"protocol": "tcp",
|
||||||
|
"host": "127.0.0.1:2003",
|
||||||
|
"prefix": "test",
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
assert.True(t, s.enabled.Load())
|
||||||
|
// reload only records config; Start builds the runtime.
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_initial_prometheus(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:0",
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
assert.True(t, s.enabled.Load())
|
||||||
|
// reload only records config; Start builds the runtime.
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_Start_graphite_blocksUntilStop(t *testing.T) {
|
||||||
|
sink := newGraphiteSink(t)
|
||||||
|
defer sink.Close()
|
||||||
|
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "graphite",
|
||||||
|
"interval": "1s",
|
||||||
|
"protocol": "tcp",
|
||||||
|
"host": sink.Addr(),
|
||||||
|
"prefix": "test",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for Start to publish runtime state.
|
||||||
|
waitFor(t, func() bool { return currentRuntime(s) != nil })
|
||||||
|
rt := currentRuntime(s)
|
||||||
|
require.NotNil(t, rt)
|
||||||
|
assert.Nil(t, rt.listener, "graphite has no listener")
|
||||||
|
|
||||||
|
s.Stop()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("graphite Start did not return after Stop")
|
||||||
|
}
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_StartStop_lifecycle(t *testing.T) {
|
||||||
|
port := freeTCPPort(t)
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
waitForListening(t, "127.0.0.1:"+port)
|
||||||
|
rt := currentRuntime(s)
|
||||||
|
require.NotNil(t, rt)
|
||||||
|
require.NotNil(t, rt.listener)
|
||||||
|
|
||||||
|
s.Stop()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("Start did not return after Stop")
|
||||||
|
}
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_disable_stopsRunningRuntime(t *testing.T) {
|
||||||
|
port := freeTCPPort(t)
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
waitForListening(t, "127.0.0.1:"+port)
|
||||||
|
|
||||||
|
setStatsConfig(c, map[string]any{"type": "none"})
|
||||||
|
require.NoError(t, s.reload(c, false))
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("Start did not return after reload disabled stats")
|
||||||
|
}
|
||||||
|
assert.False(t, s.enabled.Load())
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_reload_changeListener_restartsListener(t *testing.T) {
|
||||||
|
port1 := freeTCPPort(t)
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port1,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
firstDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(firstDone)
|
||||||
|
}()
|
||||||
|
waitForListening(t, "127.0.0.1:"+port1)
|
||||||
|
first := currentRuntime(s)
|
||||||
|
require.NotNil(t, first)
|
||||||
|
|
||||||
|
port2 := freeTCPPort(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port2,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, false))
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-firstDone:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("old Start did not return after reload")
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForListening(t, "127.0.0.1:"+port2)
|
||||||
|
second := currentRuntime(s)
|
||||||
|
require.NotNil(t, second)
|
||||||
|
assert.NotSame(t, first, second, "expected a new runtime after listen address change")
|
||||||
|
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_Stop_beforeStart_doesNotBlock(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:0",
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
stopped := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Stop()
|
||||||
|
close(stopped)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-stopped:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("Stop hung with no runtime started")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_configTest_validatesWithoutSpawning(t *testing.T) {
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
s.configTest = true
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:0",
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
s.Start()
|
||||||
|
assert.Nil(t, currentRuntime(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_ctxCancel_unblocksStart(t *testing.T) {
|
||||||
|
// Ensures ctx cancellation alone (no explicit Stop) tears down both
|
||||||
|
// graphite and prom Start invocations.
|
||||||
|
port := freeTCPPort(t)
|
||||||
|
l := slog.New(slog.DiscardHandler)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
s := &statsServer{l: l, ctx: ctx}
|
||||||
|
c := config.NewC(l)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
waitForListening(t, "127.0.0.1:"+port)
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("Start did not return after ctx cancel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsServer_listenerBindFailure_sameCfgReloadRetries(t *testing.T) {
|
||||||
|
// Hold the port so ListenAndServe will fail on first Start.
|
||||||
|
blocker, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
port := strconv.Itoa(blocker.Addr().(*net.TCPAddr).Port)
|
||||||
|
|
||||||
|
s, c := newTestStatsServer(t)
|
||||||
|
setStatsConfig(c, map[string]any{
|
||||||
|
"type": "prometheus",
|
||||||
|
"interval": "1s",
|
||||||
|
"listen": "127.0.0.1:" + port,
|
||||||
|
"path": "/metrics",
|
||||||
|
})
|
||||||
|
require.NoError(t, s.reload(c, true))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.Start()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("Start did not return after bind failure")
|
||||||
|
}
|
||||||
|
// Bind failure should have dropped the cached config so a same-cfg
|
||||||
|
// SIGHUP can retry.
|
||||||
|
s.runMu.Lock()
|
||||||
|
cfgAfterFailure := s.runCfg
|
||||||
|
s.runMu.Unlock()
|
||||||
|
assert.Nil(t, cfgAfterFailure)
|
||||||
|
|
||||||
|
// Free the port and reload with the same config; Start should fire again.
|
||||||
|
require.NoError(t, blocker.Close())
|
||||||
|
require.NoError(t, s.reload(c, false))
|
||||||
|
|
||||||
|
waitForListening(t, "127.0.0.1:"+port)
|
||||||
|
require.NotNil(t, currentRuntime(s))
|
||||||
|
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForListening(t *testing.T, addr string) {
|
||||||
|
t.Helper()
|
||||||
|
waitFor(t, func() bool {
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_ = conn.Close()
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// graphiteSink is a minimal TCP accept-and-discard server so graphite.Once
// calls in tests don't spam error logs or wedge on connection refused.
type graphiteSink struct {
	// ln is the listening socket; accepted connections are drained and closed.
	ln net.Listener
}
|
||||||
|
|
||||||
|
func newGraphiteSink(t *testing.T) *graphiteSink {
|
||||||
|
t.Helper()
|
||||||
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
g := &graphiteSink{ln: ln}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
conn, err := ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go func(c net.Conn) {
|
||||||
|
_, _ = io.Copy(io.Discard, c)
|
||||||
|
_ = c.Close()
|
||||||
|
}(conn)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return g
|
||||||
|
}
|
||||||
|
|
||||||
|
// Addr returns the sink listener's bound address in host:port form.
func (g *graphiteSink) Addr() string {
	return g.ln.Addr().String()
}
|
||||||
|
// Close shuts down the sink's listener, which also terminates its accept loop.
func (g *graphiteSink) Close() {
	_ = g.ln.Close()
}
|
||||||
|
|
||||||
|
func freeTCPPort(t *testing.T) string {
|
||||||
|
t.Helper()
|
||||||
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
port := ln.Addr().(*net.TCPAddr).Port
|
||||||
|
require.NoError(t, ln.Close())
|
||||||
|
return strconv.Itoa(port)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user