Files
nebula/overlay/batch/multi_coalesce.go
T
JackDoan c610b712af it works
2026-05-22 08:52:37 -05:00

137 lines
4.4 KiB
Go

package batch
import (
"errors"
"log/slog"
"github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/util"
)
// MultiCoalescer fans plaintext packets out to lane-specific batchers based
// on the IP/L4 protocol of the packet, sharing a single Reserve arena
// across lanes so the caller's allocation pattern is unchanged.
//
// Lanes are processed independently: the TCP coalescer only sees TCP, the
// UDP coalescer only sees UDP, and the passthrough lane handles everything
// else. Per-flow arrival order is preserved because a single 5-tuple only
// ever lands in one lane and each lane preserves its own slot order.
//
// Cross-lane order is NOT preserved across the TCP/UDP/passthrough split.
// This is acceptable because the carrier-side recvmmsg path already
// stable-sorts by (peer, message counter) before delivering plaintext
// here, so replay-window invariants are unaffected, and apps observe
// correct per-flow ordering — which is all the IP layer guarantees anyway.
// Do not "fix" this by interleaving lane outputs at flush time; that
// negates the entire point of coalescing (each lane needs to see runs of
// adjacent same-flow packets to coalesce them).
type MultiCoalescer struct {
// tcp is the TCP lane. NewMultiCoalescer leaves it nil when tcpEnabled
// is false; Commit and Flush check for nil, and Commit then routes TCP
// packets to pt instead.
tcp *TCPCoalescer
// udp is the UDP lane. NewMultiCoalescer leaves it nil when udpEnabled
// is false; Commit and Flush check for nil, and Commit then routes UDP
// packets to pt instead.
udp *UDPCoalescer
// pt is the passthrough lane. NewMultiCoalescer always sets it, and it
// receives every packet Commit does not hand to tcp or udp.
pt *Passthrough
// arena is shared across every lane (constructor hands the same
// *Arena to TCP, UDP, and Passthrough), so there's exactly one
// backing slab per MultiCoalescer instance. Each lane's Flush calls
// Reset; the resets are idempotent because Multi.Flush drains lanes
// sequentially and never Reserves in between, so a later lane's
// slots stay readable across an earlier lane's Reset (the underlying
// bytes are still alive — Reset only re-slices len to 0).
arena *util.Arena
}
// NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
// likewise gates UDP coalescing (only enable when USO was negotiated).
// Either lane disabled redirects its traffic into the passthrough lane.
// arena is the single backing slab shared across every lane; the caller
// pre-sizes it via NewArena so the hot path never allocates.
func NewMultiCoalescer(w tio.Queue, l *slog.Logger, arena *util.Arena, tcpEnabled, udpEnabled bool) *MultiCoalescer {
	multi := new(MultiCoalescer)
	multi.arena = arena

	// The passthrough lane always exists; it is the fallback for every
	// packet the optional lanes do not take.
	multi.pt = NewPassthrough(w, initialSlots, arena)

	// Optional lanes stay nil when disabled; Commit and Flush test for nil.
	if tcpEnabled {
		multi.tcp = NewTCPCoalescer(w, l, arena)
	}
	if udpEnabled {
		multi.udp = NewUDPCoalescer(w, arena)
	}

	return multi
}
// Reserve forwards a request for sz bytes to the arena shared by every lane
// and returns the slice the arena hands back.
func (m *MultiCoalescer) Reserve(sz int) []byte {
	buf := m.arena.Reserve(sz)
	return buf
}
// Commit dispatches pkt to the appropriate lane based on IP version + L4
// proto. Borrowed slice contract is identical to the single-lane batchers:
// pkt must remain valid until the next Flush.
//
// On the success path the IP/TCP-or-UDP parse happens here once and the
// parsed struct is handed to the lane via commitParsed so the lane doesn't
// re-walk the header.
func (m *MultiCoalescer) Commit(pkt []byte) error {
	// Shorter than a minimal IPv4 header: nothing to classify.
	if len(pkt) < 20 {
		return m.pt.Commit(pkt)
	}

	// Read the L4 protocol number out of the fixed IP header.
	var l4 byte
	ipVersion := pkt[0] >> 4
	switch {
	case ipVersion == 4:
		l4 = pkt[9] // IPv4 Protocol field
	case ipVersion == 6 && len(pkt) >= 40:
		l4 = pkt[6] // IPv6 Next Header field
	default:
		// Unknown IP version, or IPv6 shorter than its fixed 40-byte header.
		return m.pt.Commit(pkt)
	}

	// Dispatch to a dedicated lane only when that lane is enabled; every
	// other combination falls out of the switch into the passthrough lane.
	switch {
	case l4 == ipProtoTCP && m.tcp != nil:
		parsed, ok := parseTCPBase(pkt)
		if ok {
			return m.tcp.commitParsed(pkt, parsed)
		}
		// Malformed/unsupported TCP shape (IP options, fragments, ...).
		// Keep it inside the TCP lane via its own passthrough support, to
		// attempt to preserve flow order.
		m.tcp.addPassthrough(pkt)
		return nil
	case l4 == ipProtoUDP && m.udp != nil:
		parsed, ok := parseUDP(pkt)
		if ok {
			return m.udp.commitParsed(pkt, parsed)
		}
		// NOTE(review): committing to m.pt instead would presumably also
		// work here; the UDP lane's passthrough is used for now.
		m.udp.addPassthrough(pkt)
		return nil
	}

	return m.pt.Commit(pkt)
}
// Flush drains every lane in a fixed order: TCP, UDP, passthrough. An error
// from one lane does not stop subsequent lanes from flushing; we keep
// draining and return all observed errors combined via errors.Join (nil
// when every lane succeeded), so a single bad packet doesn't strand the
// others.
func (m *MultiCoalescer) Flush() error {
	// Disabled lanes are nil and contribute a nil error.
	var tcpErr, udpErr error
	if m.tcp != nil {
		tcpErr = m.tcp.Flush()
	}
	if m.udp != nil {
		udpErr = m.udp.Flush()
	}
	ptErr := m.pt.Flush()

	// errors.Join drops nil entries and returns nil when all are nil, so
	// the result carries exactly the lanes that failed, in flush order.
	return errors.Join(tcpErr, udpErr, ptErr)
}