batched tun interface

This commit is contained in:
JackDoan
2026-04-17 10:25:05 -05:00
parent 398d67e2da
commit dbe0c3c403
38 changed files with 1740 additions and 531 deletions

28
overlay/batch/batch.go Normal file
View File

@@ -0,0 +1,28 @@
package batch
import "net/netip"
// RxBatcher accumulates received packets and emits them in one batched
// operation: Reserve storage, Commit the filled packet, then Flush.
type RxBatcher interface {
	// Reserve creates a pkt to borrow: a caller-writable slice of sz bytes
	// taken from the batcher's backing storage, valid until the next Flush.
	Reserve(sz int) []byte
	// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
	Commit(pkt []byte) error
	// Flush emits every queued packet in arrival order. Returns the
	// first error observed; keeps draining so one bad packet doesn't hold up
	// the rest. After Flush returns, borrowed payload slices may be recycled.
	Flush() error
}
// TxBatcher accumulates outbound packets together with their destinations
// and outer ECN codepoints, and flushes them as one batch.
type TxBatcher interface {
	// Reserve creates a pkt to borrow: a caller-writable slice of sz bytes
	// taken from the batcher's backing storage, valid until the next Flush.
	Reserve(sz int) []byte
	// Commit borrows pkt and records its destination plus the 2-bit
	// IP-level ECN codepoint to set on the outer (carrier) header. The
	// caller must keep pkt valid until the next Flush. Pass 0 (Not-ECT)
	// to leave the outer ECN field unset.
	Commit(pkt []byte, dst netip.AddrPort, outerECN byte)
	// Flush emits every queued packet via the underlying batch writer in
	// arrival order. Returns nil when everything was written, otherwise the
	// error(s) observed (implementations may errors.Join several). After
	// Flush returns, borrowed payload slices may be recycled.
	Flush() error
}

View File

@@ -0,0 +1,42 @@
package batch
// Arena is an injectable byte-slab that hands out non-overlapping borrowed
// slices via Reserve and releases them in bulk via Reset. Coalescers take
// an *Arena at construction so the caller controls the slab lifetime and
// can share one slab across multiple coalescers (MultiCoalescer hands the
// same *Arena to every lane so the lanes don't carry their own backings).
//
// Reserve borrows; the slice is valid until the next Reset. The slab grows
// (by allocating a fresh, larger backing array) if a Reserve doesn't fit;
// pre-size the arena via NewArena to avoid that path on the hot path.
//
// An Arena is not safe for concurrent use; callers must provide their own
// synchronization if sharing one across goroutines.
type Arena struct {
	// buf is the current backing slab: len(buf) is the high-water mark of
	// everything borrowed since the last Reset, cap(buf) is the slab size.
	buf []byte
}
// NewArena returns an Arena whose backing slab is pre-allocated to the given
// capacity in bytes. Pass 0 if you don't intend to call Reserve (e.g. a test
// that only feeds the coalescer pre-made []byte packets via Commit).
func NewArena(capacity int) *Arena {
	a := new(Arena)
	a.buf = make([]byte, 0, capacity)
	return a
}
// Reserve hands out a non-overlapping sz-byte slice from the arena. If the
// request doesn't fit the current backing, a fresh, larger backing is
// allocated; already-borrowed slices reference the old backing and remain
// valid until Reset.
func (a *Arena) Reserve(sz int) []byte {
	if cap(a.buf)-len(a.buf) < sz {
		// Not enough headroom: swap in a bigger slab (at least double, and
		// at least sz). Outstanding borrows keep pointing at the old array.
		a.buf = make([]byte, 0, max(cap(a.buf)*2, sz))
	}
	off := len(a.buf)
	a.buf = a.buf[:off+sz]
	// Full slice expression pins the capacity so an append by the borrower
	// can't spill into the next reservation.
	return a.buf[off : off+sz : off+sz]
}
// Reset releases every slice handed out since the last Reset. Callers must
// not use any previously-borrowed slice after this returns. The underlying
// backing array is retained so subsequent Reserves don't re-allocate.
func (a *Arena) Reset() {
	// Re-slice to zero length; capacity (and the backing array) is kept.
	a.buf = a.buf[:0]
}

View File

@@ -0,0 +1,52 @@
package batch
import (
"io"
"github.com/slackhq/nebula/udp"
)
// Passthrough is a RxBatcher that doesn't batch anything, it just accumulates and then sends packets.
type Passthrough struct {
	out    io.Writer // destination each committed packet is written to on Flush
	slots  [][]byte  // committed packets, in arrival order
	arena  *Arena    // backing storage Reserve hands slices out of
	cursor int       // NOTE(review): never read or written in this file — confirm at other call sites before relying on it
}

// passthroughBaseNumSlots is the initial slot capacity of a Passthrough.
const passthroughBaseNumSlots = 128

// DefaultPassthroughArenaCap is the recommended arena capacity for a
// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB.
const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU
// NewPassthrough builds a Passthrough that writes flushed packets to w and
// borrows payload storage from arena.
func NewPassthrough(w io.Writer, arena *Arena) *Passthrough {
	p := &Passthrough{out: w, arena: arena}
	p.slots = make([][]byte, 0, passthroughBaseNumSlots)
	return p
}
// Reserve hands out a sz-byte payload slice borrowed from the arena; the
// slice stays valid until the next Flush resets the arena.
func (p *Passthrough) Reserve(sz int) []byte {
	return p.arena.Reserve(sz)
}
// Commit queues pkt for the next Flush. It always returns nil; the error
// result exists only to satisfy the RxBatcher interface.
func (p *Passthrough) Commit(pkt []byte) error {
	p.slots = append(p.slots, pkt)
	return nil
}
// Flush writes every queued packet to the underlying writer in arrival
// order, then recycles the slot list and the arena storage. It keeps
// draining after a failure and reports only the first error observed.
func (p *Passthrough) Flush() error {
	var firstErr error
	for i := range p.slots {
		if _, err := p.out.Write(p.slots[i]); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	// Nil out the slot pointers so retired packets aren't pinned for the
	// GC, truncate for reuse, and release the arena backing in bulk.
	clear(p.slots)
	p.slots = p.slots[:0]
	p.arena.Reset()
	return firstErr
}

65
overlay/batch/tx_batch.go Normal file
View File

@@ -0,0 +1,65 @@
package batch
import "net/netip"
// SendBatchCap is the conventional number of packet slots to size a
// SendBatch with. NOTE(review): not referenced in this file — confirm usage
// at call sites.
const SendBatchCap = 128

// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
type batchWriter interface {
	// WriteBatch sends bufs[i] to addrs[i] with outer ECN codepoint
	// outerECNs[i]; the three slices are parallel.
	WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
}

// SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch.
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
// The backing arena grows on demand: when there isn't room for the next slot
// we allocate a fresh backing array. Already-committed slices keep referencing
// the old array and remain valid until Flush drops them.
type SendBatch struct {
	out     batchWriter      // destination for flushed batches
	bufs    [][]byte         // committed packets, parallel with dsts and ecns
	dsts    []netip.AddrPort // per-packet destination
	ecns    []byte           // per-packet 2-bit outer ECN codepoint (0 = Not-ECT)
	backing []byte           // slab that Reserve carves payload slices from
}
// NewSendBatch makes a SendBatch with batchCap packet slots and an
// arenaSize-byte buffer from which Reserve carves the slices backing those
// slots.
func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch {
	b := &SendBatch{out: out}
	b.bufs = make([][]byte, 0, batchCap)
	b.dsts = make([]netip.AddrPort, 0, batchCap)
	b.ecns = make([]byte, 0, batchCap)
	b.backing = make([]byte, 0, arenaSize)
	return b
}
// Reserve hands out a non-overlapping sz-byte slice from the backing
// buffer, growing the buffer when the request doesn't fit.
func (b *SendBatch) Reserve(sz int) []byte {
	if cap(b.backing)-len(b.backing) < sz {
		// Grow: swap in a fresh, larger backing (at least double, and at
		// least sz). Already-committed slices still reference the old array
		// and remain valid until Flush drops them.
		b.backing = make([]byte, 0, max(cap(b.backing)*2, sz))
	}
	off := len(b.backing)
	b.backing = b.backing[:off+sz]
	// Full slice expression pins capacity so a caller append can't bleed
	// into the next slot.
	return b.backing[off : off+sz : off+sz]
}
// Commit queues pkt for the next Flush, recording its destination and outer
// ECN codepoint in the parallel slices. The caller must keep pkt valid
// until Flush runs.
func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) {
	b.bufs = append(b.bufs, pkt)
	b.dsts = append(b.dsts, dst)
	b.ecns = append(b.ecns, outerECN)
}
// Flush hands every committed packet to the underlying batch writer in one
// WriteBatch call (skipped entirely when nothing is queued), then resets
// all per-batch state for reuse. Returns whatever error WriteBatch reported.
func (b *SendBatch) Flush() error {
	var err error
	if len(b.bufs) > 0 {
		err = b.out.WriteBatch(b.bufs, b.dsts, b.ecns)
	}
	// Drop the packet references so retired backing arrays can be GC'd.
	// Also zero the destinations: a netip.Addr can carry an interned IPv6
	// zone pointer, which truncation alone would keep reachable between
	// batches.
	clear(b.bufs)
	clear(b.dsts)
	b.bufs = b.bufs[:0]
	b.dsts = b.dsts[:0]
	b.ecns = b.ecns[:0]
	b.backing = b.backing[:0]
	return err
}

View File

@@ -0,0 +1,124 @@
package batch
import (
"net/netip"
"testing"
)
// fakeBatchWriter records the most recent WriteBatch call so tests can
// assert on what a SendBatch flushed.
type fakeBatchWriter struct {
	bufs  [][]byte         // deep copies of the flushed packet payloads
	addrs []netip.AddrPort // destinations, parallel with bufs
	ecns  []byte           // outer ECN codepoints, parallel with bufs
}
// WriteBatch snapshots its arguments — SendBatch.Flush nils its slot
// pointers right after WriteBatch returns, so tests must capture the data
// before that happens. Always succeeds.
func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns []byte) error {
	// Deep-copy each payload into a fresh (always non-nil) slice.
	w.bufs = make([][]byte, 0, len(bufs))
	for _, src := range bufs {
		dup := make([]byte, len(src))
		copy(dup, src)
		w.bufs = append(w.bufs, dup)
	}
	w.addrs = append(w.addrs[:0], addrs...)
	w.ecns = append(w.ecns[:0], ecns...)
	return nil
}
// TestSendBatchReserveCommitFlush drives the happy path end to end:
// reserve, commit, flush, verify what hit the wire, then reuse the batch.
func TestSendBatchReserveCommitFlush(t *testing.T) {
	writer := &fakeBatchWriter{}
	batch := NewSendBatch(writer, 4, 32)
	dst := netip.MustParseAddrPort("10.0.0.1:4242")
	for i := 0; i < 4; i++ {
		s := batch.Reserve(32)
		if cap(s) != 32 {
			t.Fatalf("slot %d: cap=%d want 32", i, cap(s))
		}
		batch.Commit(append(s[:0], byte(i), byte(i+1), byte(i+2)), dst, 0)
	}
	if err := batch.Flush(); err != nil {
		t.Fatalf("Flush: %v", err)
	}
	if len(writer.bufs) != 4 {
		t.Fatalf("WriteBatch got %d bufs want 4", len(writer.bufs))
	}
	for i, got := range writer.bufs {
		if len(got) != 3 || got[0] != byte(i) {
			t.Errorf("buf %d: %x", i, got)
		}
		if writer.addrs[i] != dst {
			t.Errorf("addr %d: got %v want %v", i, writer.addrs[i], dst)
		}
	}
	// Flush again with nothing committed — should be a no-op.
	writer.bufs = nil
	if err := batch.Flush(); err != nil {
		t.Fatalf("empty Flush: %v", err)
	}
	if writer.bufs != nil {
		t.Fatalf("empty Flush triggered WriteBatch")
	}
	// Reuse after Flush.
	if s := batch.Reserve(32); cap(s) != 32 {
		t.Fatalf("after Flush Reserve wrong cap: %d", cap(s))
	}
}
// TestSendBatchSlotsDoNotOverlap commits three short packets into separate
// 8-byte reservations and verifies no slot stomped a neighbor.
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
	writer := &fakeBatchWriter{}
	batch := NewSendBatch(writer, 3, 8)
	dst := netip.MustParseAddrPort("10.0.0.1:80")
	for i := 0; i < 3; i++ {
		slot := batch.Reserve(8)
		batch.Commit(append(slot[:0], byte(0xA0+i), byte(0xB0+i)), dst, 0)
	}
	if err := batch.Flush(); err != nil {
		t.Fatalf("Flush: %v", err)
	}
	for i, got := range writer.bufs {
		if got[0] != byte(0xA0+i) || got[1] != byte(0xB0+i) {
			t.Errorf("slot %d corrupted: %x", i, got)
		}
	}
}
// TestSendBatchGrowPreservesCommitted forces a backing-array grow between
// two commits and checks the already-committed packet survives untouched
// both in memory and on the wire.
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
	writer := &fakeBatchWriter{}
	// Tiny initial backing forces a grow on the second Reserve.
	batch := NewSendBatch(writer, 1, 4)
	dst := netip.MustParseAddrPort("10.0.0.1:80")
	first := append(batch.Reserve(4)[:0], 0x11, 0x22, 0x33, 0x44)
	batch.Commit(first, dst, 0)
	// Exceeds remaining cap, triggers grow.
	second := append(batch.Reserve(8)[:0], 0xA, 0xB, 0xC, 0xD, 0xE)
	batch.Commit(second, dst, 0)
	// pkt1 must still be intact even though backing reallocated.
	if first[0] != 0x11 || first[3] != 0x44 {
		t.Fatalf("first packet corrupted by grow: %x", first)
	}
	if err := batch.Flush(); err != nil {
		t.Fatalf("Flush: %v", err)
	}
	if len(writer.bufs) != 2 {
		t.Fatalf("got %d bufs want 2", len(writer.bufs))
	}
	if writer.bufs[0][0] != 0x11 || writer.bufs[0][3] != 0x44 {
		t.Errorf("first packet on the wire: %x", writer.bufs[0])
	}
	if writer.bufs[1][0] != 0xA || writer.bufs[1][4] != 0xE {
		t.Errorf("second packet on the wire: %x", writer.bufs[1])
	}
}