This commit is contained in:
JackDoan
2026-05-14 13:40:40 -05:00
parent 3a329ec217
commit c5abf30102
10 changed files with 48 additions and 80 deletions

View File

@@ -1,28 +0,0 @@
package batch
import "net/netip"
// RxBatcher queues received packets and emits them in a single Flush.
// The expected call pattern is Reserve → fill → Commit, repeated, then Flush.
type RxBatcher interface {
// Reserve returns a caller-writable payload slice of sz bytes, borrowed
// from the batcher's backing storage, to be filled and then Commit-ed.
Reserve(sz int) []byte
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
Commit(pkt []byte) error
// Flush emits every queued packet in arrival order. Returns the
// first error observed; keeps draining so one bad packet doesn't hold up
// the rest. After Flush returns, borrowed payload slices may be recycled.
Flush() error
}
// TxBatcher queues outbound packets, each with a destination address and an
// outer ECN codepoint, and emits them in a single Flush.
// The expected call pattern is Reserve → fill → Commit, repeated, then Flush.
type TxBatcher interface {
// Reserve returns a caller-writable payload slice of sz bytes, borrowed
// from the batcher's backing storage, to be filled and then Commit-ed.
Reserve(sz int) []byte
// Commit borrows pkt and records its destination plus the 2-bit
// IP-level ECN codepoint to set on the outer (carrier) header. The
// caller must keep pkt valid until the next Flush. Pass 0 (Not-ECT)
// to leave the outer ECN field unset.
Commit(pkt []byte, dst netip.AddrPort, outerECN byte)
// Flush emits every queued packet via the underlying batch writer in
// arrival order. Returns an errors.Join of one or more errors. After Flush returns,
// borrowed payload slices may be recycled.
Flush() error
}

View File

@@ -3,24 +3,18 @@ package batch
import (
"io"
"github.com/slackhq/nebula/udp"
"github.com/slackhq/nebula/util"
)
// Passthrough is a RxBatcher that doesn't batch anything; it just accumulates packets and then sends them.
type Passthrough struct {
out io.Writer
slots [][]byte
arena *Arena
arena *util.Arena
cursor int
}
const passthroughBaseNumSlots = 128
// DefaultPassthroughArenaCap is the recommended arena capacity for a
// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB.
const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU
func NewPassthrough(w io.Writer, slots int, arena *Arena) *Passthrough {
func NewPassthrough(w io.Writer, slots int, arena *util.Arena) *Passthrough {
return &Passthrough{
out: w,
slots: make([][]byte, 0, slots),

12
overlay/batch/rx_batch.go Normal file
View File

@@ -0,0 +1,12 @@
package batch
// RxBatcher queues received packets and emits them in a single Flush.
// The expected call pattern is Reserve → fill → Commit, repeated, then Flush.
type RxBatcher interface {
// Reserve returns a caller-writable payload slice of sz bytes, borrowed
// from the batcher's backing storage, to be filled and then Commit-ed.
Reserve(sz int) []byte
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
Commit(pkt []byte) error
// Flush emits every queued packet in arrival order. Returns the
// first error observed; keeps draining so one bad packet doesn't hold up
// the rest. After Flush returns, borrowed payload slices may be recycled.
Flush() error
}

View File

@@ -3,16 +3,11 @@ package batch
import (
"net/netip"
"github.com/slackhq/nebula/udp"
"github.com/slackhq/nebula/util"
)
const SendBatchCap = 128
// DefaultSendBatchArenaCap is the recommended arena capacity for a
// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers
// the nebula header + AEAD tag tacked onto each plaintext segment.
const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32)
// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
type batchWriter interface {
WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
@@ -27,11 +22,11 @@ type SendBatch struct {
bufs [][]byte
dsts []netip.AddrPort
ecns []byte
arena *Arena
arena *util.Arena
}
// NewSendBatch makes a SendBatch with batchCap slots backed by arena.
func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch {
func NewSendBatch(out batchWriter, batchCap int, arena *util.Arena) *SendBatch {
return &SendBatch{
out: out,
bufs: make([][]byte, 0, batchCap),

View File

@@ -3,6 +3,8 @@ package batch
import (
"net/netip"
"testing"
"github.com/slackhq/nebula/util"
)
type fakeBatchWriter struct {
@@ -27,7 +29,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns
func TestSendBatchReserveCommitFlush(t *testing.T) {
fw := &fakeBatchWriter{}
b := NewSendBatch(fw, 4, NewArena(32))
b := NewSendBatch(fw, 4, util.NewArena(32))
ap := netip.MustParseAddrPort("10.0.0.1:4242")
for i := 0; i < 4; i++ {
@@ -71,7 +73,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) {
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
fw := &fakeBatchWriter{}
b := NewSendBatch(fw, 3, NewArena(8))
b := NewSendBatch(fw, 3, util.NewArena(8))
ap := netip.MustParseAddrPort("10.0.0.1:80")
for i := 0; i < 3; i++ {
@@ -93,7 +95,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
fw := &fakeBatchWriter{}
// Tiny initial backing forces a grow on the second Reserve.
b := NewSendBatch(fw, 1, NewArena(4))
b := NewSendBatch(fw, 1, util.NewArena(4))
ap := netip.MustParseAddrPort("10.0.0.1:80")
s1 := b.Reserve(4)

View File

@@ -1,12 +0,0 @@
package tio
import "fmt"
// SegmentSuperpacket invokes fn once per segment of pkt.
// This is a stub implementation that does not actually support segmentation:
// a GSO superpacket is rejected outright, and an ordinary packet is handed
// to fn whole as its single segment.
func SegmentSuperpacket(pkt Packet, fn func(seg []byte) error) error {
if !pkt.GSO.IsSuperpacket() {
// Not a superpacket: the entire payload is the one and only segment.
return fn(pkt.Bytes)
}
return fmt.Errorf("tio: GSO superpacket on platform without segmentation support")
}