//go:build !android && !e2e_testing
// +build !android,!e2e_testing
package udp
import (
	"cmp"
	"net/netip"
	"slices"
)
// rxSegment is one nebula packet pulled out of a recvmmsg entry — either a
// lone datagram or one segment of a GRO superpacket. cnt is the big-endian
// uint64 message counter at bytes [8:16] of the nebula header; 0 if the
// segment is too short to contain a header. ecn is the 2-bit IP-level ECN
// codepoint stamped on the carrier (one value per slot, since GRO requires
// equal ECN across coalesced datagrams).
type rxSegment struct {
	src netip.AddrPort // sender of the carrier datagram
	cnt uint64         // nebula header message counter; 0 if segment too short
	buf []byte         // segment payload; view into the receive buffer (see deliver)
	ecn byte           // 2-bit ECN codepoint from the outer IP header
}
// rxReorderBuffer accumulates one recvmmsg batch worth of segments,
// splits any GRO superpackets at gso_size boundaries, stable-sorts by
// (src, port, counter), then delivers in order. The reorder distance is
// bounded by len(buf), which the caller sizes to stay well within the
// receiver's ReplayWindow so older arrivals are not rejected as replays.
type rxReorderBuffer struct {
	buf []rxSegment // current batch; capacity reused across batches via reset
}
func newRxReorderBuffer(initialCap int) *rxReorderBuffer {
|
|
return &rxReorderBuffer{buf: make([]rxSegment, 0, initialCap)}
|
|
}
// reset prepares the buffer for the next recvmmsg batch.
|
|
func (r *rxReorderBuffer) reset() { r.buf = r.buf[:0] }
// addEntry expands one recvmmsg slot into rxSegments. When segSize <= 0 or
|
|
// segSize >= len(payload) the payload is appended as a single segment;
|
|
// otherwise the kernel-coalesced GRO superpacket is split at segSize
|
|
// boundaries (the kernel guarantees every segment is exactly segSize bytes
|
|
// except for the final one, which may be short). ecn applies uniformly to
|
|
// every produced segment because GRO requires equal ECN across coalesced
|
|
// datagrams.
|
|
func (r *rxReorderBuffer) addEntry(from netip.AddrPort, payload []byte, segSize int, ecn byte) {
|
|
if segSize <= 0 || segSize >= len(payload) {
|
|
r.buf = append(r.buf, rxSegment{from, headerCounter(payload), payload, ecn})
|
|
return
|
|
}
|
|
for off := 0; off < len(payload); off += segSize {
|
|
end := off + segSize
|
|
if end > len(payload) {
|
|
end = len(payload)
|
|
}
|
|
seg := payload[off:end]
|
|
r.buf = append(r.buf, rxSegment{from, headerCounter(seg), seg, ecn})
|
|
}
|
|
}
// sortStable orders the accumulated segments by (src addr, src port,
|
|
// counter). Same-source segments are reordered into counter order;
|
|
// cross-source relative order is determined by a stable address compare so
|
|
// the sort is total and predictable.
|
|
func (r *rxReorderBuffer) sortStable() {
|
|
slices.SortStableFunc(r.buf, func(a, b rxSegment) int {
|
|
if c := a.src.Addr().Compare(b.src.Addr()); c != 0 {
|
|
return c
|
|
}
|
|
if c := cmp.Compare(a.src.Port(), b.src.Port()); c != 0 {
|
|
return c
|
|
}
|
|
return cmp.Compare(a.cnt, b.cnt)
|
|
})
|
|
}
// deliver invokes fn once per segment in sorted order, then nils the
|
|
// per-entry buf reference so the next batch's append doesn't alias it.
|
|
func (r *rxReorderBuffer) deliver(fn EncReader) {
|
|
for k := range r.buf {
|
|
fn(r.buf[k].src, r.buf[k].buf, RxMeta{OuterECN: r.buf[k].ecn})
|
|
r.buf[k].buf = nil
|
|
}
|
|
}