batched tun interface

This commit is contained in:
JackDoan
2026-04-17 10:25:05 -05:00
parent 0d87e57de3
commit 13ebc1b343
7 changed files with 133 additions and 586 deletions

View File

@@ -1,9 +1,18 @@
package batch
import "net/netip"
import (
"net/netip"
"github.com/slackhq/nebula/udp"
)
// SendBatchCap is the capacity, in packet slots, assumed when sizing
// DefaultSendBatchArenaCap.
const SendBatchCap = 128
// DefaultSendBatchArenaCap is the recommended arena capacity for a
// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers
// the nebula header + AEAD tag tacked onto each plaintext segment.
const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32)
// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
type batchWriter interface {
WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
@@ -11,38 +20,29 @@ type batchWriter interface {
// SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch.
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
// Slot bytes are borrowed from the injected Arena and remain valid until
// Flush, which Resets the arena.
type SendBatch struct {
	out   batchWriter      // destination for WriteBatch on Flush
	bufs  [][]byte         // committed packet slices, one per slot
	dsts  []netip.AddrPort // destination address per slot, parallel to bufs
	ecns  []byte           // outer ECN byte per slot, parallel to bufs
	arena *Arena           // backing storage for Reserve'd slot bytes
}
// NewSendBatch makes a SendBatch with batchCap slots and an arenaSize byte buffer for slices to back those slots
func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch {
// NewSendBatch makes a SendBatch with batchCap slots backed by arena.
func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch {
return &SendBatch{
out: out,
bufs: make([][]byte, 0, batchCap),
dsts: make([]netip.AddrPort, 0, batchCap),
ecns: make([]byte, 0, batchCap),
backing: make([]byte, 0, arenaSize),
out: out,
bufs: make([][]byte, 0, batchCap),
dsts: make([]netip.AddrPort, 0, batchCap),
ecns: make([]byte, 0, batchCap),
arena: arena,
}
}
// Reserve returns a sz-byte slot borrowed from the arena. The bytes remain
// valid until Flush, which Resets the arena.
func (b *SendBatch) Reserve(sz int) []byte {
	return b.arena.Reserve(sz)
}
func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) {
@@ -60,6 +60,6 @@ func (b *SendBatch) Flush() error {
b.bufs = b.bufs[:0]
b.dsts = b.dsts[:0]
b.ecns = b.ecns[:0]
b.backing = b.backing[:0]
b.arena.Reset()
return err
}

View File

@@ -27,7 +27,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns
func TestSendBatchReserveCommitFlush(t *testing.T) {
fw := &fakeBatchWriter{}
b := NewSendBatch(fw, 4, 32)
b := NewSendBatch(fw, 4, NewArena(32))
ap := netip.MustParseAddrPort("10.0.0.1:4242")
for i := 0; i < 4; i++ {
@@ -71,7 +71,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) {
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
fw := &fakeBatchWriter{}
b := NewSendBatch(fw, 3, 8)
b := NewSendBatch(fw, 3, NewArena(8))
ap := netip.MustParseAddrPort("10.0.0.1:80")
for i := 0; i < 3; i++ {
@@ -93,7 +93,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
fw := &fakeBatchWriter{}
// Tiny initial backing forces a grow on the second Reserve.
b := NewSendBatch(fw, 1, 4)
b := NewSendBatch(fw, 1, NewArena(4))
ap := netip.MustParseAddrPort("10.0.0.1:80")
s1 := b.Reserve(4)

View File

@@ -8,6 +8,10 @@ import (
"github.com/slackhq/nebula/routing"
)
// defaultBatchBufSize is the per-Queue scratch size for Read on backends
// that don't do TSO segmentation. 65535 is the maximum IP total length,
// so it covers any single IP packet.
const defaultBatchBufSize = 65535
type Device interface {
io.Closer
Activate() error

12
overlay/tio/segment.go Normal file
View File

@@ -0,0 +1,12 @@
package tio
import "fmt"
// SegmentSuperpacket invokes fn once per segment of pkt.
// This is a stub implementation that does not actually support segmentation:
// a plain packet is handed straight to fn, and a GSO superpacket is rejected.
func SegmentSuperpacket(pkt Packet, fn func(seg []byte) error) error {
	if !pkt.GSO.IsSuperpacket() {
		return fn(pkt.Bytes)
	}
	return fmt.Errorf("tio: GSO superpacket on platform without segmentation support")
}