better and batched tun interface

This commit is contained in:
JackDoan
2026-04-17 10:25:05 -05:00
parent 398d67e2da
commit f95857b4c3
34 changed files with 1189 additions and 483 deletions

33
overlay/batch/batch.go Normal file
View File

@@ -0,0 +1,33 @@
package batch
import "net/netip"
// RxBatcher accumulates received packets and emits them to a sink in
// arrival order on Flush. The batcher owns the packet storage; callers
// borrow it via Reserve and give it back implicitly when Flush returns.
type RxBatcher interface {
	// Reserve returns a sz-byte slice, owned by the batcher, for the caller
	// to fill with a single packet's payload before passing it to Commit.
	Reserve(sz int) []byte
	// Commit borrows pkt. The caller must keep pkt valid until the next Flush
	Commit(pkt []byte) error
	// Flush emits every queued packet in arrival order. Returns the
	// first error observed; keeps draining so one bad packet doesn't hold up
	// the rest. After Flush returns, borrowed payload slices may be recycled.
	Flush() error
}
// TxBatcher collects outbound packets into pre-allocated fixed-capacity
// slots so a whole batch can be handed to the transport in one call (Get).
type TxBatcher interface {
	// Next returns a zero-length slice with slotCap capacity over the next unused
	// slot's backing bytes. The caller writes into the returned slice and then
	// calls Commit with the final length and destination. Next returns nil when
	// the batch is full.
	Next() []byte
	// Commit records the slot just returned by Next as a packet of length n
	// destined for dst.
	Commit(n int, dst netip.AddrPort)
	// Reset clears committed slots; backing storage is retained for reuse.
	Reset()
	// Len returns the number of committed packets.
	Len() int
	// Cap returns the maximum number of slots in the batch.
	Cap() int
	// Get returns the parallel slices of packet buffers and destinations
	// needed to send the batch.
	Get() ([][]byte, []netip.AddrPort)
}

View File

@@ -0,0 +1,57 @@
package batch
import (
"io"
"github.com/slackhq/nebula/udp"
)
// Passthrough is an RxBatcher that doesn't batch anything, it just accumulates and then sends packets.
type Passthrough struct {
	out     io.Writer // sink every committed packet is written to on Flush
	slots   [][]byte  // committed packets, in arrival order
	backing []byte    // shared scratch storage that Reserve carves slices from
	cursor  int       // NOTE(review): unused by the methods visible here — confirm before removing
}
// NewPassthrough wraps w in a Passthrough pre-sized for a burst of 128
// MTU-sized packets; both the slot list and the backing buffer may grow
// past that on demand.
func NewPassthrough(w io.Writer) *Passthrough {
	const defaultSlots = 128
	p := &Passthrough{
		out:     w,
		slots:   make([][]byte, 0, defaultSlots),
		backing: make([]byte, 0, defaultSlots*udp.MTU),
	}
	return p
}
// Reserve hands out a length-sz slice carved from the shared backing
// buffer. The three-index slice pins cap == sz so a caller's append can
// never stomp a neighboring reservation.
func (p *Passthrough) Reserve(sz int) []byte {
	need := len(p.backing) + sz
	if need > cap(p.backing) {
		// Out of room: swap in a bigger backing array. Already-committed
		// slices keep referencing the old array, which stays valid until
		// Flush drops them.
		p.backing = make([]byte, 0, max(cap(p.backing)*2, sz))
	}
	start := len(p.backing)
	end := start + sz
	p.backing = p.backing[:end]
	return p.backing[start:end:end]
}
// Commit queues pkt for the next Flush. The caller must keep pkt valid
// until then. It always returns nil.
func (p *Passthrough) Commit(pkt []byte) error {
	p.slots = append(p.slots, pkt)
	return nil
}
// Flush writes every committed packet to the underlying writer in arrival
// order, then releases all borrowed slices. It returns the first write
// error observed but keeps draining so one bad packet does not block the
// rest. After Flush returns, previously borrowed payload slices may be
// recycled.
func (p *Passthrough) Flush() error {
	var firstErr error
	for _, s := range p.slots {
		if _, err := p.out.Write(s); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	// Drop the per-packet references so an old backing array (possibly
	// replaced by a grow inside Reserve) can be garbage collected.
	// clear is the Go 1.21+ idiom for the manual nil-out loop; the file
	// already relies on 1.21 via the max builtin.
	clear(p.slots)
	p.slots = p.slots[:0]
	p.backing = p.backing[:0]
	return firstErr
}

61
overlay/batch/tx_batch.go Normal file
View File

@@ -0,0 +1,61 @@
package batch
import "net/netip"
// SendBatchCap is the number of packet slots a SendBatch is expected to be
// built with; presumably callers pass it to NewSendBatch — confirm at call sites.
const SendBatchCap = 128
// SendBatch accumulates encrypted UDP packets for potential TX offloading.
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
// The backing storage holds up to batchCap packets of slotCap bytes each;
// bufs and dsts are parallel slices of committed slots.
type SendBatch struct {
bufs [][]byte
dsts []netip.AddrPort
backing []byte
slotCap int
batchCap int
nextSlot int
}
func NewSendBatch(batchCap, slotCap int) *SendBatch {
return &SendBatch{
bufs: make([][]byte, 0, batchCap),
dsts: make([]netip.AddrPort, 0, batchCap),
backing: make([]byte, batchCap*slotCap),
slotCap: slotCap,
batchCap: batchCap,
}
}
func (b *SendBatch) Next() []byte {
if b.nextSlot >= b.batchCap {
return nil
}
start := b.nextSlot * b.slotCap
return b.backing[start : start : start+b.slotCap] //set len to 0 but cap to slotCap
}
func (b *SendBatch) Commit(n int, dst netip.AddrPort) {
start := b.nextSlot * b.slotCap
b.bufs = append(b.bufs, b.backing[start:start+n])
b.dsts = append(b.dsts, dst)
b.nextSlot++
}
func (b *SendBatch) Reset() {
b.bufs = b.bufs[:0]
b.dsts = b.dsts[:0]
b.nextSlot = 0
}
func (b *SendBatch) Len() int {
return len(b.bufs)
}
func (b *SendBatch) Cap() int {
return b.batchCap
}
func (b *SendBatch) Get() ([][]byte, []netip.AddrPort) {
return b.bufs, b.dsts
}

View File

@@ -0,0 +1,69 @@
package batch
import (
"net/netip"
"testing"
)
// TestSendBatchBookkeeping walks a SendBatch through a full
// fill/inspect/reset cycle, checking Len, Cap, Next and Commit accounting
// at every step.
func TestSendBatchBookkeeping(t *testing.T) {
	sb := NewSendBatch(4, 32)
	if sb.Len() != 0 || sb.Cap() != 4 {
		t.Fatalf("fresh batch: len=%d cap=%d", sb.Len(), sb.Cap())
	}
	addr := netip.MustParseAddrPort("10.0.0.1:4242")
	for idx := 0; idx < 4; idx++ {
		s := sb.Next()
		if s == nil {
			t.Fatalf("slot %d: Next returned nil before cap", idx)
		}
		if cap(s) != 32 || len(s) != 0 {
			t.Fatalf("slot %d: got len=%d cap=%d want len=0 cap=32", idx, len(s), cap(s))
		}
		// Write a marker byte.
		s = append(s, byte(idx), byte(idx+1), byte(idx+2))
		sb.Commit(len(s), addr)
	}
	if sb.Next() != nil {
		t.Fatalf("Next should return nil when full")
	}
	if sb.Len() != 4 {
		t.Fatalf("Len=%d want 4", sb.Len())
	}
	for idx := range sb.bufs {
		got := sb.bufs[idx]
		if len(got) != 3 || got[0] != byte(idx) {
			t.Errorf("buf %d: %x", idx, got)
		}
		if sb.dsts[idx] != addr {
			t.Errorf("dst %d: got %v want %v", idx, sb.dsts[idx], addr)
		}
	}
	// Reset returns empty and Next works again.
	sb.Reset()
	if sb.Len() != 0 {
		t.Fatalf("after Reset Len=%d want 0", sb.Len())
	}
	s := sb.Next()
	if s == nil || cap(s) != 32 {
		t.Fatalf("after Reset Next nil or wrong cap: %v cap=%d", s == nil, cap(s))
	}
}
// TestSendBatchSlotsDoNotOverlap writes distinct sentinel bytes into each
// slot and verifies no slot's data bleeds into a neighbor.
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
	batch := NewSendBatch(3, 8)
	dst := netip.MustParseAddrPort("10.0.0.1:80")
	for idx := 0; idx < 3; idx++ {
		slot := append(batch.Next(), byte(0xA0+idx), byte(0xB0+idx))
		batch.Commit(len(slot), dst)
	}
	for idx, got := range batch.bufs {
		if got[0] != byte(0xA0+idx) || got[1] != byte(0xB0+idx) {
			t.Errorf("slot %d corrupted: %x", idx, got)
		}
	}
}