JackDoan
2026-05-14 13:40:40 -05:00
parent 697294a676
commit c4deb5fc1c
8 changed files with 37 additions and 62 deletions

View File

@@ -11,11 +11,11 @@ import (
 	"github.com/slackhq/nebula/iputil"
 	"github.com/slackhq/nebula/noiseutil"
 	"github.com/slackhq/nebula/overlay/batch"
-	"github.com/slackhq/nebula/overlay/tio"
 	"github.com/slackhq/nebula/routing"
+	"github.com/slackhq/nebula/wire"
 )

-func (f *Interface) consumeInsidePacket(pkt wire.Packet, fwPacket *firewall.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int, localCache firewall.ConntrackCache) {
+func (f *Interface) consumeInsidePacket(pkt wire.TunPacket, fwPacket *firewall.Packet, nb []byte, sendBatch *batch.SendBatch, rejectBuf []byte, q int, localCache firewall.ConntrackCache) {
 	// pkt.Bytes is either one IP datagram (GSO zero) or a TSO/USO
 	// superpacket. In both cases the L3+L4 headers at the start describe
 	// the same 5-tuple every segment will share, so a single newPacket /
@@ -45,7 +45,7 @@ func (f *Interface) consumeInsidePacket(pkt wire.Packet, fwPacket *firewall.Pack
 	// routes packets from the Nebula addr to the Nebula addr through the Nebula
 	// TUN device.
 	if immediatelyForwardToSelf {
-		err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
+		err := pkt.PerSegment(func(seg []byte) error {
 			_, werr := f.readers[q].Write(seg)
 			return werr
 		})
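For reference, this hairpin branch is the simplest consumer of the new method: each segment goes straight back into the TUN queue. A minimal, self-contained sketch of the call shape, with a stand-in TunPacket (the real wire.TunPacket carries more than Bytes):

// Minimal sketch of the per-segment callback shape; TunPacket here is a
// stand-in, mirroring the single-segment stub this commit adds.
package main

import (
	"bytes"
	"fmt"
	"io"
)

type TunPacket struct{ Bytes []byte }

// PerSegment mirrors the stub in this commit: one segment, one callback.
func (t *TunPacket) PerSegment(fn func(seg []byte) error) error {
	return fn(t.Bytes)
}

// forwardToSelf writes every segment back into the TUN queue, stopping on
// the first write error, as consumeInsidePacket does for hairpin traffic.
func forwardToSelf(pkt *TunPacket, w io.Writer) error {
	return pkt.PerSegment(func(seg []byte) error {
		_, werr := w.Write(seg)
		return werr
	})
}

func main() {
	var tun bytes.Buffer
	pkt := &TunPacket{Bytes: []byte("one ip datagram")}
	if err := forwardToSelf(pkt, &tun); err != nil {
		fmt.Println("forward failed:", err)
	}
	fmt.Printf("wrote %d bytes\n", tun.Len())
}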
@@ -67,7 +67,7 @@ func (f *Interface) consumeInsidePacket(pkt wire.Packet, fwPacket *firewall.Pack
 	// borrowed: SegmentSuperpacket builds each segment in the kernel-supplied pkt
 	// bytes underneath. cachePacket explicitly copies its argument (handshake_manager.go cachePacket),
 	// so retaining segments past the loop is safe.
-	err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
+	err := pkt.PerSegment(func(seg []byte) error {
 		hh.cachePacket(f.l, header.Message, 0, seg, f.sendMessageNow, f.cachedPacketMetrics)
 		return nil
 	})
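The borrowing rule in that comment is the load-bearing invariant: every seg handed to the callback aliases the same kernel-supplied buffer, so anything retained past the loop must be copied, as cachePacket does. A toy demonstration of the hazard:

// Sketch of why cachePacket must copy: each seg aliases the kernel-supplied
// buffer, so an uncopied reference goes stale once the buffer is reused.
// Stand-in code; the real segmentation logic is platform-specific.
package main

import "fmt"

func perSegment(buf []byte, fn func(seg []byte) error) error {
	return fn(buf) // single-segment stub, as in this commit
}

func main() {
	buf := []byte("payload-1")

	var retained, copied []byte
	_ = perSegment(buf, func(seg []byte) error {
		retained = seg                       // borrows: aliases buf
		copied = append([]byte(nil), seg...) // copies: safe past the loop
		return nil
	})

	copy(buf, []byte("payload-2")) // kernel reuses the buffer
	fmt.Println(string(retained))  // "payload-2": stale alias
	fmt.Println(string(copied))    // "payload-1": still correct
}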
@@ -139,10 +139,10 @@ func (f *Interface) sendInsideEncrypt(hostinfo *HostInfo, ci *ConnectionState, s
 // segment of a TSO/USO superpacket) into the caller's batch slot for
 // later sendmmsg flush. Segmentation is fused with encryption here so the
 // kernel-supplied superpacket bytes never get written into a separate
-// scratch arena: SegmentSuperpacket builds each segment's plaintext in
+// scratch arena: PerSegment builds each segment's plaintext in
 // segScratch[:segLen] in turn, and we encrypt directly into a fresh
 // SendBatch slot.
-func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int) {
+func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt wire.TunPacket, nb []byte, sendBatch *batch.SendBatch) {
 	ci := hostinfo.ConnectionState
 	if ci.eKey == nil {
 		return
@@ -183,7 +183,7 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []b
 		return
 	}

-	err = tio.SegmentSuperpacket(pkt, func(seg []byte) error {
+	err = pkt.PerSegment(func(seg []byte) error {
 		//relay header + header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305) + relay tag
 		scratch := sendBatch.Reserve(header.Len + header.Len + len(seg) + 16 + 16)
@@ -212,7 +212,7 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []b
 		return
 	}

-	err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
+	err := pkt.PerSegment(func(seg []byte) error {
 		// header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305)
 		scratch := sendBatch.Reserve(header.Len + len(seg) + 16)
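The two Reserve calls differ only by the relay framing. A small sketch of the size arithmetic, assuming an illustrative 16-byte header.Len (the real constant lives in the header package):

// Sketch of the Reserve size math above, with the constants written out.
// headerLen is an assumed stand-in for header.Len; the AEAD tag is 16 bytes
// for both AES-GCM and ChaCha20-Poly1305.
package main

import "fmt"

const (
	headerLen = 16 // assumed stand-in for header.Len
	aeadTag   = 16
)

// slotSize returns how many ciphertext bytes one segment needs.
func slotSize(segLen int, viaRelay bool) int {
	if viaRelay {
		// relay header + header + plaintext + AEAD tag + relay tag
		return headerLen + headerLen + segLen + aeadTag + aeadTag
	}
	// header + plaintext + AEAD tag
	return headerLen + segLen + aeadTag
}

func main() {
	fmt.Println(slotSize(1300, false)) // 1332
	fmt.Println(slotSize(1300, true))  // 1364
}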

View File

@@ -289,10 +289,10 @@ func (f *Interface) activate() error {
 			// is on, everything else (and either lane disabled) falls
 			// through to passthrough so non-IP / non-TCP-UDP traffic still
 			// reaches the TUN.
-			arena := batch.NewArena(max(f.batchSize, 1) * 65535)
+			arena := util.NewArena(max(f.batchSize, 1) * 65535)
 			f.batchers[i] = batch.NewMultiCoalescer(f.readers[i], f.l, arena, caps.TSO, caps.USO)
 		} else {
-			arena := batch.NewArena(max(f.batchSize, 1) * udp.MTU)
+			arena := util.NewArena(max(f.batchSize, 1) * udp.MTU)
 			f.batchers[i] = batch.NewPassthrough(f.readers[i], f.batchSize, arena)
 		}
 	}
@@ -389,7 +389,7 @@ func (f *Interface) listenIn(reader tio.Queue, q int) {
 	rejectBuf := make([]byte, mtu)
 	arenaSize := batch.SendBatchCap * (udp.MTU + 32)
-	sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, arenaSize)
+	sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, util.NewArena(arenaSize))
 	fwPacket := &firewall.Packet{}
 	nb := make([]byte, 12, 12)
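The arena sizing follows the lane type: a GSO-capable lane must fit a full 65535-byte superpacket per slot, a passthrough lane only one udp.MTU datagram, and the send-side arena adds 32 bytes of header-plus-tag overhead per packet. A sketch of that arithmetic, with udpMTU as a stand-in value for udp.MTU:

// Sketch of the arena sizing choices above; udpMTU and sendBatchCap are
// stand-ins for udp.MTU and batch.SendBatchCap.
package main

import "fmt"

const (
	udpMTU       = 1500 // stand-in for udp.MTU
	sendBatchCap = 128  // mirrors batch.SendBatchCap
)

func rxArenaSize(batchSize int, gso bool) int {
	n := max(batchSize, 1)
	if gso {
		return n * 65535 // room for full TSO/USO superpackets
	}
	return n * udpMTU // one plain datagram per slot
}

func main() {
	fmt.Println(rxArenaSize(64, true))        // 4194240
	fmt.Println(rxArenaSize(64, false))       // 96000
	fmt.Println(sendBatchCap * (udpMTU + 32)) // 196096, the sb arena above
}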

View File

@@ -1,28 +0,0 @@
-package batch
-
-import "net/netip"
-
-type RxBatcher interface {
-	// Reserve creates a pkt to borrow
-	Reserve(sz int) []byte
-	// Commit borrows pkt. The caller must keep pkt valid until the next Flush
-	Commit(pkt []byte) error
-	// Flush emits every queued packet in arrival order. Returns the
-	// first error observed; keeps draining so one bad packet doesn't hold up
-	// the rest. After Flush returns, borrowed payload slices may be recycled.
-	Flush() error
-}
-
-type TxBatcher interface {
-	// Reserve creates a pkt to borrow
-	Reserve(sz int) []byte
-	// Commit borrows pkt and records its destination plus the 2-bit
-	// IP-level ECN codepoint to set on the outer (carrier) header. The
-	// caller must keep pkt valid until the next Flush. Pass 0 (Not-ECT)
-	// to leave the outer ECN field unset.
-	Commit(pkt []byte, dst netip.AddrPort, outerECN byte)
-	// Flush emits every queued packet via the underlying batch writer in
-	// arrival order. Returns an errors.Join of one or more errors. After Flush returns,
-	// borrowed payload slices may be recycled.
-	Flush() error
-}

overlay/batch/rx_batch.go (new file, 12 additions)
View File

@@ -0,0 +1,12 @@
+package batch
+
+type RxBatcher interface {
+	// Reserve creates a pkt to borrow
+	Reserve(sz int) []byte
+	// Commit borrows pkt. The caller must keep pkt valid until the next Flush
+	Commit(pkt []byte) error
+	// Flush emits every queued packet in arrival order. Returns the
+	// first error observed; keeps draining so one bad packet doesn't hold up
+	// the rest. After Flush returns, borrowed payload slices may be recycled.
+	Flush() error
+}
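The subtle part of this contract is Flush's "first error observed; keeps draining" rule. A toy RxBatcher honoring it:

// Toy RxBatcher showing the draining Flush contract: the first error is
// remembered but every queued packet is still attempted.
package main

import (
	"errors"
	"fmt"
)

type toyRx struct {
	queue   [][]byte
	deliver func(pkt []byte) error
}

func (r *toyRx) Reserve(sz int) []byte { return make([]byte, sz) }

func (r *toyRx) Commit(pkt []byte) error {
	r.queue = append(r.queue, pkt) // borrowed until the next Flush
	return nil
}

func (r *toyRx) Flush() error {
	var first error
	for _, pkt := range r.queue {
		if err := r.deliver(pkt); err != nil && first == nil {
			first = err // remember the first failure, keep going
		}
	}
	r.queue = r.queue[:0]
	return first
}

func main() {
	n := 0
	r := &toyRx{deliver: func(pkt []byte) error {
		n++
		if n == 1 {
			return errors.New("bad packet")
		}
		return nil
	}}
	for i := 0; i < 3; i++ {
		r.Commit(r.Reserve(8))
	}
	fmt.Println(r.Flush(), "delivery attempts:", n) // bad packet, 3
}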

View File

@@ -3,16 +3,11 @@ package batch
 import (
 	"net/netip"

-	"github.com/slackhq/nebula/udp"
+	"github.com/slackhq/nebula/util"
 )

 const SendBatchCap = 128

-// DefaultSendBatchArenaCap is the recommended arena capacity for a
-// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers
-// the nebula header + AEAD tag tacked onto each plaintext segment.
-const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32)
-
 // batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
 type batchWriter interface {
 	WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
@@ -27,11 +22,11 @@ type SendBatch struct {
 	bufs  [][]byte
 	dsts  []netip.AddrPort
 	ecns  []byte
-	arena *Arena
+	arena *util.Arena
 }

 // NewSendBatch makes a SendBatch with batchCap slots backed by arena.
-func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch {
+func NewSendBatch(out batchWriter, batchCap int, arena *util.Arena) *SendBatch {
 	return &SendBatch{
 		out:  out,
 		bufs: make([][]byte, 0, batchCap),
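The commit moves Arena into util without showing its body. A minimal bump-allocator sketch that is merely consistent with the call sites above: NewArena pre-sizes the backing store, and every method name past NewArena is an assumption.

// Hypothetical minimal Arena; only NewArena(n) appears in this diff, so
// Alloc and Reset are assumed names for illustration.
package util

type Arena struct {
	buf []byte
	off int
}

func NewArena(n int) *Arena { return &Arena{buf: make([]byte, n)} }

// Alloc returns a zeroed slice of sz bytes, growing the backing store when
// exhausted (the grow path the SendBatch tests exercise). A reallocation
// leaves earlier slices aliasing the old array, so their contents survive.
func (a *Arena) Alloc(sz int) []byte {
	if a.off+sz > len(a.buf) {
		a.buf = append(a.buf[:a.off], make([]byte, sz)...)
	}
	s := a.buf[a.off : a.off+sz : a.off+sz] // full slice expr: no overlap
	a.off += sz
	return s
}

// Reset recycles the whole arena; callers must drop outstanding slices first.
func (a *Arena) Reset() { a.off = 0 }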

View File

@@ -3,6 +3,8 @@ package batch
 import (
 	"net/netip"
 	"testing"
+
+	"github.com/slackhq/nebula/util"
 )

 type fakeBatchWriter struct {
@@ -27,7 +29,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns
 func TestSendBatchReserveCommitFlush(t *testing.T) {
 	fw := &fakeBatchWriter{}
-	b := NewSendBatch(fw, 4, NewArena(32))
+	b := NewSendBatch(fw, 4, util.NewArena(32))
 	ap := netip.MustParseAddrPort("10.0.0.1:4242")

 	for i := 0; i < 4; i++ {
@@ -71,7 +73,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) {
 func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
 	fw := &fakeBatchWriter{}
-	b := NewSendBatch(fw, 3, NewArena(8))
+	b := NewSendBatch(fw, 3, util.NewArena(8))
 	ap := netip.MustParseAddrPort("10.0.0.1:80")

 	for i := 0; i < 3; i++ {
@@ -93,7 +95,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
 func TestSendBatchGrowPreservesCommitted(t *testing.T) {
 	fw := &fakeBatchWriter{}
 	// Tiny initial backing forces a grow on the second Reserve.
-	b := NewSendBatch(fw, 1, NewArena(4))
+	b := NewSendBatch(fw, 1, util.NewArena(4))
 	ap := netip.MustParseAddrPort("10.0.0.1:80")

 	s1 := b.Reserve(4)

View File

@@ -1,12 +0,0 @@
-package tio
-
-import "fmt"
-
-// SegmentSuperpacket invokes fn once per segment of pkt.
-// This is a stub implementation that does not actually support segmentation.
-func SegmentSuperpacket(pkt Packet, fn func(seg []byte) error) error {
-	if pkt.GSO.IsSuperpacket() {
-		return fmt.Errorf("tio: GSO superpacket on platform without segmentation support")
-	}
-	return fn(pkt.Bytes)
-}

View File

@@ -9,3 +9,9 @@ type TunPacket struct {
 	// Fields in Meta should be as portable/platform-agnostic as possible.
 	Meta struct{}
 }
+
+// PerSegment invokes fn once per segment of pkt.
+// This is a stub implementation that does not actually support segmentation.
+func (t *TunPacket) PerSegment(fn func(seg []byte) error) error {
+	return fn(t.Bytes)
+}
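For contrast with the stub, a segmentation-capable PerSegment might look like the sketch below. SegSize is a hypothetical field standing in for the kernel's GSO segment size, and real TSO/USO segmentation would also rewrite each segment's L3/L4 headers; the stub above is the portable fallback.

// Hypothetical segmentation-capable variant; gsoTunPacket and SegSize are
// illustrative, not part of the wire package in this commit.
package wire

type gsoTunPacket struct {
	Bytes   []byte
	SegSize int // hypothetical: 0 means "not a superpacket"
}

// PerSegment walks Bytes in SegSize strides, invoking fn once per segment.
// This sketch splits only the raw payload; real TSO/USO segmentation also
// rebuilds per-segment headers.
func (t *gsoTunPacket) PerSegment(fn func(seg []byte) error) error {
	if t.SegSize <= 0 || len(t.Bytes) <= t.SegSize {
		return fn(t.Bytes)
	}
	for off := 0; off < len(t.Bytes); off += t.SegSize {
		end := min(off+t.SegSize, len(t.Bytes))
		if err := fn(t.Bytes[off:end]); err != nil {
			return err
		}
	}
	return nil
}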