Files
nebula/overlay/batch/multi_coalesce.go
2026-05-14 15:56:34 -05:00

142 lines
4.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package batch
import (
"errors"
"log/slog"
"github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/util"
)
// MultiCoalescer fans plaintext packets out to lane-specific batchers based
// on the IP/L4 protocol of the packet, sharing a single Reserve arena
// across lanes so the caller's allocation pattern is unchanged.
//
// Lanes are processed independently: the TCP coalescer only sees TCP, the
// UDP coalescer only sees UDP, and the passthrough lane handles everything
// else. Per-flow arrival order is preserved because a single 5-tuple only
// ever lands in one lane and each lane preserves its own slot order.
//
// Cross-lane order is NOT preserved across the TCP/UDP/passthrough split.
// This is acceptable because the carrier-side recvmmsg path already
// stable-sorts by (peer, message counter) before delivering plaintext
// here, so replay-window invariants are unaffected, and apps observe
// correct per-flow ordering — which is all the IP layer guarantees anyway.
// Do not "fix" this by interleaving lane outputs at flush time; that
// negates the entire point of coalescing (each lane needs to see runs of
// adjacent same-flow packets to coalesce them).
type MultiCoalescer struct {
	tcp *TCPCoalescer // nil when TCP coalescing is disabled; TCP then falls through to pt
	udp *UDPCoalescer // nil when UDP coalescing is disabled; UDP then falls through to pt
	pt  *Passthrough  // always present; catches non-TCP/UDP and disabled-lane traffic

	// arena is shared across every lane (constructor hands the same
	// *Arena to TCP, UDP, and Passthrough), so there's exactly one
	// backing slab per MultiCoalescer instance. Each lane's Flush calls
	// Reset; the resets are idempotent because Multi.Flush drains lanes
	// sequentially and never Reserves in between, so a later lane's
	// slots stay readable across an earlier lane's Reset (the underlying
	// bytes are still alive — Reset only re-slices len to 0).
	arena *util.Arena
}
// DefaultMultiArenaCap is the recommended arena capacity for a Multi-lane
// batcher: initialSlots slots × 65535 bytes (the maximum IP packet size),
// enough to hold one recvmmsg burst worth of MTU-sized packets without the
// arena growing. The original sizing note states 64 slots ≈ 4 MiB —
// initialSlots is declared elsewhere in this package, so confirm the
// 64-slot figure against that declaration.
const DefaultMultiArenaCap = initialSlots * 65535
// NewMultiCoalescer builds a multi-lane batcher around a single shared
// arena. tcpEnabled gates the TCP coalescing lane (leave it off when the
// queue can't do TSO) and udpEnabled gates the UDP lane (enable only when
// USO was negotiated); traffic for a disabled lane is routed through the
// passthrough lane instead. The caller pre-sizes arena via NewArena so the
// hot path never allocates; every lane draws from that same slab.
func NewMultiCoalescer(w tio.Queue, l *slog.Logger, arena *util.Arena, tcpEnabled, udpEnabled bool) *MultiCoalescer {
	var tcpLane *TCPCoalescer
	if tcpEnabled {
		tcpLane = NewTCPCoalescer(w, l, arena)
	}

	var udpLane *UDPCoalescer
	if udpEnabled {
		udpLane = NewUDPCoalescer(w, arena)
	}

	return &MultiCoalescer{
		tcp:   tcpLane,
		udp:   udpLane,
		pt:    NewPassthrough(w, initialSlots, arena),
		arena: arena,
	}
}
// Reserve hands out a sz-byte slice from the shared arena. Because every
// lane is backed by this same arena, the caller's allocation pattern is
// identical to the single-lane batchers regardless of which lane the
// packet is later Committed to.
func (m *MultiCoalescer) Reserve(sz int) []byte {
	return m.arena.Reserve(sz)
}
// Commit dispatches pkt to the appropriate lane based on IP version + L4
// proto. Borrowed slice contract is identical to the single-lane batchers,
// pkt must remain valid until the next Flush.
//
// On the success path the IP/TCP-or-UDP parse happens here once and the
// parsed struct is handed to the lane via commitParsed so the lane doesn't
// re-walk the header.
func (m *MultiCoalescer) Commit(pkt []byte) error {
	// Shorter than a minimal IPv4 header: not classifiable, passthrough.
	if len(pkt) < 20 {
		return m.pt.Commit(pkt)
	}

	var proto byte
	switch version := pkt[0] >> 4; version {
	case 4:
		// IPv4 protocol field lives at a fixed offset, independent of IHL.
		proto = pkt[9]
	case 6:
		// Need the full 40-byte fixed IPv6 header before reading next-header.
		if len(pkt) < 40 {
			return m.pt.Commit(pkt)
		}
		proto = pkt[6]
	default:
		return m.pt.Commit(pkt)
	}

	if proto == ipProtoTCP && m.tcp != nil {
		info, ok := parseTCPBase(pkt)
		if !ok {
			// Malformed/unsupported TCP shape (IP options, fragments, ...).
			// Handle this via passthrough support in the TCP coalescer, to attempt to preserve flow order.
			m.tcp.addPassthrough(pkt)
			return nil
		}
		return m.tcp.commitParsed(pkt, info)
	}

	if proto == ipProtoUDP && m.udp != nil {
		info, ok := parseUDP(pkt)
		if !ok {
			m.udp.addPassthrough(pkt) //we could also m.pt.Commit() here I guess?
			return nil
		}
		return m.udp.commitParsed(pkt, info)
	}

	// Non-TCP/UDP traffic, or the matching lane is disabled.
	return m.pt.Commit(pkt)
}
// Flush drains every lane in a fixed order: TCP, UDP, passthrough. A
// failing lane does not prevent the remaining lanes from flushing; all
// observed errors are combined via errors.Join so one bad packet can't
// strand the rest (nil when every lane flushed cleanly).
func (m *MultiCoalescer) Flush() error {
	var failures []error
	drain := func(flush func() error) {
		if err := flush(); err != nil {
			failures = append(failures, err)
		}
	}

	if m.tcp != nil {
		drain(m.tcp.Flush)
	}
	if m.udp != nil {
		drain(m.udp.Flush)
	}
	drain(m.pt.Flush)

	return errors.Join(failures...)
}