Experimenting

This commit is contained in:
Nate Brown
2026-05-11 11:51:46 -05:00
parent b7e9939e92
commit 86cef88744
33 changed files with 691 additions and 560 deletions

View File

@@ -63,7 +63,11 @@ type LightHouse struct {
interval atomic.Int64
updateCancel context.CancelFunc
ifce EncWriter
nebulaPort uint32 // 32 bits because protobuf does not have a uint16
// bufAlloc lets the lighthouse query/update workers, request handlers
// and punchback goroutines acquire correctly sized WireBuffers from
// the same pool as the data plane. Set by main.go alongside ifce.
bufAlloc WireBufferAllocator
nebulaPort uint32 // 32 bits because protobuf does not have a uint16
advertiseAddrs atomic.Pointer[[]netip.AddrPort]
@@ -109,7 +113,11 @@ func NewLightHouseFromConfig(ctx context.Context, l *slog.Logger, c *config.C, c
punchy: p,
updateTrigger: make(chan struct{}, 1),
queryChan: make(chan netip.Addr, c.GetUint32("handshakes.query_buffer", 64)),
l: l,
// Default to a no-prefix pool so the query/update workers and
// request handlers have a working WireBufferAllocator before
// main.go wires up the real one from the Interface.
bufAlloc: NewWireBufferPool(mtu, 0),
l: l,
}
lighthouses := make([]netip.Addr, 0)
h.lighthouses.Store(&lighthouses)
@@ -758,21 +766,22 @@ func (lh *LightHouse) startQueryWorker() {
}
go func() {
nb := make([]byte, 12, 12)
out := make([]byte, mtu)
// Long-lived per-worker WireBuffer; reused for every lighthouse query
// this worker issues for the life of the goroutine.
buf := lh.bufAlloc.Acquire()
for {
select {
case <-lh.ctx.Done():
return
case addr := <-lh.queryChan:
lh.innerQueryServer(addr, nb, out)
lh.innerQueryServer(addr, buf)
}
}
}()
}
func (lh *LightHouse) innerQueryServer(addr netip.Addr, nb, out []byte) {
func (lh *LightHouse) innerQueryServer(addr netip.Addr, buf *WireBuffer) {
if lh.IsLighthouseAddr(addr) {
return
}
@@ -821,7 +830,7 @@ func (lh *LightHouse) innerQueryServer(addr netip.Addr, nb, out []byte) {
}
}
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v1Query, nb, out)
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v1Query, buf)
queried++
} else if v == cert.Version2 {
@@ -840,7 +849,7 @@ func (lh *LightHouse) innerQueryServer(addr netip.Addr, nb, out []byte) {
}
}
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v2Query, nb, out)
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v2Query, buf)
queried++
} else {
@@ -869,8 +878,12 @@ func (lh *LightHouse) StartUpdateWorker() {
go func() {
defer clockSource.Stop()
// Long-lived per-worker WireBuffer; reused across every periodic
// update for the life of this goroutine.
buf := lh.bufAlloc.Acquire()
for {
lh.SendUpdate()
lh.sendUpdate(buf)
select {
case <-updateCtx.Done():
@@ -884,6 +897,15 @@ func (lh *LightHouse) StartUpdateWorker() {
}()
}
// SendUpdate is the public entry point that triggers a one-shot lighthouse
// update outside the worker loop (e.g. tests or reload paths). It allocates
// its own WireBuffer since callers don't already own one.
func (lh *LightHouse) SendUpdate() {
buf := lh.bufAlloc.Acquire()
defer lh.bufAlloc.Release(buf)
lh.sendUpdate(buf)
}
// TriggerUpdate requests an immediate lighthouse update. This is a non-blocking
// operation intended to be called after a handshake completes with a lighthouse,
// so the lighthouse has our current addresses without waiting for the next
@@ -895,7 +917,7 @@ func (lh *LightHouse) TriggerUpdate() {
}
}
func (lh *LightHouse) SendUpdate() {
func (lh *LightHouse) sendUpdate(buf *WireBuffer) {
var v4 []*V4AddrPort
var v6 []*V6AddrPort
@@ -921,9 +943,6 @@ func (lh *LightHouse) SendUpdate() {
}
}
nb := make([]byte, 12, 12)
out := make([]byte, mtu)
var v1Update, v2Update []byte
var err error
updated := 0
@@ -974,7 +993,7 @@ func (lh *LightHouse) SendUpdate() {
}
}
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v1Update, nb, out)
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v1Update, buf)
updated++
} else if v == cert.Version2 {
@@ -1003,7 +1022,7 @@ func (lh *LightHouse) SendUpdate() {
}
}
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v2Update, nb, out)
lh.ifce.SendMessageToVpnAddr(header.LightHouse, 0, lhVpnAddr, v2Update, buf)
updated++
} else {
@@ -1019,9 +1038,11 @@ func (lh *LightHouse) SendUpdate() {
}
type LightHouseHandler struct {
lh *LightHouse
nb []byte
out []byte
lh *LightHouse
// buf is the long-lived per-handler wire scratch. NewRequestHandler is
// called once per data-plane receive goroutine, so buf is owned by that
// goroutine and reused for every lighthouse send the handler issues.
buf *WireBuffer
pb []byte
meta *NebulaMeta
l *slog.Logger
@@ -1030,8 +1051,7 @@ type LightHouseHandler struct {
func (lh *LightHouse) NewRequestHandler() *LightHouseHandler {
lhh := &LightHouseHandler{
lh: lh,
nb: make([]byte, 12, 12),
out: make([]byte, mtu),
buf: lh.bufAlloc.Acquire(),
l: lh.l,
pb: make([]byte, mtu),
@@ -1168,7 +1188,7 @@ func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, fromVpnAddrs []neti
}
lhh.lh.metricTx(NebulaMeta_HostQueryReply, 1)
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.nb, lhh.out[:0])
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.buf)
lhh.sendHostPunchNotification(n, fromVpnAddrs, queryVpnAddr, w)
}
@@ -1228,7 +1248,7 @@ func (lhh *LightHouseHandler) sendHostPunchNotification(n *NebulaMeta, fromVpnAd
}
lhh.lh.metricTx(NebulaMeta_HostPunchNotification, 1)
w.SendMessageToVpnAddr(header.LightHouse, 0, punchNotifDest, lhh.pb[:ln], lhh.nb, lhh.out[:0])
w.SendMessageToVpnAddr(header.LightHouse, 0, punchNotifDest, lhh.pb[:ln], lhh.buf)
}
func (lhh *LightHouseHandler) coalesceAnswers(v cert.Version, c *cache, n *NebulaMeta) {
@@ -1385,7 +1405,7 @@ func (lhh *LightHouseHandler) handleHostUpdateNotification(n *NebulaMeta, fromVp
}
lhh.lh.metricTx(NebulaMeta_HostUpdateNotificationAck, 1)
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.nb, lhh.out[:0])
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.buf)
}
func (lhh *LightHouseHandler) handleHostPunchNotification(n *NebulaMeta, fromVpnAddrs []netip.Addr, w EncWriter) {
@@ -1452,10 +1472,13 @@ func (lhh *LightHouseHandler) handleHostPunchNotification(n *NebulaMeta, fromVpn
"vpnAddr", detailsVpnAddr,
)
}
//NOTE: we have to allocate a new output buffer here since we are spawning a new goroutine
// for each punchBack packet. We should move this into a timerwheel or a single goroutine
// We acquire and release a fresh buf within this goroutine so it
// returns to the pool once the punchback send completes. We
// should move this into a timerwheel or a single goroutine
// managed by a channel.
w.SendMessageToVpnAddr(header.Test, header.TestRequest, detailsVpnAddr, []byte(""), make([]byte, 12, 12), make([]byte, mtu))
pbuf := lhh.lh.bufAlloc.Acquire()
defer lhh.lh.bufAlloc.Release(pbuf)
w.SendMessageToVpnAddr(header.Test, header.TestRequest, detailsVpnAddr, []byte(""), pbuf)
}()
}
}