mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
GSO/GRO offloads, with TCP+ECN and UDP support
This commit is contained in:
@@ -14,20 +14,15 @@ type RxBatcher interface {
|
||||
}
|
||||
|
||||
type TxBatcher interface {
|
||||
// Next returns a zero-length slice with slotCap capacity over the next unused
|
||||
// slot's backing bytes. The caller writes into the returned slice and then
|
||||
// calls Commit with the final length and destination. Next returns nil when
|
||||
// the batch is full.
|
||||
Next() []byte
|
||||
// Commit records the slot just returned by Next as a packet of length n
|
||||
// destined for dst.
|
||||
Commit(n int, dst netip.AddrPort)
|
||||
// Reset clears committed slots; backing storage is retained for reuse.
|
||||
Reset()
|
||||
// Len returns the number of committed packets.
|
||||
Len() int
|
||||
// Cap returns the maximum number of slots in the batch.
|
||||
Cap() int
|
||||
// Get returns the buffers needed to send the batch
|
||||
Get() ([][]byte, []netip.AddrPort)
|
||||
// Reserve creates a pkt to borrow
|
||||
Reserve(sz int) []byte
|
||||
// Commit borrows pkt and records its destination plus the 2-bit
|
||||
// IP-level ECN codepoint to set on the outer (carrier) header. The
|
||||
// caller must keep pkt valid until the next Flush. Pass 0 (Not-ECT)
|
||||
// to leave the outer ECN field unset.
|
||||
Commit(pkt []byte, dst netip.AddrPort, outerECN byte)
|
||||
// Flush emits every queued packet via the underlying batch writer in
|
||||
// arrival order. Returns the first error observed. After Flush returns,
|
||||
// borrowed payload slices may be recycled.
|
||||
Flush() error
|
||||
}
|
||||
|
||||
163
overlay/batch/coalesce_core.go
Normal file
163
overlay/batch/coalesce_core.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
// flowKey identifies a transport flow by {src, dst, sport, dport, family}.
|
||||
// Comparable, so map lookups and linear scans over the slot list stay tight.
|
||||
// Shared by the TCP and UDP coalescers; each coalescer keeps its own
|
||||
// openSlots map, so a TCP and UDP flow on the same 5-tuple-without-proto
|
||||
// never alias.
|
||||
type flowKey struct {
|
||||
src, dst [16]byte
|
||||
sport, dport uint16
|
||||
isV6 bool
|
||||
}
|
||||
|
||||
// initialSlots is the starting capacity of the slot pool. One flow per
|
||||
// packet is the worst case so this matches a typical carrier-side
|
||||
// recvmmsg batch on the encrypted UDP socket.
|
||||
const initialSlots = 64
|
||||
|
||||
// parsedIP is the IP-level result of parseIPPrologue. The caller layers
|
||||
// L4-specific parsing (TCP / UDP) on top.
|
||||
type parsedIP struct {
|
||||
fk flowKey
|
||||
ipHdrLen int
|
||||
// pkt is the original buffer trimmed to the IP-declared total length.
|
||||
// Anything below the IP layer (transport parsers) should slice into
|
||||
// pkt rather than the unbounded original.
|
||||
pkt []byte
|
||||
}
|
||||
|
||||
// parseIPPrologue extracts the IP-level fields the coalescers care about:
|
||||
// IHL/payload length, version, src/dst addresses, and the L4 protocol byte.
|
||||
// Returns ok=false for malformed input, IPv4 with options or fragmentation,
|
||||
// or IPv6 with extension headers (all rejected by both coalescers in
|
||||
// identical ways before this refactor).
|
||||
//
|
||||
// On success, p.pkt is len-trimmed to the IP-declared length so callers
|
||||
// don't have to repeat the trim. wantProto is the IANA protocol number to
|
||||
// require (6 for TCP, 17 for UDP); ok=false for any other value.
|
||||
func parseIPPrologue(pkt []byte, wantProto byte) (parsedIP, bool) {
|
||||
var p parsedIP
|
||||
if len(pkt) < 20 {
|
||||
return p, false
|
||||
}
|
||||
v := pkt[0] >> 4
|
||||
switch v {
|
||||
case 4:
|
||||
ihl := int(pkt[0]&0x0f) * 4
|
||||
if ihl != 20 {
|
||||
return p, false
|
||||
}
|
||||
if pkt[9] != wantProto {
|
||||
return p, false
|
||||
}
|
||||
// Reject actual fragmentation (MF or non-zero frag offset).
|
||||
if binary.BigEndian.Uint16(pkt[6:8])&0x3fff != 0 {
|
||||
return p, false
|
||||
}
|
||||
totalLen := int(binary.BigEndian.Uint16(pkt[2:4]))
|
||||
if totalLen > len(pkt) || totalLen < ihl {
|
||||
return p, false
|
||||
}
|
||||
p.ipHdrLen = 20
|
||||
p.fk.isV6 = false
|
||||
copy(p.fk.src[:4], pkt[12:16])
|
||||
copy(p.fk.dst[:4], pkt[16:20])
|
||||
p.pkt = pkt[:totalLen]
|
||||
case 6:
|
||||
if len(pkt) < 40 {
|
||||
return p, false
|
||||
}
|
||||
if pkt[6] != wantProto {
|
||||
return p, false
|
||||
}
|
||||
payloadLen := int(binary.BigEndian.Uint16(pkt[4:6]))
|
||||
if 40+payloadLen > len(pkt) {
|
||||
return p, false
|
||||
}
|
||||
p.ipHdrLen = 40
|
||||
p.fk.isV6 = true
|
||||
copy(p.fk.src[:], pkt[8:24])
|
||||
copy(p.fk.dst[:], pkt[24:40])
|
||||
p.pkt = pkt[:40+payloadLen]
|
||||
default:
|
||||
return p, false
|
||||
}
|
||||
return p, true
|
||||
}
|
||||
|
||||
// ipHeadersMatch compares the IP portion of two packet header prefixes for
|
||||
// byte-for-byte equality on every field that must be identical across
|
||||
// coalesced segments. Size/IPID/IPCsum and the 2-bit IP-level ECN field are
|
||||
// masked out — the appendPayload step merges CE into the seed.
|
||||
//
|
||||
// The transport (L4) portion of the header is checked separately by the
|
||||
// per-protocol matcher.
|
||||
func ipHeadersMatch(a, b []byte, isV6 bool) bool {
|
||||
if isV6 {
|
||||
// IPv6: byte 0 = version/TC[7:4], byte 1 = TC[3:0]/flow[19:16],
|
||||
// bytes [2:4] = flow[15:0], [6:8] = next_hdr/hop, [8:40] = src+dst.
|
||||
// ECN lives in TC[1:0] = byte 1 mask 0x30. Skip [4:6] payload_len.
|
||||
if a[0] != b[0] {
|
||||
return false
|
||||
}
|
||||
if a[1]&^0x30 != b[1]&^0x30 {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[2:4], b[2:4]) {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[6:40], b[6:40]) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
// IPv4: byte 0 = version/IHL, byte 1 = DSCP(6)|ECN(2),
|
||||
// [6:10] flags/fragoff/TTL/proto, [12:20] src+dst.
|
||||
// Skip [2:4] total len, [4:6] id, [10:12] csum.
|
||||
if a[0] != b[0] {
|
||||
return false
|
||||
}
|
||||
if a[1]&^0x03 != b[1]&^0x03 {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[6:10], b[6:10]) {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[12:20], b[12:20]) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// mergeECNIntoSeed ORs the 2-bit IP-level ECN field of pkt's IP header
|
||||
// onto the seed's IP header, so a CE mark on any coalesced segment
|
||||
// propagates to the final superpacket. (CE is 0b11; ORing yields CE if
|
||||
// any segment carried it.) Used by both TCP and UDP coalescers, so the
|
||||
// invariant lives in one place.
|
||||
func mergeECNIntoSeed(seedHdr, pktHdr []byte, isV6 bool) {
|
||||
if isV6 {
|
||||
seedHdr[1] |= pktHdr[1] & 0x30
|
||||
} else {
|
||||
seedHdr[1] |= pktHdr[1] & 0x03
|
||||
}
|
||||
}
|
||||
|
||||
// reserveFromBacking implements the Reserve half of the RxBatcher contract
|
||||
// shared by TCP and UDP coalescers. The backing slice grows on demand;
|
||||
// already-committed slices reference the old array and remain valid until
|
||||
// Flush resets backing.
|
||||
func reserveFromBacking(backing *[]byte, sz int) []byte {
|
||||
if len(*backing)+sz > cap(*backing) {
|
||||
newCap := max(cap(*backing)*2, sz)
|
||||
*backing = make([]byte, 0, newCap)
|
||||
}
|
||||
start := len(*backing)
|
||||
*backing = (*backing)[:start+sz]
|
||||
return (*backing)[start : start+sz : start+sz]
|
||||
}
|
||||
133
overlay/batch/multi_coalesce.go
Normal file
133
overlay/batch/multi_coalesce.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// MultiCoalescer fans plaintext packets out to lane-specific batchers based
|
||||
// on the IP/L4 protocol of the packet, sharing a single Reserve arena
|
||||
// across lanes so the caller's allocation pattern is unchanged.
|
||||
//
|
||||
// Lanes are processed independently: the TCP coalescer only sees TCP, the
|
||||
// UDP coalescer only sees UDP, and the passthrough lane handles everything
|
||||
// else. Per-flow arrival order is preserved because a single 5-tuple only
|
||||
// ever lands in one lane and each lane preserves its own slot order.
|
||||
//
|
||||
// Cross-lane order is NOT preserved across the TCP/UDP/passthrough split.
|
||||
// This is acceptable because the carrier-side recvmmsg path already
|
||||
// stable-sorts by (peer, message counter) before delivering plaintext
|
||||
// here, so replay-window invariants are unaffected, and apps observe
|
||||
// correct per-flow ordering — which is all the IP layer guarantees anyway.
|
||||
// Do not "fix" this by interleaving lane outputs at flush time; that
|
||||
// negates the entire point of coalescing (each lane needs to see runs of
|
||||
// adjacent same-flow packets to coalesce them).
|
||||
type MultiCoalescer struct {
|
||||
tcp *TCPCoalescer
|
||||
udp *UDPCoalescer
|
||||
pt *Passthrough
|
||||
|
||||
// arena shared across all lanes so a single Reserve grows one backing
|
||||
// slice; lane Commit calls borrow into this same arena.
|
||||
backing []byte
|
||||
}
|
||||
|
||||
// NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
|
||||
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
|
||||
// likewise gates UDP coalescing (only enable when USO was negotiated).
|
||||
// Either lane disabled redirects its traffic into the passthrough lane.
|
||||
func NewMultiCoalescer(w io.Writer, tcpEnabled, udpEnabled bool) *MultiCoalescer {
|
||||
m := &MultiCoalescer{
|
||||
pt: NewPassthrough(w),
|
||||
backing: make([]byte, 0, initialSlots*65535),
|
||||
}
|
||||
if tcpEnabled {
|
||||
m.tcp = NewTCPCoalescer(w)
|
||||
}
|
||||
if udpEnabled {
|
||||
m.udp = NewUDPCoalescer(w)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *MultiCoalescer) Reserve(sz int) []byte {
|
||||
if len(m.backing)+sz > cap(m.backing) {
|
||||
newCap := max(cap(m.backing)*2, sz)
|
||||
m.backing = make([]byte, 0, newCap)
|
||||
}
|
||||
start := len(m.backing)
|
||||
m.backing = m.backing[:start+sz]
|
||||
return m.backing[start : start+sz : start+sz]
|
||||
}
|
||||
|
||||
// Commit dispatches pkt to the appropriate lane based on IP version + L4
|
||||
// proto. Borrowed slice contract is identical to the single-lane batchers
|
||||
// — pkt must remain valid until the next Flush.
|
||||
//
|
||||
// On the success path the IP/TCP-or-UDP parse happens here once and the
|
||||
// parsed struct is handed to the lane via commitParsed so the lane doesn't
|
||||
// re-walk the header. On a parse failure we fall through to the lane's
|
||||
// public Commit, which re-runs the parse before passthrough — that path
|
||||
// only fires for malformed/unsupported packets so the duplicated parse is
|
||||
// not on the hot path. The lane's public Commit still works for direct
|
||||
// callers.
|
||||
func (m *MultiCoalescer) Commit(pkt []byte) error {
|
||||
if len(pkt) < 20 {
|
||||
return m.pt.Commit(pkt)
|
||||
}
|
||||
v := pkt[0] >> 4
|
||||
var proto byte
|
||||
switch v {
|
||||
case 4:
|
||||
proto = pkt[9]
|
||||
case 6:
|
||||
if len(pkt) < 40 {
|
||||
return m.pt.Commit(pkt)
|
||||
}
|
||||
proto = pkt[6]
|
||||
default:
|
||||
return m.pt.Commit(pkt)
|
||||
}
|
||||
switch proto {
|
||||
case ipProtoTCP:
|
||||
if m.tcp != nil {
|
||||
info, ok := parseTCPBase(pkt)
|
||||
if !ok {
|
||||
// Malformed/unsupported TCP shape (IP options, fragments, ...)
|
||||
// — the TCP lane handles this as passthrough.
|
||||
return m.tcp.Commit(pkt)
|
||||
}
|
||||
return m.tcp.commitParsed(pkt, info)
|
||||
}
|
||||
case ipProtoUDP:
|
||||
if m.udp != nil {
|
||||
info, ok := parseUDP(pkt)
|
||||
if !ok {
|
||||
return m.udp.Commit(pkt)
|
||||
}
|
||||
return m.udp.commitParsed(pkt, info)
|
||||
}
|
||||
}
|
||||
return m.pt.Commit(pkt)
|
||||
}
|
||||
|
||||
// Flush drains every lane in a fixed order: TCP, UDP, passthrough. Errors
|
||||
// from a lane do not stop subsequent lanes from flushing — we keep
|
||||
// draining and return the first observed error so a single bad packet
|
||||
// doesn't strand the others.
|
||||
func (m *MultiCoalescer) Flush() error {
|
||||
var first error
|
||||
keep := func(err error) {
|
||||
if err != nil && first == nil {
|
||||
first = err
|
||||
}
|
||||
}
|
||||
if m.tcp != nil {
|
||||
keep(m.tcp.Flush())
|
||||
}
|
||||
if m.udp != nil {
|
||||
keep(m.udp.Flush())
|
||||
}
|
||||
keep(m.pt.Flush())
|
||||
m.backing = m.backing[:0]
|
||||
return first
|
||||
}
|
||||
94
overlay/batch/multi_coalesce_test.go
Normal file
94
overlay/batch/multi_coalesce_test.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestMultiCoalescerRoutesByProto confirms TCP/UDP/other land in the right
|
||||
// lane: TCP and UDP get coalesced when their lanes are enabled, anything
|
||||
// else (ICMP here) falls through to plain Write.
|
||||
func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
m := NewMultiCoalescer(w, true, true)
|
||||
|
||||
tcpPay := make([]byte, 1200)
|
||||
udpPay := make([]byte, 1200)
|
||||
icmp := make([]byte, 28)
|
||||
icmp[0] = 0x45
|
||||
icmp[2] = 0
|
||||
icmp[3] = 28
|
||||
icmp[9] = 1
|
||||
|
||||
if err := m.Commit(buildTCPv4(1000, tcpAck, tcpPay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(buildTCPv4(2200, tcpAck, tcpPay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(buildUDPv4(2000, 53, udpPay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(buildUDPv4(2000, 53, udpPay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(icmp); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// 1 TCP super (2 segments) + 1 UDP super (2 segments) = 2 gso writes.
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 gso writes (one TCP + one UDP), got %d", len(w.gsoWrites))
|
||||
}
|
||||
if len(w.writes) != 1 {
|
||||
t.Fatalf("want 1 plain write (ICMP), got %d", len(w.writes))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMultiCoalescerDisabledUDPFallsThrough verifies that when the UDP lane
|
||||
// is disabled (e.g. kernel doesn't support USO), UDP packets still reach
|
||||
// the kernel via the passthrough lane rather than being lost.
|
||||
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
m := NewMultiCoalescer(w, true, false) // TSO on, USO off
|
||||
|
||||
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 0 {
|
||||
t.Errorf("UDP must NOT be coalesced when USO disabled, got %d gso writes", len(w.gsoWrites))
|
||||
}
|
||||
if len(w.writes) != 2 {
|
||||
t.Errorf("UDP must pass through as 2 plain writes, got %d", len(w.writes))
|
||||
}
|
||||
}
|
||||
|
||||
// TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
|
||||
func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
m := NewMultiCoalescer(w, false, true) // TSO off, USO on
|
||||
|
||||
pay := make([]byte, 1200)
|
||||
if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Commit(buildTCPv4(2200, tcpAck, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := m.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 0 {
|
||||
t.Errorf("TCP must NOT be coalesced when TSO disabled, got %d gso writes", len(w.gsoWrites))
|
||||
}
|
||||
if len(w.writes) != 2 {
|
||||
t.Errorf("TCP must pass through as 2 plain writes, got %d", len(w.writes))
|
||||
}
|
||||
}
|
||||
722
overlay/batch/tcp_coalesce.go
Normal file
722
overlay/batch/tcp_coalesce.go
Normal file
@@ -0,0 +1,722 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/netip"
|
||||
"sort"
|
||||
|
||||
"github.com/slackhq/nebula/overlay/tio"
|
||||
)
|
||||
|
||||
// ipProtoTCP is the IANA protocol number for TCP. Hardcoded instead of
|
||||
// reaching for golang.org/x/sys/unix — that package doesn't define the
|
||||
// constant on Windows, which would break cross-compiles even though this
|
||||
// file runs unchanged on every platform.
|
||||
const ipProtoTCP = 6
|
||||
|
||||
// tcpCoalesceBufSize caps total bytes per superpacket. Mirrors the kernel's
|
||||
// sk_gso_max_size of ~64KiB; anything beyond this would be rejected anyway.
|
||||
const tcpCoalesceBufSize = 65535
|
||||
|
||||
// tcpCoalesceMaxSegs caps how many segments we'll coalesce into a single
|
||||
// superpacket. Keeping this well below the kernel's TSO ceiling bounds
|
||||
// latency.
|
||||
const tcpCoalesceMaxSegs = 64
|
||||
|
||||
// tcpCoalesceHdrCap is the scratch space we copy a seed's IP+TCP header
|
||||
// into. IPv6 (40) + TCP with full options (60) = 100 bytes.
|
||||
const tcpCoalesceHdrCap = 100
|
||||
|
||||
// coalesceSlot is one entry in the coalescer's ordered event queue. When
|
||||
// passthrough is true the slot holds a single borrowed packet that must be
|
||||
// emitted verbatim (non-TCP, non-admissible TCP, or oversize seed). When
|
||||
// passthrough is false the slot is an in-progress coalesced superpacket:
|
||||
// hdrBuf is a mutable copy of the seed's IP+TCP header (we patch total
|
||||
// length and pseudo-header partial at flush), and payIovs are *borrowed*
|
||||
// slices from the caller's plaintext buffers — no payload is ever copied.
|
||||
// The caller (listenOut) must keep those buffers alive until Flush.
|
||||
type coalesceSlot struct {
|
||||
passthrough bool
|
||||
rawPkt []byte // borrowed when passthrough
|
||||
|
||||
fk flowKey
|
||||
hdrBuf [tcpCoalesceHdrCap]byte
|
||||
hdrLen int
|
||||
ipHdrLen int
|
||||
isV6 bool
|
||||
gsoSize int
|
||||
numSeg int
|
||||
totalPay int
|
||||
nextSeq uint32
|
||||
// psh closes the chain: set when the last-accepted segment had PSH or
|
||||
// was sub-gsoSize. No further appends after that.
|
||||
psh bool
|
||||
payIovs [][]byte
|
||||
}
|
||||
|
||||
// TCPCoalescer accumulates adjacent in-flow TCP data segments across
|
||||
// multiple concurrent flows and emits each flow's run as a single TSO
|
||||
// superpacket via tio.GSOWriter. All output — coalesced or not — is
|
||||
// deferred until Flush so arrival order is preserved on the wire. Owns
|
||||
// no locks; one coalescer per TUN write queue.
|
||||
type TCPCoalescer struct {
|
||||
plainW io.Writer
|
||||
gsoW tio.GSOWriter // nil when the queue doesn't support TSO
|
||||
|
||||
// slots is the ordered event queue. Flush walks it once and emits each
|
||||
// entry as either a WriteGSO (coalesced) or a plainW.Write (passthrough).
|
||||
slots []*coalesceSlot
|
||||
// openSlots maps a flow key to its most recent non-sealed slot, so new
|
||||
// segments can extend an in-progress superpacket in O(1). Slots are
|
||||
// removed from this map when they close (PSH or short-last-segment),
|
||||
// when a non-admissible packet for that flow arrives, or in Flush.
|
||||
openSlots map[flowKey]*coalesceSlot
|
||||
// lastSlot caches the most recently touched open slot. Steady-state
|
||||
// bulk traffic is dominated by a single flow, so comparing the
|
||||
// incoming key against the cached slot's own fk lets the hot path
|
||||
// skip the map lookup (and the aeshash of a 38-byte key) entirely.
|
||||
// Kept in lockstep with openSlots: nil whenever the slot it pointed
|
||||
// at is removed/sealed.
|
||||
lastSlot *coalesceSlot
|
||||
pool []*coalesceSlot // free list for reuse
|
||||
|
||||
backing []byte
|
||||
}
|
||||
|
||||
func NewTCPCoalescer(w io.Writer) *TCPCoalescer {
|
||||
c := &TCPCoalescer{
|
||||
plainW: w,
|
||||
slots: make([]*coalesceSlot, 0, initialSlots),
|
||||
openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
|
||||
pool: make([]*coalesceSlot, 0, initialSlots),
|
||||
backing: make([]byte, 0, initialSlots*65535),
|
||||
}
|
||||
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
|
||||
c.gsoW = gw
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// parsedTCP holds the fields extracted from a single parse so later steps
|
||||
// (admission, slot lookup, canAppend) don't re-walk the header.
|
||||
type parsedTCP struct {
|
||||
fk flowKey
|
||||
ipHdrLen int
|
||||
tcpHdrLen int
|
||||
hdrLen int
|
||||
payLen int
|
||||
seq uint32
|
||||
flags byte
|
||||
}
|
||||
|
||||
// parseTCPBase extracts the flow key and IP/TCP offsets for any TCP packet,
|
||||
// regardless of whether it's admissible for coalescing. Returns ok=false
|
||||
// for non-TCP or malformed input. Accepts IPv4 (no options, no fragmentation)
|
||||
// and IPv6 (no extension headers).
|
||||
func parseTCPBase(pkt []byte) (parsedTCP, bool) {
|
||||
var p parsedTCP
|
||||
ip, ok := parseIPPrologue(pkt, ipProtoTCP)
|
||||
if !ok {
|
||||
return p, false
|
||||
}
|
||||
pkt = ip.pkt
|
||||
p.fk = ip.fk
|
||||
p.ipHdrLen = ip.ipHdrLen
|
||||
|
||||
if len(pkt) < p.ipHdrLen+20 {
|
||||
return p, false
|
||||
}
|
||||
tcpOff := int(pkt[p.ipHdrLen+12]>>4) * 4
|
||||
if tcpOff < 20 || tcpOff > 60 {
|
||||
return p, false
|
||||
}
|
||||
if len(pkt) < p.ipHdrLen+tcpOff {
|
||||
return p, false
|
||||
}
|
||||
p.tcpHdrLen = tcpOff
|
||||
p.hdrLen = p.ipHdrLen + tcpOff
|
||||
p.payLen = len(pkt) - p.hdrLen
|
||||
p.seq = binary.BigEndian.Uint32(pkt[p.ipHdrLen+4 : p.ipHdrLen+8])
|
||||
p.flags = pkt[p.ipHdrLen+13]
|
||||
p.fk.sport = binary.BigEndian.Uint16(pkt[p.ipHdrLen : p.ipHdrLen+2])
|
||||
p.fk.dport = binary.BigEndian.Uint16(pkt[p.ipHdrLen+2 : p.ipHdrLen+4])
|
||||
return p, true
|
||||
}
|
||||
|
||||
// TCP flag bits (byte 13 of the TCP header). Only the bits actually consulted
|
||||
// by the coalescer are named; FIN/SYN/RST/URG/CWR are rejected via the
|
||||
// negative mask in coalesceable, not by name.
|
||||
const (
|
||||
tcpFlagPsh = 0x08
|
||||
tcpFlagAck = 0x10
|
||||
tcpFlagEce = 0x40
|
||||
)
|
||||
|
||||
// coalesceable reports whether a parsed TCP segment is eligible for
|
||||
// coalescing. Accepts ACK, ACK|PSH, ACK|ECE, ACK|PSH|ECE with a
|
||||
// non-empty payload. CWR is excluded because it marks a one-shot
|
||||
// congestion-window-reduced transition the receiver must observe at a
|
||||
// segment boundary.
|
||||
func (p parsedTCP) coalesceable() bool {
|
||||
if p.flags&tcpFlagAck == 0 {
|
||||
return false
|
||||
}
|
||||
if p.flags&^(tcpFlagAck|tcpFlagPsh|tcpFlagEce) != 0 {
|
||||
return false
|
||||
}
|
||||
return p.payLen > 0
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) Reserve(sz int) []byte {
|
||||
return reserveFromBacking(&c.backing, sz)
|
||||
}
|
||||
|
||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush,
|
||||
// whether or not the packet was coalesced — passthrough (non-admissible)
|
||||
// packets are queued and written at Flush time, not synchronously.
|
||||
func (c *TCPCoalescer) Commit(pkt []byte) error {
|
||||
if c.gsoW == nil {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
info, ok := parseTCPBase(pkt)
|
||||
if !ok {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
return c.commitParsed(pkt, info)
|
||||
}
|
||||
|
||||
// commitParsed is the post-parse half of Commit. The caller must have
|
||||
// already verified parseTCPBase succeeded (info is a valid TCP parse).
|
||||
// Used by MultiCoalescer.Commit to avoid re-walking the IP/TCP header
|
||||
// after the dispatcher has already done so.
|
||||
func (c *TCPCoalescer) commitParsed(pkt []byte, info parsedTCP) error {
|
||||
if c.gsoW == nil {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
if !info.coalesceable() {
|
||||
// TCP but not admissible (SYN/FIN/RST/URG/CWR or zero-payload).
|
||||
// Seal this flow's open slot so later in-flow packets don't extend
|
||||
// it and accidentally reorder past this passthrough.
|
||||
if last := c.lastSlot; last != nil && last.fk == info.fk {
|
||||
c.lastSlot = nil
|
||||
}
|
||||
delete(c.openSlots, info.fk)
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Single-flow fast path: with only one open flow the cache hits every
|
||||
// packet, and len(openSlots)==1 lets us skip the 38-byte fk compare
|
||||
// when there are multiple flows in flight (where the hit rate would
|
||||
// be ~0 and the compare is pure overhead).
|
||||
var open *coalesceSlot
|
||||
if last := c.lastSlot; last != nil && len(c.openSlots) == 1 && last.fk == info.fk {
|
||||
open = last
|
||||
} else {
|
||||
open = c.openSlots[info.fk]
|
||||
}
|
||||
if open != nil {
|
||||
if c.canAppend(open, pkt, info) {
|
||||
c.appendPayload(open, pkt, info)
|
||||
if open.psh {
|
||||
delete(c.openSlots, info.fk)
|
||||
c.lastSlot = nil
|
||||
} else {
|
||||
c.lastSlot = open
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Can't extend — seal it and fall through to seed a fresh slot.
|
||||
delete(c.openSlots, info.fk)
|
||||
if c.lastSlot == open {
|
||||
c.lastSlot = nil
|
||||
}
|
||||
}
|
||||
c.seed(pkt, info)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush emits every queued event in (per-flow) seq order. Coalesced slots
|
||||
// go out via WriteGSO; passthrough slots go out via plainW.Write.
|
||||
// reorderForFlush first sorts each flow's slots into TCP-seq order within
|
||||
// passthrough-bounded segments and merges contiguous adjacent slots, so
|
||||
// any wire-side reorder that crossed an rxOrder batch boundary doesn't
|
||||
// get amplified into kernel-visible reorder by the slot machinery.
|
||||
// Returns the first error observed; keeps draining so one bad packet
|
||||
// doesn't hold up the rest. After Flush returns, borrowed payload slices
|
||||
// may be recycled.
|
||||
func (c *TCPCoalescer) Flush() error {
|
||||
c.reorderForFlush()
|
||||
var first error
|
||||
for _, s := range c.slots {
|
||||
var err error
|
||||
if s.passthrough {
|
||||
_, err = c.plainW.Write(s.rawPkt)
|
||||
} else {
|
||||
err = c.flushSlot(s)
|
||||
}
|
||||
if err != nil && first == nil {
|
||||
first = err
|
||||
}
|
||||
c.release(s)
|
||||
}
|
||||
for i := range c.slots {
|
||||
c.slots[i] = nil
|
||||
}
|
||||
c.slots = c.slots[:0]
|
||||
for k := range c.openSlots {
|
||||
delete(c.openSlots, k)
|
||||
}
|
||||
c.lastSlot = nil
|
||||
|
||||
c.backing = c.backing[:0]
|
||||
return first
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) addPassthrough(pkt []byte) {
|
||||
s := c.take()
|
||||
s.passthrough = true
|
||||
s.rawPkt = pkt
|
||||
c.slots = append(c.slots, s)
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) seed(pkt []byte, info parsedTCP) {
|
||||
if info.hdrLen > tcpCoalesceHdrCap || info.hdrLen+info.payLen > tcpCoalesceBufSize {
|
||||
// Pathological shape — can't fit our scratch, emit as-is.
|
||||
c.addPassthrough(pkt)
|
||||
return
|
||||
}
|
||||
s := c.take()
|
||||
s.passthrough = false
|
||||
s.rawPkt = nil
|
||||
copy(s.hdrBuf[:], pkt[:info.hdrLen])
|
||||
s.hdrLen = info.hdrLen
|
||||
s.ipHdrLen = info.ipHdrLen
|
||||
s.isV6 = info.fk.isV6
|
||||
s.fk = info.fk
|
||||
s.gsoSize = info.payLen
|
||||
s.numSeg = 1
|
||||
s.totalPay = info.payLen
|
||||
s.nextSeq = info.seq + uint32(info.payLen)
|
||||
s.psh = info.flags&tcpFlagPsh != 0
|
||||
s.payIovs = append(s.payIovs[:0], pkt[info.hdrLen:info.hdrLen+info.payLen])
|
||||
c.slots = append(c.slots, s)
|
||||
if !s.psh {
|
||||
c.openSlots[info.fk] = s
|
||||
c.lastSlot = s
|
||||
} else if last := c.lastSlot; last != nil && last.fk == info.fk {
|
||||
// PSH-on-seed seals the slot immediately. Any prior cached open
|
||||
// slot for this flow has just been sealed-and-replaced by this
|
||||
// passthrough-shaped seed, so drop the cache too.
|
||||
c.lastSlot = nil
|
||||
}
|
||||
}
|
||||
|
||||
// canAppend reports whether info's packet extends the slot's seed: same
|
||||
// header shape and stable contents, adjacent seq, not oversized, chain not
|
||||
// closed.
|
||||
func (c *TCPCoalescer) canAppend(s *coalesceSlot, pkt []byte, info parsedTCP) bool {
|
||||
if s.psh {
|
||||
return false
|
||||
}
|
||||
if info.hdrLen != s.hdrLen {
|
||||
return false
|
||||
}
|
||||
if info.seq != s.nextSeq {
|
||||
return false
|
||||
}
|
||||
if s.numSeg >= tcpCoalesceMaxSegs {
|
||||
return false
|
||||
}
|
||||
if info.payLen > s.gsoSize {
|
||||
return false
|
||||
}
|
||||
if s.hdrLen+s.totalPay+info.payLen > tcpCoalesceBufSize {
|
||||
return false
|
||||
}
|
||||
// ECE state must be stable across a burst — receivers expect the
|
||||
// flag set on every segment of a CE-echoing window or none.
|
||||
seedFlags := s.hdrBuf[s.ipHdrLen+13]
|
||||
if (seedFlags^info.flags)&tcpFlagEce != 0 {
|
||||
return false
|
||||
}
|
||||
if !headersMatch(s.hdrBuf[:s.hdrLen], pkt[:info.hdrLen], s.isV6, s.ipHdrLen) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) appendPayload(s *coalesceSlot, pkt []byte, info parsedTCP) {
|
||||
s.payIovs = append(s.payIovs, pkt[info.hdrLen:info.hdrLen+info.payLen])
|
||||
s.numSeg++
|
||||
s.totalPay += info.payLen
|
||||
s.nextSeq = info.seq + uint32(info.payLen)
|
||||
if info.flags&tcpFlagPsh != 0 {
|
||||
// Propagate PSH into the seed header so kernel TSO sets it on the
|
||||
// last segment. Without this the sender's push signal is dropped.
|
||||
s.hdrBuf[s.ipHdrLen+13] |= tcpFlagPsh
|
||||
}
|
||||
// Merge IP-level CE marks into the seed: headersMatch ignores ECN, so
|
||||
// this is the one place the signal is preserved.
|
||||
mergeECNIntoSeed(s.hdrBuf[:s.ipHdrLen], pkt[:s.ipHdrLen], s.isV6)
|
||||
if info.payLen < s.gsoSize || info.flags&tcpFlagPsh != 0 {
|
||||
s.psh = true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) take() *coalesceSlot {
|
||||
if n := len(c.pool); n > 0 {
|
||||
s := c.pool[n-1]
|
||||
c.pool[n-1] = nil
|
||||
c.pool = c.pool[:n-1]
|
||||
return s
|
||||
}
|
||||
return &coalesceSlot{}
|
||||
}
|
||||
|
||||
func (c *TCPCoalescer) release(s *coalesceSlot) {
|
||||
s.passthrough = false
|
||||
s.rawPkt = nil
|
||||
for i := range s.payIovs {
|
||||
s.payIovs[i] = nil
|
||||
}
|
||||
s.payIovs = s.payIovs[:0]
|
||||
s.numSeg = 0
|
||||
s.totalPay = 0
|
||||
s.psh = false
|
||||
c.pool = append(c.pool, s)
|
||||
}
|
||||
|
||||
// flushSlot patches the header and calls WriteGSO. Does not remove the
|
||||
// slot from c.slots.
|
||||
func (c *TCPCoalescer) flushSlot(s *coalesceSlot) error {
|
||||
total := s.hdrLen + s.totalPay
|
||||
l4Len := total - s.ipHdrLen
|
||||
hdr := s.hdrBuf[:s.hdrLen]
|
||||
|
||||
if s.isV6 {
|
||||
binary.BigEndian.PutUint16(hdr[4:6], uint16(l4Len))
|
||||
} else {
|
||||
binary.BigEndian.PutUint16(hdr[2:4], uint16(total))
|
||||
hdr[10] = 0
|
||||
hdr[11] = 0
|
||||
binary.BigEndian.PutUint16(hdr[10:12], ipv4HdrChecksum(hdr[:s.ipHdrLen]))
|
||||
}
|
||||
|
||||
var psum uint32
|
||||
if s.isV6 {
|
||||
psum = pseudoSumIPv6(hdr[8:24], hdr[24:40], ipProtoTCP, l4Len)
|
||||
} else {
|
||||
psum = pseudoSumIPv4(hdr[12:16], hdr[16:20], ipProtoTCP, l4Len)
|
||||
}
|
||||
tcsum := s.ipHdrLen + 16
|
||||
binary.BigEndian.PutUint16(hdr[tcsum:tcsum+2], foldOnceNoInvert(psum))
|
||||
|
||||
return c.gsoW.WriteGSO(hdr[:s.ipHdrLen], hdr[s.ipHdrLen:], s.payIovs, tio.GSOProtoTCP)
|
||||
}
|
||||
|
||||
// headersMatch compares two IP+TCP header prefixes for byte-for-byte
|
||||
// equality on every field that must be identical across coalesced
|
||||
// segments. Size/IPID/IPCsum/seq/flags/tcpCsum are masked out, as is the
|
||||
// 2-bit IP-level ECN field — appendPayload merges CE into the seed.
|
||||
func headersMatch(a, b []byte, isV6 bool, ipHdrLen int) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
if !ipHeadersMatch(a, b, isV6) {
|
||||
return false
|
||||
}
|
||||
// TCP: compare [0:4] ports, [8:13] ack+dataoff, [14:16] window,
|
||||
// [18:tcpHdrLen] options (incl. urgent).
|
||||
tcp := ipHdrLen
|
||||
if !bytes.Equal(a[tcp:tcp+4], b[tcp:tcp+4]) {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[tcp+8:tcp+13], b[tcp+8:tcp+13]) {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[tcp+14:tcp+16], b[tcp+14:tcp+16]) {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(a[tcp+18:], b[tcp+18:]) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// reorderForFlush neutralizes wire-side reorder that the rxOrder buffer
// couldn't catch (anything crossing a recvmmsg batch boundary). Without
// this pass a small wire reorder — counter 250 arriving in batch K when
// 200..249 are coming in batch K+1 — would seed an out-of-seq slot first
// and emit it ahead of the lower-seq slot, manifesting at the inner TCP
// receiver as a much larger reorder than the wire actually had.
//
// Two phases:
//  1. Sort each passthrough-bounded segment of c.slots by (flow, seq).
//     Cross-flow ordering inside a segment isn't preserved (it never was
//     and doesn't matter for any single flow's TCP correctness).
//  2. Sweep once and merge adjacent same-flow slots whose ranges are now
//     contiguous AND whose tail is gsoSize-aligned. The tail constraint
//     matters because the kernel TSO splitter chops at gsoSize from the
//     start of the merged payload — a short segment in the middle would
//     desynchronize every later segment.
//
// Passthrough slots act as barriers: the merge check skips them on either
// side, so a SYN/FIN/RST/CWR is never reordered relative to its flow's
// data.
func (c *TCPCoalescer) reorderForFlush() {
	if len(c.slots) <= 1 {
		return
	}
	// Phase 1: i walks one past the end so the final, unterminated run is
	// still sorted; passthrough slots terminate a run and are never sorted.
	runStart := 0
	for i := 0; i <= len(c.slots); i++ {
		if i < len(c.slots) && !c.slots[i].passthrough {
			continue
		}
		c.sortRun(c.slots[runStart:i])
		runStart = i + 1
	}
	// Phase 2: in-place sweep reusing c.slots' backing array. `out` only
	// ever trails the read cursor, so the aliasing is safe.
	out := c.slots[:0]
	logged := false
	for _, s := range c.slots {
		if n := len(out); n > 0 {
			prev := out[n-1]
			if !prev.passthrough && !s.passthrough && prev.fk == s.fk {
				// Same-flow neighbors after sort. If they aren't seq-
				// contiguous it's a real gap — packets the wire reordered
				// across batches, or actual loss before nebula. Log it so
				// the operator can quantify how often it happens; the data
				// itself still emits in seq order, kernel TCP handles the
				// gap via its OOO queue.
				if prev.nextSeq != slotSeedSeq(s) {
					logged = true
					gap := int64(slotSeedSeq(s)) - int64(prev.nextSeq)
					slog.Default().Warn("tcp coalesce: cross-slot seq gap",
						"src", flowKeyAddr(s.fk, false),
						"dst", flowKeyAddr(s.fk, true),
						"sport", s.fk.sport,
						"dport", s.fk.dport,
						"prev_seed_seq", slotSeedSeq(prev),
						"prev_next_seq", prev.nextSeq,
						"this_seed_seq", slotSeedSeq(s),
						"gap_bytes", gap,
						"prev_seg_count", prev.numSeg,
						"prev_total_pay", prev.totalPay,
					)
				}
				// Fold s into prev when admissible; s's slot is recycled
				// and never appended to out.
				if canMergeSlots(prev, s) {
					mergeSlots(prev, s)
					c.release(s)
					continue
				}
			}
		}
		out = append(out, s)
	}
	if logged {
		slog.Default().Warn("==== end of batch ====")
	}
	c.slots = out
}
|
||||
|
||||
// flowKeyAddr returns the src or dst address from fk as a netip.Addr for
|
||||
// logging. Only used on the cold gap-log path so the netip allocation
|
||||
// doesn't matter.
|
||||
func flowKeyAddr(fk flowKey, dst bool) netip.Addr {
|
||||
src := fk.src
|
||||
if dst {
|
||||
src = fk.dst
|
||||
}
|
||||
if fk.isV6 {
|
||||
return netip.AddrFrom16(src)
|
||||
}
|
||||
var v4 [4]byte
|
||||
copy(v4[:], src[:4])
|
||||
return netip.AddrFrom4(v4)
|
||||
}
|
||||
|
||||
// sortRun stable-sorts run by (flowKey, seedSeq) so each flow's slots
|
||||
// cluster together in seq order, ready for the merge sweep. Stable so
|
||||
// equal-key slots keep their original relative position (defensive — a
|
||||
// duplicate seedSeq would already mean something's wrong upstream).
|
||||
func (c *TCPCoalescer) sortRun(run []*coalesceSlot) {
|
||||
if len(run) <= 1 {
|
||||
return
|
||||
}
|
||||
sort.SliceStable(run, func(i, j int) bool {
|
||||
a, b := run[i], run[j]
|
||||
if cmp := flowKeyCompare(a.fk, b.fk); cmp != 0 {
|
||||
return cmp < 0
|
||||
}
|
||||
return tcpSeqLess(slotSeedSeq(a), slotSeedSeq(b))
|
||||
})
|
||||
}
|
||||
|
||||
// slotSeedSeq returns the TCP seq of the slot's seed (first segment).
// nextSeq tracks the seq just past the last appended byte; subtracting
// totalPay walks back to the seed. uint32 wraparound is the right TCP
// serial arithmetic (RFC 1323 §2.3) so no special-casing is needed.
func slotSeedSeq(s *coalesceSlot) uint32 {
	return s.nextSeq - uint32(s.totalPay)
}
|
||||
|
||||
// tcpSeqLess reports whether a precedes b in TCP serial-number arithmetic
// (RFC 1323 §2.3): a < b iff (a - b) mod 2^32, read as a signed int32, is
// negative. This gives the right answer across the 2^32 wrap.
func tcpSeqLess(a, b uint32) bool {
	diff := int32(a - b)
	return diff < 0
}
|
||||
|
||||
// flowKeyCompare orders flowKeys deterministically. The exact ordering
|
||||
// is irrelevant — only that same-flow slots cluster together so the
|
||||
// post-sort sweep can merge contiguous pairs.
|
||||
func flowKeyCompare(a, b flowKey) int {
|
||||
if c := bytes.Compare(a.src[:], b.src[:]); c != 0 {
|
||||
return c
|
||||
}
|
||||
if c := bytes.Compare(a.dst[:], b.dst[:]); c != 0 {
|
||||
return c
|
||||
}
|
||||
if a.sport != b.sport {
|
||||
if a.sport < b.sport {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
}
|
||||
if a.dport != b.dport {
|
||||
if a.dport < b.dport {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
}
|
||||
if a.isV6 != b.isV6 {
|
||||
if !a.isV6 {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// canMergeSlots reports whether s can fold into prev as one merged TSO
|
||||
// superpacket. Same flow, contiguous TCP byte range, equal gsoSize, and
|
||||
// fits within the kernel TSO limits. The tail-of-prev check rejects any
|
||||
// merge whose first slot ended on a sub-gsoSize segment — kernel TSO
|
||||
// would split the merged skb at gsoSize boundaries from the start, so a
|
||||
// short segment in the middle would corrupt every later segment. PSH and
|
||||
// ECE state must agree across both slots: PSH is a semantic delimiter
|
||||
// (preserving the sender's push boundary) and ECE state must be uniform
|
||||
// across a window (the same rule canAppend enforces for in-flow appends).
|
||||
//
|
||||
// Note: a slot sealed by reorder (canAppend returned false on seq
|
||||
// mismatch) keeps psh=false, so this restriction does not block the
|
||||
// reorder-fix merge — only legitimate PSH-set seals.
|
||||
func canMergeSlots(prev, s *coalesceSlot) bool {
|
||||
if prev.psh {
|
||||
return false
|
||||
}
|
||||
if prev.fk != s.fk {
|
||||
return false
|
||||
}
|
||||
if prev.gsoSize != s.gsoSize {
|
||||
return false
|
||||
}
|
||||
if prev.nextSeq != slotSeedSeq(s) {
|
||||
return false
|
||||
}
|
||||
if prev.numSeg+s.numSeg > tcpCoalesceMaxSegs {
|
||||
return false
|
||||
}
|
||||
if prev.hdrLen+prev.totalPay+s.totalPay > tcpCoalesceBufSize {
|
||||
return false
|
||||
}
|
||||
if len(prev.payIovs[len(prev.payIovs)-1]) != prev.gsoSize {
|
||||
return false
|
||||
}
|
||||
prevFlags := prev.hdrBuf[prev.ipHdrLen+13]
|
||||
sFlags := s.hdrBuf[s.ipHdrLen+13]
|
||||
if (prevFlags^sFlags)&tcpFlagEce != 0 {
|
||||
return false
|
||||
}
|
||||
if !headersMatch(prev.hdrBuf[:prev.hdrLen], s.hdrBuf[:s.hdrLen], prev.isV6, prev.ipHdrLen) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// mergeSlots folds src into dst in place: payIovs concatenated, counters
// and totals updated, PSH and IP-level CE bits OR'd into the seed header
// so neither the push signal nor a CE mark is lost. The seed header's
// seq, gsoSize, and fk are unchanged. Caller is responsible for releasing
// src (it's no longer in c.slots after this call).
func mergeSlots(dst, src *coalesceSlot) {
	dst.payIovs = append(dst.payIovs, src.payIovs...)
	dst.numSeg += src.numSeg
	dst.totalPay += src.totalPay
	// dst now covers through src's last byte.
	dst.nextSeq = src.nextSeq
	if src.psh {
		// Propagate the push signal into the merged seed header.
		dst.psh = true
		dst.hdrBuf[dst.ipHdrLen+13] |= tcpFlagPsh
	}
	// Preserve any CE mark src carried — headersMatch masks ECN out, so
	// this is where the signal survives the merge.
	mergeECNIntoSeed(dst.hdrBuf[:dst.ipHdrLen], src.hdrBuf[:src.ipHdrLen], dst.isV6)
}
|
||||
|
||||
// ipv4HdrChecksum computes the IPv4 header checksum (RFC 1071 one's
// complement sum) over hdr, which must already have its checksum field
// zeroed, and returns the folded/inverted 16-bit value to store.
func ipv4HdrChecksum(hdr []byte) uint16 {
	var acc uint32
	even := len(hdr) &^ 1
	for i := 0; i < even; i += 2 {
		acc += uint32(binary.BigEndian.Uint16(hdr[i : i+2]))
	}
	if len(hdr)&1 == 1 {
		// Odd trailing byte is treated as the high byte of a final word.
		acc += uint32(hdr[len(hdr)-1]) << 8
	}
	for acc > 0xffff {
		acc = (acc & 0xffff) + (acc >> 16)
	}
	return ^uint16(acc)
}
|
||||
|
||||
// pseudoSumIPv4 builds the L4 pseudo-header partial sum expected by the
// virtio NEEDS_CSUM kernel path: the raw 32-bit accumulator before folding.
// src and dst are the 4-byte IPv4 addresses; proto selects the L4 (TCP or
// UDP — the UDP coalescer reuses this helper); l4Len is header + payload.
func pseudoSumIPv4(src, dst []byte, proto byte, l4Len int) uint32 {
	sum := uint32(binary.BigEndian.Uint16(src[0:2])) +
		uint32(binary.BigEndian.Uint16(src[2:4]))
	sum += uint32(binary.BigEndian.Uint16(dst[0:2])) +
		uint32(binary.BigEndian.Uint16(dst[2:4]))
	sum += uint32(proto) + uint32(l4Len)
	return sum
}
|
||||
|
||||
// pseudoSumIPv6 builds the IPv6 L4 pseudo-header partial sum (raw 32-bit
// accumulator, unfolded) for the virtio NEEDS_CSUM path. src and dst are
// the 16-byte IPv6 addresses; l4Len is split into its high and low 16-bit
// halves as the pseudo-header's 32-bit length field requires.
func pseudoSumIPv6(src, dst []byte, proto byte, l4Len int) uint32 {
	sum := uint32(proto) + uint32(l4Len>>16) + uint32(l4Len&0xffff)
	for i := 0; i < 16; i += 2 {
		sum += uint32(binary.BigEndian.Uint16(src[i : i+2]))
		sum += uint32(binary.BigEndian.Uint16(dst[i : i+2]))
	}
	return sum
}
|
||||
|
||||
// foldOnceNoInvert folds the 32-bit checksum accumulator down to 16 bits
// and returns it WITHOUT taking the one's complement. This is the form
// virtio NEEDS_CSUM wants in the L4 checksum field — the kernel adds the
// payload sum and performs the final inversion itself.
func foldOnceNoInvert(sum uint32) uint16 {
	for sum > 0xffff {
		sum = (sum & 0xffff) + (sum >> 16)
	}
	return uint16(sum)
}
|
||||
173
overlay/batch/tcp_coalesce_bench_test.go
Normal file
173
overlay/batch/tcp_coalesce_bench_test.go
Normal file
@@ -0,0 +1,173 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/slackhq/nebula/overlay/tio"
|
||||
)
|
||||
|
||||
// nopTunWriter is a zero-alloc tio.GSOWriter for benchmarks. Discards
// everything but satisfies the interface the coalescer detects.
type nopTunWriter struct{}

// Write discards p and reports full success.
func (nopTunWriter) Write(p []byte) (int, error) { return len(p), nil }

// WriteGSO discards the superpacket without inspecting it.
func (nopTunWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ tio.GSOProto) error {
	return nil
}

// Capabilities advertises both TSO and USO so every coalescer lane takes
// its offload path during benchmarks.
func (nopTunWriter) Capabilities() tio.Capabilities {
	return tio.Capabilities{TSO: true, USO: true}
}
|
||||
|
||||
// buildTCPv4BulkFlow returns a slice of N adjacent ACK-only TCP segments
|
||||
// on a single 5-tuple, each carrying payloadLen bytes. Seq numbers are
|
||||
// contiguous so every packet is coalesceable onto the previous one.
|
||||
func buildTCPv4BulkFlow(n, payloadLen int) [][]byte {
|
||||
pkts := make([][]byte, n)
|
||||
pay := make([]byte, payloadLen)
|
||||
seq := uint32(1000)
|
||||
for i := range n {
|
||||
pkts[i] = buildTCPv4(seq, tcpAck, pay)
|
||||
seq += uint32(payloadLen)
|
||||
}
|
||||
return pkts
|
||||
}
|
||||
|
||||
// buildTCPv4Interleaved returns nFlows * perFlow packets with per-flow
|
||||
// seq continuity but round-robin across flows — worst case for any
|
||||
// "last-slot" cache.
|
||||
func buildTCPv4Interleaved(nFlows, perFlow, payloadLen int) [][]byte {
|
||||
pay := make([]byte, payloadLen)
|
||||
seqs := make([]uint32, nFlows)
|
||||
for i := range seqs {
|
||||
seqs[i] = uint32(1000 + i*1000000)
|
||||
}
|
||||
pkts := make([][]byte, 0, nFlows*perFlow)
|
||||
for range perFlow {
|
||||
for f := range nFlows {
|
||||
sport := uint16(10000 + f)
|
||||
pkts = append(pkts, buildTCPv4Ports(sport, 2000, seqs[f], tcpAck, pay))
|
||||
seqs[f] += uint32(payloadLen)
|
||||
}
|
||||
}
|
||||
return pkts
|
||||
}
|
||||
|
||||
// buildICMPv4 returns a minimal non-TCP packet that takes the passthrough
|
||||
// branch in Commit.
|
||||
func buildICMPv4() []byte {
|
||||
pkt := make([]byte, 28)
|
||||
pkt[0] = 0x45
|
||||
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
||||
pkt[9] = 1 // ICMP
|
||||
copy(pkt[12:16], []byte{10, 0, 0, 1})
|
||||
copy(pkt[16:20], []byte{10, 0, 0, 2})
|
||||
return pkt
|
||||
}
|
||||
|
||||
// runCommitBench drives Commit over pkts batchSize at a time, flushing
|
||||
// between batches, and reports per-packet cost.
|
||||
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||
b.Helper()
|
||||
c := NewTCPCoalescer(nopTunWriter{})
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(pkts[0])))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pkt := pkts[i%len(pkts)]
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
if (i+1)%batchSize == 0 {
|
||||
if err := c.Flush(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Drain any trailing partial batch so slot state doesn't leak across runs.
|
||||
_ = c.Flush()
|
||||
}
|
||||
|
||||
// BenchmarkCommitSingleFlow is the bulk-TCP steady state: one flow,
// contiguous seq, 1200-byte payloads. Every packet past the seed should
// append onto the open slot. This is the case we most care about.
// Batch size equals tcpCoalesceMaxSegs so each flush emits one full slot.
func BenchmarkCommitSingleFlow(b *testing.B) {
	pkts := buildTCPv4BulkFlow(tcpCoalesceMaxSegs, 1200)
	runCommitBench(b, pkts, tcpCoalesceMaxSegs)
}
|
||||
|
||||
// BenchmarkCommitInterleaved4 has 4 concurrent bulk flows round-robined.
// A single-entry fast-path cache will miss on every packet; an N-way
// cache or map lookup carries the weight. One flush at the very end.
func BenchmarkCommitInterleaved4(b *testing.B) {
	pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200)
	runCommitBench(b, pkts, len(pkts))
}
|
||||
|
||||
// BenchmarkCommitInterleaved16 stresses the open-slot map at higher flow
// counts — 16 round-robined bulk flows, flushed once at the end.
func BenchmarkCommitInterleaved16(b *testing.B) {
	pkts := buildTCPv4Interleaved(16, tcpCoalesceMaxSegs, 1200)
	runCommitBench(b, pkts, len(pkts))
}
}
|
||||
|
||||
// BenchmarkCommitPassthrough exercises the non-TCP branch: parseTCPBase
// bails early and addPassthrough is the only work. The same ICMP packet
// is repeated — content is irrelevant on this path.
func BenchmarkCommitPassthrough(b *testing.B) {
	pkt := buildICMPv4()
	pkts := make([][]byte, 64)
	for i := range pkts {
		pkts[i] = pkt
	}
	runCommitBench(b, pkts, 64)
}
|
||||
|
||||
// BenchmarkCommitNonCoalesceableTCP sends SYN|ACK packets on one flow.
// Each packet takes the "TCP but not admissible" branch which does a
// map delete + passthrough. Measures the seal-without-slot cost.
func BenchmarkCommitNonCoalesceableTCP(b *testing.B) {
	// Zero-length payload: SYN|ACK segments carry no data.
	pay := make([]byte, 0)
	pkts := make([][]byte, 64)
	for i := range pkts {
		pkts[i] = buildTCPv4(uint32(1000+i), tcpSyn|tcpAck, pay)
	}
	runCommitBench(b, pkts, 64)
}
|
||||
|
||||
// runMultiCommitBench drives MultiCoalescer.Commit. The dispatcher does
|
||||
// the IP/L4 parse once and passes the parsed struct to the lane, so this
|
||||
// is the bench that shows the savings of skipping the lane's re-parse.
|
||||
func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||
b.Helper()
|
||||
m := NewMultiCoalescer(nopTunWriter{}, true, true)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(pkts[0])))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pkt := pkts[i%len(pkts)]
|
||||
if err := m.Commit(pkt); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
if (i+1)%batchSize == 0 {
|
||||
if err := m.Flush(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = m.Flush()
|
||||
}
|
||||
|
||||
// BenchmarkMultiCommitSingleFlow is the multi-lane analogue of
// BenchmarkCommitSingleFlow — same workload but routed through the
// dispatcher. The delta vs the single-lane bench measures dispatcher
// overhead.
func BenchmarkMultiCommitSingleFlow(b *testing.B) {
	pkts := buildTCPv4BulkFlow(tcpCoalesceMaxSegs, 1200)
	runMultiCommitBench(b, pkts, tcpCoalesceMaxSegs)
}
|
||||
|
||||
// BenchmarkMultiCommitInterleaved4 mirrors BenchmarkCommitInterleaved4
// through the dispatcher: 4 round-robined flows, one flush at the end.
func BenchmarkMultiCommitInterleaved4(b *testing.B) {
	pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200)
	runMultiCommitBench(b, pkts, len(pkts))
}
|
||||
1022
overlay/batch/tcp_coalesce_test.go
Normal file
1022
overlay/batch/tcp_coalesce_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -4,58 +4,63 @@ import "net/netip"
|
||||
|
||||
const SendBatchCap = 128
|
||||
|
||||
// SendBatch accumulates encrypted UDP packets for potential TX offloading.
|
||||
// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
|
||||
type batchWriter interface {
|
||||
WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
|
||||
}
|
||||
|
||||
// SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch.
|
||||
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
|
||||
// The backing storage holds up to batchCap packets of slotCap bytes each;
|
||||
// bufs and dsts are parallel slices of committed slots.
|
||||
// The backing arena grows on demand: when there isn't room for the next slot
|
||||
// we allocate a fresh backing array. Already-committed slices keep referencing
|
||||
// the old array and remain valid until Flush drops them.
|
||||
type SendBatch struct {
|
||||
bufs [][]byte
|
||||
dsts []netip.AddrPort
|
||||
backing []byte
|
||||
slotCap int
|
||||
batchCap int
|
||||
nextSlot int
|
||||
out batchWriter
|
||||
bufs [][]byte
|
||||
dsts []netip.AddrPort
|
||||
ecns []byte
|
||||
backing []byte
|
||||
}
|
||||
|
||||
func NewSendBatch(batchCap, slotCap int) *SendBatch {
|
||||
func NewSendBatch(out batchWriter, batchCap, slotCap int) *SendBatch {
|
||||
return &SendBatch{
|
||||
bufs: make([][]byte, 0, batchCap),
|
||||
dsts: make([]netip.AddrPort, 0, batchCap),
|
||||
backing: make([]byte, batchCap*slotCap),
|
||||
slotCap: slotCap,
|
||||
batchCap: batchCap,
|
||||
out: out,
|
||||
bufs: make([][]byte, 0, batchCap),
|
||||
dsts: make([]netip.AddrPort, 0, batchCap),
|
||||
ecns: make([]byte, 0, batchCap),
|
||||
backing: make([]byte, 0, batchCap*slotCap),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *SendBatch) Next() []byte {
|
||||
if b.nextSlot >= b.batchCap {
|
||||
return nil
|
||||
func (b *SendBatch) Reserve(sz int) []byte {
|
||||
if len(b.backing)+sz > cap(b.backing) {
|
||||
// Grow: allocate a fresh backing. Already-committed slices still
|
||||
// reference the old array and remain valid until Flush drops them.
|
||||
newCap := max(cap(b.backing)*2, sz)
|
||||
b.backing = make([]byte, 0, newCap)
|
||||
}
|
||||
start := b.nextSlot * b.slotCap
|
||||
return b.backing[start : start : start+b.slotCap] //set len to 0 but cap to slotCap
|
||||
start := len(b.backing)
|
||||
b.backing = b.backing[:start+sz]
|
||||
return b.backing[start : start+sz : start+sz]
|
||||
}
|
||||
|
||||
func (b *SendBatch) Commit(n int, dst netip.AddrPort) {
|
||||
start := b.nextSlot * b.slotCap
|
||||
b.bufs = append(b.bufs, b.backing[start:start+n])
|
||||
func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) {
|
||||
b.bufs = append(b.bufs, pkt)
|
||||
b.dsts = append(b.dsts, dst)
|
||||
b.nextSlot++
|
||||
b.ecns = append(b.ecns, outerECN)
|
||||
}
|
||||
|
||||
func (b *SendBatch) Reset() {
|
||||
func (b *SendBatch) Flush() error {
|
||||
var err error
|
||||
if len(b.bufs) > 0 {
|
||||
err = b.out.WriteBatch(b.bufs, b.dsts, b.ecns)
|
||||
}
|
||||
for i := range b.bufs {
|
||||
b.bufs[i] = nil
|
||||
}
|
||||
b.bufs = b.bufs[:0]
|
||||
b.dsts = b.dsts[:0]
|
||||
b.nextSlot = 0
|
||||
}
|
||||
|
||||
func (b *SendBatch) Len() int {
|
||||
return len(b.bufs)
|
||||
}
|
||||
|
||||
func (b *SendBatch) Cap() int {
|
||||
return b.batchCap
|
||||
}
|
||||
|
||||
func (b *SendBatch) Get() ([][]byte, []netip.AddrPort) {
|
||||
return b.bufs, b.dsts
|
||||
b.ecns = b.ecns[:0]
|
||||
b.backing = b.backing[:0]
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -5,65 +5,120 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSendBatchBookkeeping(t *testing.T) {
|
||||
b := NewSendBatch(4, 32)
|
||||
if b.Len() != 0 || b.Cap() != 4 {
|
||||
t.Fatalf("fresh batch: len=%d cap=%d", b.Len(), b.Cap())
|
||||
type fakeBatchWriter struct {
|
||||
bufs [][]byte
|
||||
addrs []netip.AddrPort
|
||||
ecns []byte
|
||||
}
|
||||
|
||||
func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns []byte) error {
|
||||
// Snapshot — SendBatch.Flush nils its slot pointers right after WriteBatch
|
||||
// returns, so tests must capture data before that happens.
|
||||
w.bufs = make([][]byte, len(bufs))
|
||||
for i, b := range bufs {
|
||||
cp := make([]byte, len(b))
|
||||
copy(cp, b)
|
||||
w.bufs[i] = cp
|
||||
}
|
||||
w.addrs = append(w.addrs[:0], addrs...)
|
||||
w.ecns = append(w.ecns[:0], ecns...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSendBatchReserveCommitFlush(t *testing.T) {
|
||||
fw := &fakeBatchWriter{}
|
||||
b := NewSendBatch(fw, 4, 32)
|
||||
|
||||
ap := netip.MustParseAddrPort("10.0.0.1:4242")
|
||||
for i := 0; i < 4; i++ {
|
||||
slot := b.Next()
|
||||
if slot == nil {
|
||||
t.Fatalf("slot %d: Next returned nil before cap", i)
|
||||
slot := b.Reserve(32)
|
||||
if cap(slot) != 32 {
|
||||
t.Fatalf("slot %d: cap=%d want 32", i, cap(slot))
|
||||
}
|
||||
if cap(slot) != 32 || len(slot) != 0 {
|
||||
t.Fatalf("slot %d: got len=%d cap=%d want len=0 cap=32", i, len(slot), cap(slot))
|
||||
}
|
||||
// Write a marker byte.
|
||||
slot = append(slot, byte(i), byte(i+1), byte(i+2))
|
||||
b.Commit(len(slot), ap)
|
||||
pkt := append(slot[:0], byte(i), byte(i+1), byte(i+2))
|
||||
b.Commit(pkt, ap, 0)
|
||||
}
|
||||
if b.Next() != nil {
|
||||
t.Fatalf("Next should return nil when full")
|
||||
if err := b.Flush(); err != nil {
|
||||
t.Fatalf("Flush: %v", err)
|
||||
}
|
||||
if b.Len() != 4 {
|
||||
t.Fatalf("Len=%d want 4", b.Len())
|
||||
if len(fw.bufs) != 4 {
|
||||
t.Fatalf("WriteBatch got %d bufs want 4", len(fw.bufs))
|
||||
}
|
||||
for i, buf := range b.bufs {
|
||||
for i, buf := range fw.bufs {
|
||||
if len(buf) != 3 || buf[0] != byte(i) {
|
||||
t.Errorf("buf %d: %x", i, buf)
|
||||
}
|
||||
if b.dsts[i] != ap {
|
||||
t.Errorf("dst %d: got %v want %v", i, b.dsts[i], ap)
|
||||
if fw.addrs[i] != ap {
|
||||
t.Errorf("addr %d: got %v want %v", i, fw.addrs[i], ap)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset returns empty and Next works again.
|
||||
b.Reset()
|
||||
if b.Len() != 0 {
|
||||
t.Fatalf("after Reset Len=%d want 0", b.Len())
|
||||
// Flush again with nothing committed — should be a no-op.
|
||||
fw.bufs = nil
|
||||
if err := b.Flush(); err != nil {
|
||||
t.Fatalf("empty Flush: %v", err)
|
||||
}
|
||||
slot := b.Next()
|
||||
if slot == nil || cap(slot) != 32 {
|
||||
t.Fatalf("after Reset Next nil or wrong cap: %v cap=%d", slot == nil, cap(slot))
|
||||
if fw.bufs != nil {
|
||||
t.Fatalf("empty Flush triggered WriteBatch")
|
||||
}
|
||||
|
||||
// Reuse after Flush.
|
||||
slot := b.Reserve(32)
|
||||
if cap(slot) != 32 {
|
||||
t.Fatalf("after Flush Reserve wrong cap: %d", cap(slot))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
|
||||
b := NewSendBatch(3, 8)
|
||||
fw := &fakeBatchWriter{}
|
||||
b := NewSendBatch(fw, 3, 8)
|
||||
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
||||
|
||||
// Fill three slots, each with its own sentinel byte.
|
||||
for i := 0; i < 3; i++ {
|
||||
s := b.Next()
|
||||
s = append(s, byte(0xA0+i), byte(0xB0+i))
|
||||
b.Commit(len(s), ap)
|
||||
s := b.Reserve(8)
|
||||
pkt := append(s[:0], byte(0xA0+i), byte(0xB0+i))
|
||||
b.Commit(pkt, ap, 0)
|
||||
}
|
||||
if err := b.Flush(); err != nil {
|
||||
t.Fatalf("Flush: %v", err)
|
||||
}
|
||||
|
||||
for i, buf := range b.bufs {
|
||||
for i, buf := range fw.bufs {
|
||||
if buf[0] != byte(0xA0+i) || buf[1] != byte(0xB0+i) {
|
||||
t.Errorf("slot %d corrupted: %x", i, buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
|
||||
fw := &fakeBatchWriter{}
|
||||
// Tiny initial backing forces a grow on the second Reserve.
|
||||
b := NewSendBatch(fw, 1, 4)
|
||||
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
||||
|
||||
s1 := b.Reserve(4)
|
||||
pkt1 := append(s1[:0], 0x11, 0x22, 0x33, 0x44)
|
||||
b.Commit(pkt1, ap, 0)
|
||||
|
||||
s2 := b.Reserve(8) // exceeds remaining cap, triggers grow
|
||||
pkt2 := append(s2[:0], 0xA, 0xB, 0xC, 0xD, 0xE)
|
||||
b.Commit(pkt2, ap, 0)
|
||||
|
||||
// pkt1 must still be intact even though backing reallocated.
|
||||
if pkt1[0] != 0x11 || pkt1[3] != 0x44 {
|
||||
t.Fatalf("first packet corrupted by grow: %x", pkt1)
|
||||
}
|
||||
|
||||
if err := b.Flush(); err != nil {
|
||||
t.Fatalf("Flush: %v", err)
|
||||
}
|
||||
if len(fw.bufs) != 2 {
|
||||
t.Fatalf("got %d bufs want 2", len(fw.bufs))
|
||||
}
|
||||
if fw.bufs[0][0] != 0x11 || fw.bufs[0][3] != 0x44 {
|
||||
t.Errorf("first packet on the wire: %x", fw.bufs[0])
|
||||
}
|
||||
if fw.bufs[1][0] != 0xA || fw.bufs[1][4] != 0xE {
|
||||
t.Errorf("second packet on the wire: %x", fw.bufs[1])
|
||||
}
|
||||
}
|
||||
|
||||
342
overlay/batch/udp_coalesce.go
Normal file
342
overlay/batch/udp_coalesce.go
Normal file
@@ -0,0 +1,342 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/slackhq/nebula/overlay/tio"
|
||||
)
|
||||
|
||||
// ipProtoUDP is the IANA protocol number for UDP.
|
||||
const ipProtoUDP = 17
|
||||
|
||||
// udpCoalesceBufSize caps total bytes per UDP superpacket. Mirrors the
|
||||
// kernel's gso_max_size; payloads beyond this are emitted as-is.
|
||||
const udpCoalesceBufSize = 65535
|
||||
|
||||
// udpCoalesceMaxSegs caps how many segments we'll coalesce. Kernel UDP-GSO
|
||||
// accepts up to 64 segments per skb (UDP_MAX_SEGMENTS); stay under that.
|
||||
const udpCoalesceMaxSegs = 64
|
||||
|
||||
// udpCoalesceHdrCap is the scratch space we copy a seed's IP+UDP header
|
||||
// into. IPv6 (40) + UDP (8) = 48; round up for safety.
|
||||
const udpCoalesceHdrCap = 64
|
||||
|
||||
// udpSlot is one entry in the UDPCoalescer's ordered event queue. Same
// passthrough-vs-coalesced shape as the TCP coalescer's slot, but no
// seq/PSH/CWR bookkeeping — UDP segments only need 5-tuple + length
// matching to coalesce.
type udpSlot struct {
	passthrough bool
	rawPkt      []byte // borrowed when passthrough; must stay valid until Flush

	fk       flowKey
	hdrBuf   [udpCoalesceHdrCap]byte // copy of the seed's IP+UDP header
	hdrLen   int                     // bytes of hdrBuf in use
	ipHdrLen int                     // IP header portion of hdrLen
	isV6     bool
	gsoSize  int // per-segment UDP payload length
	numSeg   int // segments accumulated so far
	totalPay int // sum of all segment payload lengths
	// sealed closes the chain: set when a sub-gsoSize segment is appended
	// (kernel UDP-GSO requires every segment but the last to be exactly
	// gsoSize) or when limits are hit. No further appends after.
	sealed  bool
	payIovs [][]byte // borrowed payload slices, one per segment
}
|
||||
|
||||
// UDPCoalescer accumulates adjacent in-flow UDP datagrams across multiple
// concurrent flows and emits each flow's run as a single GSO_UDP_L4
// superpacket via tio.GSOWriter. Falls back to per-packet writes when the
// underlying writer doesn't support USO.
//
// All output — coalesced or not — is deferred until Flush so per-flow
// arrival order is preserved on the wire. Cross-flow order is NOT preserved
// across the TCP/UDP/passthrough split when this coalescer runs alongside
// others — see multi_coalesce.go. Per-flow order is preserved because a
// single 5-tuple only ever lands in one lane and each lane preserves its
// own slot order.
//
// Owns no locks; one coalescer per TUN write queue.
type UDPCoalescer struct {
	plainW io.Writer
	gsoW   tio.GSOWriter // nil when the queue can't accept GSO_UDP_L4

	slots     []*udpSlot           // ordered event queue, emitted FIFO by Flush
	openSlots map[flowKey]*udpSlot // flows whose newest slot still accepts appends
	pool      []*udpSlot           // recycled slots

	backing []byte // Reserve arena; truncated on Flush
}
|
||||
|
||||
// NewUDPCoalescer wraps w. The caller is responsible for only constructing
|
||||
// this when the underlying Queue's Capabilities advertise USO; otherwise
|
||||
// the kernel may reject GSO_UDP_L4 writes. If w does not implement
|
||||
// tio.GSOWriter at all (single-packet Queue), the coalescer degrades to
|
||||
// plain Writes — same defensive shape as the TCP coalescer.
|
||||
func NewUDPCoalescer(w io.Writer) *UDPCoalescer {
|
||||
c := &UDPCoalescer{
|
||||
plainW: w,
|
||||
slots: make([]*udpSlot, 0, initialSlots),
|
||||
openSlots: make(map[flowKey]*udpSlot, initialSlots),
|
||||
pool: make([]*udpSlot, 0, initialSlots),
|
||||
backing: make([]byte, 0, initialSlots*udpCoalesceBufSize),
|
||||
}
|
||||
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoUDP); ok {
|
||||
c.gsoW = gw
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// parsedUDP holds the fields extracted from a single parse so later steps
// (admission, slot lookup, canAppend) don't re-walk the header.
type parsedUDP struct {
	fk       flowKey
	ipHdrLen int // IP header bytes preceding the UDP header
	hdrLen   int // ipHdrLen + 8 (fixed-size UDP header)
	payLen   int // UDP payload bytes, derived from the UDP length field
}
|
||||
|
||||
// parseUDP extracts the flow key and IP/UDP offsets for a UDP packet.
|
||||
// Returns ok=false for non-UDP, malformed, or unsupported header shapes
|
||||
// (IPv4 with options/fragmentation, IPv6 with extension headers).
|
||||
func parseUDP(pkt []byte) (parsedUDP, bool) {
|
||||
var p parsedUDP
|
||||
ip, ok := parseIPPrologue(pkt, ipProtoUDP)
|
||||
if !ok {
|
||||
return p, false
|
||||
}
|
||||
pkt = ip.pkt
|
||||
p.fk = ip.fk
|
||||
p.ipHdrLen = ip.ipHdrLen
|
||||
|
||||
if len(pkt) < p.ipHdrLen+8 {
|
||||
return p, false
|
||||
}
|
||||
p.hdrLen = p.ipHdrLen + 8
|
||||
// UDP `length` field: must equal IP-derived length-of-UDP-header-plus-payload.
|
||||
udpLen := int(binary.BigEndian.Uint16(pkt[p.ipHdrLen+4 : p.ipHdrLen+6]))
|
||||
if udpLen < 8 || udpLen > len(pkt)-p.ipHdrLen {
|
||||
return p, false
|
||||
}
|
||||
p.payLen = udpLen - 8
|
||||
p.fk.sport = binary.BigEndian.Uint16(pkt[p.ipHdrLen : p.ipHdrLen+2])
|
||||
p.fk.dport = binary.BigEndian.Uint16(pkt[p.ipHdrLen+2 : p.ipHdrLen+4])
|
||||
return p, true
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) Reserve(sz int) []byte {
|
||||
return reserveFromBacking(&c.backing, sz)
|
||||
}
|
||||
|
||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
|
||||
func (c *UDPCoalescer) Commit(pkt []byte) error {
|
||||
if c.gsoW == nil {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
info, ok := parseUDP(pkt)
|
||||
if !ok {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
return c.commitParsed(pkt, info)
|
||||
}
|
||||
|
||||
// commitParsed is the post-parse half of Commit. The caller must have
|
||||
// already verified parseUDP succeeded. Used by MultiCoalescer.Commit to
|
||||
// avoid re-walking the IP/UDP header.
|
||||
func (c *UDPCoalescer) commitParsed(pkt []byte, info parsedUDP) error {
|
||||
if c.gsoW == nil {
|
||||
c.addPassthrough(pkt)
|
||||
return nil
|
||||
}
|
||||
if open := c.openSlots[info.fk]; open != nil {
|
||||
if c.canAppend(open, pkt, info) {
|
||||
c.appendPayload(open, pkt, info)
|
||||
if open.sealed {
|
||||
delete(c.openSlots, info.fk)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Can't extend — seal it and fall through to seed a fresh slot.
|
||||
delete(c.openSlots, info.fk)
|
||||
}
|
||||
c.seed(pkt, info)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) Flush() error {
|
||||
var first error
|
||||
for _, s := range c.slots {
|
||||
var err error
|
||||
if s.passthrough {
|
||||
_, err = c.plainW.Write(s.rawPkt)
|
||||
} else {
|
||||
err = c.flushSlot(s)
|
||||
}
|
||||
if err != nil && first == nil {
|
||||
first = err
|
||||
}
|
||||
c.release(s)
|
||||
}
|
||||
for i := range c.slots {
|
||||
c.slots[i] = nil
|
||||
}
|
||||
c.slots = c.slots[:0]
|
||||
for k := range c.openSlots {
|
||||
delete(c.openSlots, k)
|
||||
}
|
||||
c.backing = c.backing[:0]
|
||||
return first
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) addPassthrough(pkt []byte) {
|
||||
s := c.take()
|
||||
s.passthrough = true
|
||||
s.rawPkt = pkt
|
||||
c.slots = append(c.slots, s)
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) seed(pkt []byte, info parsedUDP) {
|
||||
if info.hdrLen > udpCoalesceHdrCap || info.hdrLen+info.payLen > udpCoalesceBufSize {
|
||||
c.addPassthrough(pkt)
|
||||
return
|
||||
}
|
||||
s := c.take()
|
||||
s.passthrough = false
|
||||
s.rawPkt = nil
|
||||
copy(s.hdrBuf[:], pkt[:info.hdrLen])
|
||||
s.hdrLen = info.hdrLen
|
||||
s.ipHdrLen = info.ipHdrLen
|
||||
s.isV6 = info.fk.isV6
|
||||
s.fk = info.fk
|
||||
s.gsoSize = info.payLen
|
||||
s.numSeg = 1
|
||||
s.totalPay = info.payLen
|
||||
s.sealed = false
|
||||
s.payIovs = append(s.payIovs[:0], pkt[info.hdrLen:info.hdrLen+info.payLen])
|
||||
c.slots = append(c.slots, s)
|
||||
c.openSlots[info.fk] = s
|
||||
}
|
||||
|
||||
// canAppend reports whether info's packet extends the slot's seed.
|
||||
// Kernel UDP-GSO requires every segment except possibly the last to be
|
||||
// exactly gsoSize, and the last may be shorter (≤ gsoSize).
|
||||
func (c *UDPCoalescer) canAppend(s *udpSlot, pkt []byte, info parsedUDP) bool {
|
||||
if s.sealed {
|
||||
return false
|
||||
}
|
||||
if info.hdrLen != s.hdrLen {
|
||||
return false
|
||||
}
|
||||
if s.numSeg >= udpCoalesceMaxSegs {
|
||||
return false
|
||||
}
|
||||
if info.payLen > s.gsoSize {
|
||||
return false
|
||||
}
|
||||
if s.hdrLen+s.totalPay+info.payLen > udpCoalesceBufSize {
|
||||
return false
|
||||
}
|
||||
if !udpHeadersMatch(s.hdrBuf[:s.hdrLen], pkt[:info.hdrLen], s.isV6, s.ipHdrLen) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) appendPayload(s *udpSlot, pkt []byte, info parsedUDP) {
|
||||
s.payIovs = append(s.payIovs, pkt[info.hdrLen:info.hdrLen+info.payLen])
|
||||
s.numSeg++
|
||||
s.totalPay += info.payLen
|
||||
// Merge IP-level CE marks into the seed (same trick TCP coalescer uses).
|
||||
mergeECNIntoSeed(s.hdrBuf[:s.ipHdrLen], pkt[:s.ipHdrLen], s.isV6)
|
||||
if info.payLen < s.gsoSize {
|
||||
// Last-segment-can-be-shorter: this seals the chain.
|
||||
s.sealed = true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) take() *udpSlot {
|
||||
if n := len(c.pool); n > 0 {
|
||||
s := c.pool[n-1]
|
||||
c.pool[n-1] = nil
|
||||
c.pool = c.pool[:n-1]
|
||||
return s
|
||||
}
|
||||
return &udpSlot{}
|
||||
}
|
||||
|
||||
func (c *UDPCoalescer) release(s *udpSlot) {
|
||||
s.passthrough = false
|
||||
s.rawPkt = nil
|
||||
for i := range s.payIovs {
|
||||
s.payIovs[i] = nil
|
||||
}
|
||||
s.payIovs = s.payIovs[:0]
|
||||
s.numSeg = 0
|
||||
s.totalPay = 0
|
||||
s.sealed = false
|
||||
c.pool = append(c.pool, s)
|
||||
}
|
||||
|
||||
// flushSlot patches the IP header total length / IPv6 payload length and
// the UDP length to the *total* across all coalesced segments, then seeds
// the UDP checksum field with the pseudo-header partial (single-fold, not
// inverted) per virtio NEEDS_CSUM. The kernel's ip_rcv_core (v4) and
// ip6_rcv_core (v6) trim the skb to those length fields, so per-segment
// values would silently drop everything but the first segment. The kernel
// then walks each segment in __udp_gso_segment, recomputing per-segment
// uh->len / iph->tot_len / IPv6 plen and adjusting the checksum via
// `check = csum16_add(csum16_sub(uh->check, uh->len), newlen)` — meaning
// our seed's uh->check must be consistent with the seed's uh->len, which
// is what passing the total to both pseudoSum and the UDP length field
// guarantees.
func (c *UDPCoalescer) flushSlot(s *udpSlot) error {
	hdr := s.hdrBuf[:s.hdrLen]
	total := s.hdrLen + s.totalPay // full IP+UDP+all_payloads bytes
	l4Len := total - s.ipHdrLen    // total UDP (8 + sum of payloads)

	if s.isV6 {
		// IPv6 payload length at offset 4 excludes the 40-byte fixed header.
		binary.BigEndian.PutUint16(hdr[4:6], uint16(l4Len))
	} else {
		// IPv4 total length at offset 2 includes the IP header itself.
		binary.BigEndian.PutUint16(hdr[2:4], uint16(total))
		// Zero the checksum field first: the IPv4 header checksum is
		// defined over the header with this field treated as zero.
		hdr[10] = 0
		hdr[11] = 0
		binary.BigEndian.PutUint16(hdr[10:12], ipv4HdrChecksum(hdr[:s.ipHdrLen]))
	}

	// UDP length field (offset 4 inside the UDP header) = total UDP size.
	binary.BigEndian.PutUint16(hdr[s.ipHdrLen+4:s.ipHdrLen+6], uint16(l4Len))

	// Pseudo-header partial over src/dst addresses, proto, and the total
	// UDP length — the same l4Len written above, keeping check/len coherent.
	var psum uint32
	if s.isV6 {
		psum = pseudoSumIPv6(hdr[8:24], hdr[24:40], ipProtoUDP, l4Len)
	} else {
		psum = pseudoSumIPv4(hdr[12:16], hdr[16:20], ipProtoUDP, l4Len)
	}
	udpCsumOff := s.ipHdrLen + 6
	binary.BigEndian.PutUint16(hdr[udpCsumOff:udpCsumOff+2], foldOnceNoInvert(psum))

	return c.gsoW.WriteGSO(hdr[:s.ipHdrLen], hdr[s.ipHdrLen:], s.payIovs, tio.GSOProtoUDP)
}
|
||||
|
||||
// udpHeadersMatch compares two IP+UDP header prefixes for byte-equality on
|
||||
// every field that must be identical across coalesced segments. Length
|
||||
// fields and the ECN bits in IP TOS/TC are masked out — appendPayload
|
||||
// merges CE into the seed; flushSlot rewrites lengths.
|
||||
func udpHeadersMatch(a, b []byte, isV6 bool, ipHdrLen int) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
if !ipHeadersMatch(a, b, isV6) {
|
||||
return false
|
||||
}
|
||||
// UDP: compare sport+dport ([0:4]). Skip length [4:6] and checksum [6:8] —
|
||||
// length varies (we rewrite at flush) and the checksum will be redone.
|
||||
udp := ipHdrLen
|
||||
if a[udp] != b[udp] || a[udp+1] != b[udp+1] || a[udp+2] != b[udp+2] || a[udp+3] != b[udp+3] {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
383
overlay/batch/udp_coalesce_test.go
Normal file
383
overlay/batch/udp_coalesce_test.go
Normal file
@@ -0,0 +1,383 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// buildUDPv4 builds a minimal IPv4+UDP packet with the given payload and ports.
|
||||
func buildUDPv4(sport, dport uint16, payload []byte) []byte {
|
||||
const ipHdrLen = 20
|
||||
const udpHdrLen = 8
|
||||
total := ipHdrLen + udpHdrLen + len(payload)
|
||||
pkt := make([]byte, total)
|
||||
|
||||
pkt[0] = 0x45
|
||||
pkt[1] = 0x00
|
||||
binary.BigEndian.PutUint16(pkt[2:4], uint16(total))
|
||||
binary.BigEndian.PutUint16(pkt[4:6], 0)
|
||||
binary.BigEndian.PutUint16(pkt[6:8], 0x4000)
|
||||
pkt[8] = 64
|
||||
pkt[9] = ipProtoUDP
|
||||
copy(pkt[12:16], []byte{10, 0, 0, 1})
|
||||
copy(pkt[16:20], []byte{10, 0, 0, 2})
|
||||
|
||||
binary.BigEndian.PutUint16(pkt[20:22], sport)
|
||||
binary.BigEndian.PutUint16(pkt[22:24], dport)
|
||||
binary.BigEndian.PutUint16(pkt[24:26], uint16(udpHdrLen+len(payload)))
|
||||
binary.BigEndian.PutUint16(pkt[26:28], 0)
|
||||
|
||||
copy(pkt[28:], payload)
|
||||
return pkt
|
||||
}
|
||||
|
||||
// buildUDPv6 builds a minimal IPv6+UDP packet.
|
||||
func buildUDPv6(sport, dport uint16, payload []byte) []byte {
|
||||
const ipHdrLen = 40
|
||||
const udpHdrLen = 8
|
||||
total := ipHdrLen + udpHdrLen + len(payload)
|
||||
pkt := make([]byte, total)
|
||||
|
||||
pkt[0] = 0x60
|
||||
binary.BigEndian.PutUint16(pkt[4:6], uint16(udpHdrLen+len(payload)))
|
||||
pkt[6] = ipProtoUDP
|
||||
pkt[7] = 64
|
||||
pkt[8] = 0xfe
|
||||
pkt[9] = 0x80
|
||||
pkt[23] = 1
|
||||
pkt[24] = 0xfe
|
||||
pkt[25] = 0x80
|
||||
pkt[39] = 2
|
||||
|
||||
binary.BigEndian.PutUint16(pkt[40:42], sport)
|
||||
binary.BigEndian.PutUint16(pkt[42:44], dport)
|
||||
binary.BigEndian.PutUint16(pkt[44:46], uint16(udpHdrLen+len(payload)))
|
||||
binary.BigEndian.PutUint16(pkt[46:48], 0)
|
||||
|
||||
copy(pkt[48:], payload)
|
||||
return pkt
|
||||
}
|
||||
|
||||
func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: false}
|
||||
c := NewUDPCoalescer(w)
|
||||
pkt := buildUDPv4(1000, 53, make([]byte, 100))
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.writes) != 0 || len(w.gsoWrites) != 0 {
|
||||
t.Fatalf("no Add-time writes: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
|
||||
t.Fatalf("want single plain write, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUDPCoalescerNonUDPPassthrough(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
// ICMP packet
|
||||
pkt := make([]byte, 28)
|
||||
pkt[0] = 0x45
|
||||
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
||||
pkt[9] = 1
|
||||
copy(pkt[12:16], []byte{10, 0, 0, 1})
|
||||
copy(pkt[16:20], []byte{10, 0, 0, 2})
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
|
||||
t.Fatalf("ICMP must pass through unchanged: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pkt := buildUDPv4(1000, 53, make([]byte, 800))
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Single-segment flush goes through WriteGSO; the writer infers GSO_NONE
|
||||
// from len(pays)==1 and the kernel fills in the UDP csum (NEEDS_CSUM).
|
||||
if len(w.gsoWrites) != 1 || len(w.writes) != 0 {
|
||||
t.Fatalf("single-seg flush: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUDPCoalescerCoalescesEqualSized(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 1200)
|
||||
for i := 0; i < 3; i++ {
|
||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 1 {
|
||||
t.Fatalf("want 1 gso write, got %d (plain=%d)", len(w.gsoWrites), len(w.writes))
|
||||
}
|
||||
g := w.gsoWrites[0]
|
||||
if g.gsoSize != 1200 {
|
||||
t.Errorf("gsoSize=%d want 1200", g.gsoSize)
|
||||
}
|
||||
if len(g.pays) != 3 {
|
||||
t.Errorf("pay count=%d want 3", len(g.pays))
|
||||
}
|
||||
if g.csumStart != 20 {
|
||||
t.Errorf("csumStart=%d want 20", g.csumStart)
|
||||
}
|
||||
// IP totalLen and UDP length must be the TOTAL across all segments —
|
||||
// the kernel's ip_rcv_core trims skbs to iph->tot_len, so a per-segment
|
||||
// value would silently drop everything but the first segment. Total =
|
||||
// IP(20) + UDP(8) + 3*1200 = 3628.
|
||||
gotTotalLen := binary.BigEndian.Uint16(g.hdr[2:4])
|
||||
if gotTotalLen != 3628 {
|
||||
t.Errorf("ipv4 total_len=%d want 3628 (must be total across segments)", gotTotalLen)
|
||||
}
|
||||
gotUDPLen := binary.BigEndian.Uint16(g.hdr[20+4 : 20+6])
|
||||
if gotUDPLen != 8+3*1200 {
|
||||
t.Errorf("udp len=%d want %d", gotUDPLen, 8+3*1200)
|
||||
}
|
||||
}
|
||||
|
||||
// Last segment may be shorter, sealing the chain.
|
||||
func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
full := make([]byte, 1200)
|
||||
tail := make([]byte, 600)
|
||||
if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(1000, 53, tail)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// A 4th packet, even same-sized, must NOT join — chain is sealed.
|
||||
if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 gso writes (sealed + new seed), got %d", len(w.gsoWrites))
|
||||
}
|
||||
if len(w.gsoWrites[0].pays) != 3 {
|
||||
t.Errorf("first super: want 3 pays, got %d", len(w.gsoWrites[0].pays))
|
||||
}
|
||||
if len(w.gsoWrites[1].pays) != 1 {
|
||||
t.Errorf("second super: want 1 pay (re-seed), got %d", len(w.gsoWrites[1].pays))
|
||||
}
|
||||
}
|
||||
|
||||
// A larger-than-gsoSize packet cannot extend the slot — it reseeds.
|
||||
func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 1200))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 separate seeds, got %d", len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
// Different 5-tuples must not coalesce.
|
||||
func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 800)
|
||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(2000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(buildUDPv4(2000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Two flows × 2 datagrams each = 2 superpackets of 2 segments.
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 gso writes (one per flow), got %d", len(w.gsoWrites))
|
||||
}
|
||||
for i, g := range w.gsoWrites {
|
||||
if len(g.pays) != 2 {
|
||||
t.Errorf("super %d: want 2 pays, got %d", i, len(g.pays))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Caps at udpCoalesceMaxSegs.
|
||||
func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 100)
|
||||
for i := 0; i < udpCoalesceMaxSegs+5; i++ {
|
||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// First superpacket holds udpCoalesceMaxSegs segments; the spillover
|
||||
// reseeds a new one.
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 gso writes (cap then reseed), got %d", len(w.gsoWrites))
|
||||
}
|
||||
if len(w.gsoWrites[0].pays) != udpCoalesceMaxSegs {
|
||||
t.Errorf("first super: pays=%d want %d", len(w.gsoWrites[0].pays), udpCoalesceMaxSegs)
|
||||
}
|
||||
if len(w.gsoWrites[1].pays) != 5 {
|
||||
t.Errorf("second super: pays=%d want 5", len(w.gsoWrites[1].pays))
|
||||
}
|
||||
}
|
||||
|
||||
// CE marks on appended segments must be merged into the seed's IP TOS.
|
||||
func TestUDPCoalescerMergesCEMark(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 800)
|
||||
pkt0 := buildUDPv4(1000, 53, pay) // ECN=00
|
||||
pkt1 := buildUDPv4(1000, 53, pay)
|
||||
pkt1[1] = 0x03 // CE
|
||||
pkt2 := buildUDPv4(1000, 53, pay)
|
||||
if err := c.Commit(pkt0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(pkt1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(pkt2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 1 {
|
||||
t.Fatalf("want 1 merged gso write, got %d (plain=%d)", len(w.gsoWrites), len(w.writes))
|
||||
}
|
||||
if w.gsoWrites[0].hdr[1]&0x03 != 0x03 {
|
||||
t.Errorf("CE not merged into seed (tos=%#x)", w.gsoWrites[0].hdr[1])
|
||||
}
|
||||
}
|
||||
|
||||
// IPv6 path: same flow, equal-sized → coalesced.
|
||||
func TestUDPCoalescerIPv6Coalesces(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 1200)
|
||||
for i := 0; i < 3; i++ {
|
||||
if err := c.Commit(buildUDPv6(1000, 53, pay)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 1 {
|
||||
t.Fatalf("want 1 gso write, got %d", len(w.gsoWrites))
|
||||
}
|
||||
g := w.gsoWrites[0]
|
||||
if !g.isV6 {
|
||||
t.Errorf("expected v6 write")
|
||||
}
|
||||
if g.csumStart != 40 {
|
||||
t.Errorf("csumStart=%d want 40", g.csumStart)
|
||||
}
|
||||
// IPv6 payload_len and UDP length must be TOTAL — kernel's
|
||||
// ip6_rcv_core trims to payload_len + ipv6 hdr size. Total UDP = 8 +
|
||||
// 3*1200 = 3608.
|
||||
gotPlen := binary.BigEndian.Uint16(g.hdr[4:6])
|
||||
if gotPlen != 8+3*1200 {
|
||||
t.Errorf("ipv6 payload_len=%d want %d (must be total)", gotPlen, 8+3*1200)
|
||||
}
|
||||
gotUDPLen := binary.BigEndian.Uint16(g.hdr[40+4 : 40+6])
|
||||
if gotUDPLen != 8+3*1200 {
|
||||
t.Errorf("udp len=%d want %d", gotUDPLen, 8+3*1200)
|
||||
}
|
||||
}
|
||||
|
||||
// DSCP differences must reseed (headers don't match outside ECN).
|
||||
func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pay := make([]byte, 800)
|
||||
pkt0 := buildUDPv4(1000, 53, pay)
|
||||
pkt1 := buildUDPv4(1000, 53, pay)
|
||||
pkt1[1] = 0xb8 // EF DSCP, ECN=0
|
||||
if err := c.Commit(pkt0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Commit(pkt1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.gsoWrites) != 2 {
|
||||
t.Fatalf("want 2 separate seeds (different DSCP), got %d", len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
// Fragmented IPv4 must not be coalesced.
|
||||
func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
||||
binary.BigEndian.PutUint16(pkt[6:8], 0x2000) // MF=1
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
|
||||
t.Fatalf("frag must pass through plain, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
|
||||
// IPv4 with options is not admissible (we require IHL=5).
|
||||
func TestUDPCoalescerIPv4WithOptionsPassesThrough(t *testing.T) {
|
||||
w := &fakeTunWriter{gsoEnabled: true}
|
||||
c := NewUDPCoalescer(w)
|
||||
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
||||
pkt[0] = 0x46 // IHL = 6 (24-byte IPv4 header — has options)
|
||||
if err := c.Commit(pkt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
|
||||
t.Fatalf("ipv4-with-options must pass through plain, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user