From 90a943e386490e29f258441d0a566cc5dc238512 Mon Sep 17 00:00:00 2001 From: JackDoan Date: Tue, 12 May 2026 14:29:57 -0500 Subject: [PATCH] fix --- interface.go | 3 +- overlay/batch/tx_batch.go | 52 +++++++++++++++++----------------- overlay/batch/tx_batch_test.go | 6 ++-- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/interface.go b/interface.go index 2af581f2..e13e773d 100644 --- a/interface.go +++ b/interface.go @@ -346,8 +346,7 @@ func (f *Interface) listenOut(i int) { func (f *Interface) listenIn(reader tio.Queue, i int) { rejectBuf := make([]byte, mtu) - arenaSize := batch.SendBatchCap * (udp.MTU + 32) - sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, arenaSize) + sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, batch.NewArena(batch.DefaultSendBatchArenaCap)) fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) diff --git a/overlay/batch/tx_batch.go b/overlay/batch/tx_batch.go index 38f86b25..599bc306 100644 --- a/overlay/batch/tx_batch.go +++ b/overlay/batch/tx_batch.go @@ -1,9 +1,18 @@ package batch -import "net/netip" +import ( + "net/netip" + + "github.com/slackhq/nebula/udp" +) const SendBatchCap = 128 +// DefaultSendBatchArenaCap is the recommended arena capacity for a +// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers +// the nebula header + AEAD tag tacked onto each plaintext segment. +const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32) + // batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush. type batchWriter interface { WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error @@ -11,38 +20,29 @@ type batchWriter interface { // SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch. // One SendBatch is owned by each listenIn goroutine; no locking is needed. -// The backing arena grows on demand: when there isn't room for the next slot -// we allocate a fresh backing array. Already-committed slices keep referencing -// the old array and remain valid until Flush drops them. +// Slot bytes are borrowed from the injected Arena and remain valid until +// Flush, which Resets the arena. type SendBatch struct { - out batchWriter - bufs [][]byte - dsts []netip.AddrPort - ecns []byte - backing []byte + out batchWriter + bufs [][]byte + dsts []netip.AddrPort + ecns []byte + arena *Arena } -// NewSendBatch makes a SendBatch with batchCap slots and an arenaSize byte buffer for slices to back those slots -func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch { +// NewSendBatch makes a SendBatch with batchCap slots backed by arena. +func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch { return &SendBatch{ - out: out, - bufs: make([][]byte, 0, batchCap), - dsts: make([]netip.AddrPort, 0, batchCap), - ecns: make([]byte, 0, batchCap), - backing: make([]byte, 0, arenaSize), + out: out, + bufs: make([][]byte, 0, batchCap), + dsts: make([]netip.AddrPort, 0, batchCap), + ecns: make([]byte, 0, batchCap), + arena: arena, } } func (b *SendBatch) Reserve(sz int) []byte { - if len(b.backing)+sz > cap(b.backing) { - // Grow: allocate a fresh backing. Already-committed slices still - // reference the old array and remain valid until Flush drops them. - newCap := max(cap(b.backing)*2, sz) - b.backing = make([]byte, 0, newCap) - } - start := len(b.backing) - b.backing = b.backing[:start+sz] - return b.backing[start : start+sz : start+sz] + return b.arena.Reserve(sz) } func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) { @@ -60,6 +60,6 @@ func (b *SendBatch) Flush() error { b.bufs = b.bufs[:0] b.dsts = b.dsts[:0] b.ecns = b.ecns[:0] - b.backing = b.backing[:0] + b.arena.Reset() return err } diff --git a/overlay/batch/tx_batch_test.go b/overlay/batch/tx_batch_test.go index 454011dc..59b06e58 100644 --- a/overlay/batch/tx_batch_test.go +++ b/overlay/batch/tx_batch_test.go @@ -27,7 +27,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns func TestSendBatchReserveCommitFlush(t *testing.T) { fw := &fakeBatchWriter{} - b := NewSendBatch(fw, 4, 32) + b := NewSendBatch(fw, 4, NewArena(32)) ap := netip.MustParseAddrPort("10.0.0.1:4242") for i := 0; i < 4; i++ { @@ -71,7 +71,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) { func TestSendBatchSlotsDoNotOverlap(t *testing.T) { fw := &fakeBatchWriter{} - b := NewSendBatch(fw, 3, 8) + b := NewSendBatch(fw, 3, NewArena(8)) ap := netip.MustParseAddrPort("10.0.0.1:80") for i := 0; i < 3; i++ { @@ -93,7 +93,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) { func TestSendBatchGrowPreservesCommitted(t *testing.T) { fw := &fakeBatchWriter{} // Tiny initial backing forces a grow on the second Reserve. - b := NewSendBatch(fw, 1, 4) + b := NewSendBatch(fw, 1, NewArena(4)) ap := netip.MustParseAddrPort("10.0.0.1:80") s1 := b.Reserve(4)