package batch

import (
	"errors"
	"log/slog"

	"github.com/slackhq/nebula/overlay/tio"
	"github.com/slackhq/nebula/util"
)

// MultiCoalescer fans plaintext packets out to lane-specific batchers based
// on the IP/L4 protocol of the packet, sharing a single Reserve arena
// across lanes so the caller's allocation pattern is unchanged.
//
// Lanes are processed independently: the TCP coalescer only sees TCP, the
// UDP coalescer only sees UDP, and the passthrough lane handles everything
// else. Per-flow arrival order is preserved because a single 5-tuple only
// ever lands in one lane and each lane preserves its own slot order.
//
// Cross-lane order is NOT preserved across the TCP/UDP/passthrough split.
// This is acceptable because the carrier-side recvmmsg path already
// stable-sorts by (peer, message counter) before delivering plaintext
// here, so replay-window invariants are unaffected, and apps observe
// correct per-flow ordering — which is all the IP layer guarantees anyway.
// Do not "fix" this by interleaving lane outputs at flush time; that
// negates the entire point of coalescing (each lane needs to see runs of
// adjacent same-flow packets to coalesce them).
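//
// For illustration, with hypothetical flows A (TCP) and B (UDP), the commit
// sequence below flushes with A's packets adjacent and in order, while B
// slides relative to A (allowed, per the above):
//
//	_ = m.Commit(a1) // TCP lane
//	_ = m.Commit(b1) // UDP lane
//	_ = m.Commit(a2) // TCP lane
//	_ = m.Flush()    // wire order: a1, a2, b1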
type MultiCoalescer struct {
	tcp *TCPCoalescer
	udp *UDPCoalescer
	pt  *Passthrough

	// arena is shared across every lane (constructor hands the same
	// *Arena to TCP, UDP, and Passthrough), so there's exactly one
	// backing slab per MultiCoalescer instance. Each lane's Flush calls
	// Reset; the resets are idempotent because Multi.Flush drains lanes
	// sequentially and never Reserves in between, so a later lane's
	// slots stay readable across an earlier lane's Reset (the underlying
	// bytes are still alive — Reset only re-slices len to 0).
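	//
	// A minimal sketch of the Reset contract that reasoning depends on
	// (illustrative only; the real util.Arena may differ in detail):
	//
	//	type Arena struct{ buf []byte }
	//
	//	func (a *Arena) Reserve(n int) []byte {
	//		off := len(a.buf)
	//		a.buf = a.buf[:off+n] // assumes sufficient capacity
	//		return a.buf[off : off+n]
	//	}
	//
	//	// Reset keeps the backing array; earlier Reserve slices remain
	//	// readable until something Reserves over them.
	//	func (a *Arena) Reset() { a.buf = a.buf[:0] }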
	arena *util.Arena
}

// DefaultMultiArenaCap is the recommended arena capacity for a multi-lane
// batcher: 64 slots × 65535 bytes ≈ 4 MiB, enough to hold one recvmmsg
// burst worth of MTU-sized packets without the arena growing.
const DefaultMultiArenaCap = initialSlots * 65535

// NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
// likewise gates UDP coalescing (only enable it when USO was negotiated).
// Disabling either lane redirects its traffic into the passthrough lane.
// arena is the single backing slab shared across every lane; the caller
// pre-sizes it via NewArena so the hot path never allocates.
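//
// A typical wiring, as a sketch (q, logger, burst, tsoOK, and usoOK are
// placeholders; NewArena's exact signature lives in the util package):
//
//	arena := util.NewArena(DefaultMultiArenaCap)
//	m := NewMultiCoalescer(q, logger, arena, tsoOK, usoOK)
//	for _, pkt := range burst {
//		buf := m.Reserve(len(pkt))
//		n := copy(buf, pkt)
//		_ = m.Commit(buf[:n])
//	}
//	_ = m.Flush()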
func NewMultiCoalescer(w tio.Queue, l *slog.Logger, arena *util.Arena, tcpEnabled, udpEnabled bool) *MultiCoalescer {
	m := &MultiCoalescer{
		pt:    NewPassthrough(w, initialSlots, arena),
		arena: arena,
	}
	if tcpEnabled {
		m.tcp = NewTCPCoalescer(w, l, arena)
	}
	if udpEnabled {
		m.udp = NewUDPCoalescer(w, arena)
	}
	return m
}

// Reserve hands out a slice from the shared arena; every lane's packets
// live in the same backing slab regardless of which lane Commit later picks.
func (m *MultiCoalescer) Reserve(sz int) []byte {
	return m.arena.Reserve(sz)
}

// Commit dispatches pkt to the appropriate lane based on IP version and L4
// protocol. The borrowed-slice contract is identical to the single-lane
// batchers: pkt must remain valid until the next Flush.
//
// On the success path the IP/TCP-or-UDP parse happens here once and the
// parsed struct is handed to the lane via commitParsed so the lane doesn't
// re-walk the header.
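//
// For illustration, a minimal IPv4 header that this dispatcher would route
// to the UDP lane (a hand-built sketch, not real tun traffic):
//
//	pkt := make([]byte, 28) // 20-byte IPv4 header + 8-byte UDP header
//	pkt[0] = 0x45           // version 4, IHL 5
//	pkt[9] = 17             // Protocol field: UDP
//	_ = m.Commit(pkt)       // UDP lane if udpEnabled, else passthrough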
func (m *MultiCoalescer) Commit(pkt []byte) error {
	if len(pkt) < 20 { // shorter than a minimal IPv4 header
		return m.pt.Commit(pkt)
	}
	v := pkt[0] >> 4 // IP version nibble
	var proto byte
	switch v {
	case 4:
		proto = pkt[9] // IPv4 Protocol field
	case 6:
		if len(pkt) < 40 { // shorter than the fixed IPv6 header
			return m.pt.Commit(pkt)
		}
		proto = pkt[6] // IPv6 Next Header field
	default:
		return m.pt.Commit(pkt)
	}
	switch proto {
	case ipProtoTCP:
		if m.tcp != nil {
			info, ok := parseTCPBase(pkt)
			if !ok {
				// Malformed/unsupported TCP shape (IP options, fragments, ...).
				// Route it through the TCP coalescer's own passthrough to
				// attempt to preserve flow order.
				m.tcp.addPassthrough(pkt)
				return nil
			}
			return m.tcp.commitParsed(pkt, info)
		}
	case ipProtoUDP:
		if m.udp != nil {
			info, ok := parseUDP(pkt)
			if !ok {
				// As with TCP, the lane's own passthrough keeps flow order;
				// m.pt.Commit would also work but could reorder the flow.
				m.udp.addPassthrough(pkt)
				return nil
			}
			return m.udp.commitParsed(pkt, info)
		}
	}
	return m.pt.Commit(pkt)
}

// Flush drains every lane in a fixed order: TCP, UDP, passthrough. An error
// from one lane does not stop subsequent lanes from flushing; we keep
// draining and return the accumulated errors via errors.Join so a single
// bad packet doesn't strand the others.
func (m *MultiCoalescer) Flush() error {
	var errs []error
	if m.tcp != nil {
		if err := m.tcp.Flush(); err != nil {
			errs = append(errs, err)
		}
	}
	if m.udp != nil {
		if err := m.udp.Flush(); err != nil {
			errs = append(errs, err)
		}
	}
	if err := m.pt.Flush(); err != nil {
		errs = append(errs, err)
	}
	// errors.Join returns nil for an empty slice, so the happy path needs
	// no special case.
	return errors.Join(errs...)
}