mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
use less ram pls
This commit is contained in:
@@ -285,9 +285,11 @@ func (f *Interface) activate() error {
|
|||||||
// is on, everything else (and either lane disabled) falls
|
// is on, everything else (and either lane disabled) falls
|
||||||
// through to passthrough so non-IP / non-TCP-UDP traffic still
|
// through to passthrough so non-IP / non-TCP-UDP traffic still
|
||||||
// reaches the TUN.
|
// reaches the TUN.
|
||||||
f.batchers[i] = batch.NewMultiCoalescer(f.readers[i], f.l, caps.TSO, caps.USO)
|
arena := batch.NewArena(batch.DefaultMultiArenaCap)
|
||||||
|
f.batchers[i] = batch.NewMultiCoalescer(f.readers[i], f.l, arena, caps.TSO, caps.USO)
|
||||||
} else {
|
} else {
|
||||||
f.batchers[i] = batch.NewPassthrough(f.readers[i])
|
arena := batch.NewArena(batch.DefaultPassthroughArenaCap)
|
||||||
|
f.batchers[i] = batch.NewPassthrough(f.readers[i], arena)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -148,16 +148,43 @@ func mergeECNIntoSeed(seedHdr, pktHdr []byte, isV6 bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reserveFromBacking implements the Reserve half of the RxBatcher contract
|
// Arena is an injectable byte-slab that hands out non-overlapping borrowed
|
||||||
// shared by TCP and UDP coalescers. The backing slice grows on demand;
|
// slices via Reserve and releases them in bulk via Reset. Coalescers take
|
||||||
// already-committed slices reference the old array and remain valid until
|
// an *Arena at construction so the caller controls the slab lifetime and
|
||||||
// Flush resets backing.
|
// can share one slab across multiple coalescers (MultiCoalescer hands the
|
||||||
func reserveFromBacking(backing *[]byte, sz int) []byte {
|
// same *Arena to every lane so the lanes don't carry their own backings).
|
||||||
if len(*backing)+sz > cap(*backing) {
|
//
|
||||||
newCap := max(cap(*backing)*2, sz)
|
// Reserve borrows; the slice is valid until the next Reset. The slab grows
|
||||||
*backing = make([]byte, 0, newCap)
|
// (by allocating a fresh, larger backing array) if a Reserve doesn't fit;
|
||||||
|
// pre-size the arena via NewArena to avoid that path on the hot path.
|
||||||
|
type Arena struct {
|
||||||
|
buf []byte
|
||||||
}
|
}
|
||||||
start := len(*backing)
|
|
||||||
*backing = (*backing)[:start+sz]
|
// NewArena returns an Arena with a pre-allocated backing of the given
|
||||||
return (*backing)[start : start+sz : start+sz]
|
// capacity. Pass 0 if you don't intend to call Reserve (e.g. a test that
|
||||||
|
// only feeds the coalescer pre-made []byte packets via Commit).
|
||||||
|
func NewArena(capacity int) *Arena {
|
||||||
|
return &Arena{buf: make([]byte, 0, capacity)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reserve hands out a non-overlapping sz-byte slice from the arena. If the
|
||||||
|
// request doesn't fit the current backing, a fresh, larger backing is
|
||||||
|
// allocated; already-borrowed slices reference the old backing and remain
|
||||||
|
// valid until Reset.
|
||||||
|
func (a *Arena) Reserve(sz int) []byte {
|
||||||
|
if len(a.buf)+sz > cap(a.buf) {
|
||||||
|
newCap := max(cap(a.buf)*2, sz)
|
||||||
|
a.buf = make([]byte, 0, newCap)
|
||||||
|
}
|
||||||
|
start := len(a.buf)
|
||||||
|
a.buf = a.buf[:start+sz]
|
||||||
|
return a.buf[start : start+sz : start+sz]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset releases every slice handed out since the last Reset. Callers must
|
||||||
|
// not use any previously-borrowed slice after this returns. The underlying
|
||||||
|
// backing array is retained so subsequent Reserves don't re-allocate.
|
||||||
|
func (a *Arena) Reset() {
|
||||||
|
a.buf = a.buf[:0]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,37 +28,43 @@ type MultiCoalescer struct {
|
|||||||
udp *UDPCoalescer
|
udp *UDPCoalescer
|
||||||
pt *Passthrough
|
pt *Passthrough
|
||||||
|
|
||||||
// arena shared across all lanes so a single Reserve grows one backing
|
// arena is shared across every lane (constructor hands the same
|
||||||
// slice; lane Commit calls borrow into this same arena.
|
// *Arena to TCP, UDP, and Passthrough), so there's exactly one
|
||||||
backing []byte
|
// backing slab per MultiCoalescer instance. Each lane's Flush calls
|
||||||
|
// Reset; the resets are idempotent because Multi.Flush drains lanes
|
||||||
|
// sequentially and never Reserves in between, so a later lane's
|
||||||
|
// slots stay readable across an earlier lane's Reset (the underlying
|
||||||
|
// bytes are still alive — Reset only re-slices len to 0).
|
||||||
|
arena *Arena
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultMultiArenaCap is the recommended arena capacity for a Multi-lane
|
||||||
|
// batcher: 64 slots × 65535 bytes ≈ 4 MiB, enough to hold one recvmmsg
|
||||||
|
// burst worth of MTU-sized packets without the arena growing.
|
||||||
|
const DefaultMultiArenaCap = initialSlots * 65535
|
||||||
|
|
||||||
// NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
|
// NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
|
||||||
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
|
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
|
||||||
// likewise gates UDP coalescing (only enable when USO was negotiated).
|
// likewise gates UDP coalescing (only enable when USO was negotiated).
|
||||||
// Either lane disabled redirects its traffic into the passthrough lane.
|
// Either lane disabled redirects its traffic into the passthrough lane.
|
||||||
func NewMultiCoalescer(w io.Writer, l *slog.Logger, tcpEnabled, udpEnabled bool) *MultiCoalescer {
|
// arena is the single backing slab shared across every lane; the caller
|
||||||
|
// pre-sizes it via NewArena so the hot path never allocates.
|
||||||
|
func NewMultiCoalescer(w io.Writer, l *slog.Logger, arena *Arena, tcpEnabled, udpEnabled bool) *MultiCoalescer {
|
||||||
m := &MultiCoalescer{
|
m := &MultiCoalescer{
|
||||||
pt: NewPassthrough(w),
|
pt: NewPassthrough(w, arena),
|
||||||
backing: make([]byte, 0, initialSlots*65535),
|
arena: arena,
|
||||||
}
|
}
|
||||||
if tcpEnabled {
|
if tcpEnabled {
|
||||||
m.tcp = NewTCPCoalescer(w, l)
|
m.tcp = NewTCPCoalescer(w, l, arena)
|
||||||
}
|
}
|
||||||
if udpEnabled {
|
if udpEnabled {
|
||||||
m.udp = NewUDPCoalescer(w)
|
m.udp = NewUDPCoalescer(w, arena)
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MultiCoalescer) Reserve(sz int) []byte {
|
func (m *MultiCoalescer) Reserve(sz int) []byte {
|
||||||
if len(m.backing)+sz > cap(m.backing) {
|
return m.arena.Reserve(sz)
|
||||||
newCap := max(cap(m.backing)*2, sz)
|
|
||||||
m.backing = make([]byte, 0, newCap)
|
|
||||||
}
|
|
||||||
start := len(m.backing)
|
|
||||||
m.backing = m.backing[:start+sz]
|
|
||||||
return m.backing[start : start+sz : start+sz]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit dispatches pkt to the appropriate lane based on IP version + L4
|
// Commit dispatches pkt to the appropriate lane based on IP version + L4
|
||||||
@@ -129,6 +135,5 @@ func (m *MultiCoalescer) Flush() error {
|
|||||||
if err := m.pt.Flush(); err != nil {
|
if err := m.pt.Flush(); err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
m.backing = m.backing[:0]
|
|
||||||
return errors.Join(errs...)
|
return errors.Join(errs...)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
// else (ICMP here) falls through to plain Write.
|
// else (ICMP here) falls through to plain Write.
|
||||||
func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, test.NewLogger(), true, true)
|
m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, true)
|
||||||
|
|
||||||
tcpPay := make([]byte, 1200)
|
tcpPay := make([]byte, 1200)
|
||||||
udpPay := make([]byte, 1200)
|
udpPay := make([]byte, 1200)
|
||||||
@@ -53,7 +53,7 @@ func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
|||||||
// the kernel via the passthrough lane rather than being lost.
|
// the kernel via the passthrough lane rather than being lost.
|
||||||
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, test.NewLogger(), true, false) // TSO on, USO off
|
m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, false) // TSO on, USO off
|
||||||
|
|
||||||
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -75,7 +75,7 @@ func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
|||||||
// TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
|
// TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
|
||||||
func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
|
func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, test.NewLogger(), false, true) // TSO off, USO on
|
m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), false, true) // TSO off, USO on
|
||||||
|
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
|
|||||||
@@ -10,29 +10,27 @@ import (
|
|||||||
type Passthrough struct {
|
type Passthrough struct {
|
||||||
out io.Writer
|
out io.Writer
|
||||||
slots [][]byte
|
slots [][]byte
|
||||||
backing []byte
|
// arena is injected; see TCPCoalescer.arena for the contract.
|
||||||
|
arena *Arena
|
||||||
cursor int
|
cursor int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPassthrough(w io.Writer) *Passthrough {
|
const passthroughBaseNumSlots = 128
|
||||||
const baseNumSlots = 128
|
|
||||||
|
// DefaultPassthroughArenaCap is the recommended arena capacity for a
|
||||||
|
// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB.
|
||||||
|
const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU
|
||||||
|
|
||||||
|
func NewPassthrough(w io.Writer, arena *Arena) *Passthrough {
|
||||||
return &Passthrough{
|
return &Passthrough{
|
||||||
out: w,
|
out: w,
|
||||||
slots: make([][]byte, 0, baseNumSlots),
|
slots: make([][]byte, 0, passthroughBaseNumSlots),
|
||||||
backing: make([]byte, 0, baseNumSlots*udp.MTU),
|
arena: arena,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Passthrough) Reserve(sz int) []byte {
|
func (p *Passthrough) Reserve(sz int) []byte {
|
||||||
if len(p.backing)+sz > cap(p.backing) {
|
return p.arena.Reserve(sz)
|
||||||
// Grow: allocate a fresh backing. Already-committed slices still
|
|
||||||
// reference the old array and remain valid until Flush drops them.
|
|
||||||
newCap := max(cap(p.backing)*2, sz)
|
|
||||||
p.backing = make([]byte, 0, newCap)
|
|
||||||
}
|
|
||||||
start := len(p.backing)
|
|
||||||
p.backing = p.backing[:start+sz]
|
|
||||||
return p.backing[start : start+sz : start+sz] //return zero length, sz-cap slice
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Passthrough) Commit(pkt []byte) error {
|
func (p *Passthrough) Commit(pkt []byte) error {
|
||||||
@@ -50,6 +48,6 @@ func (p *Passthrough) Flush() error {
|
|||||||
}
|
}
|
||||||
clear(p.slots)
|
clear(p.slots)
|
||||||
p.slots = p.slots[:0]
|
p.slots = p.slots[:0]
|
||||||
p.backing = p.backing[:0]
|
p.arena.Reset()
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -84,17 +84,21 @@ type TCPCoalescer struct {
|
|||||||
lastSlot *coalesceSlot
|
lastSlot *coalesceSlot
|
||||||
pool []*coalesceSlot // free list for reuse
|
pool []*coalesceSlot // free list for reuse
|
||||||
|
|
||||||
backing []byte
|
// arena is injected; the coalescer borrows slices from it via Reserve
|
||||||
|
// and tells it to release them via Reset on Flush. When wrapped in
|
||||||
|
// MultiCoalescer the same *Arena is shared with the other lanes so
|
||||||
|
// there's exactly one backing slab per Multi instance.
|
||||||
|
arena *Arena
|
||||||
l *slog.Logger
|
l *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTCPCoalescer(w io.Writer, l *slog.Logger) *TCPCoalescer {
|
func NewTCPCoalescer(w io.Writer, l *slog.Logger, arena *Arena) *TCPCoalescer {
|
||||||
c := &TCPCoalescer{
|
c := &TCPCoalescer{
|
||||||
plainW: w,
|
plainW: w,
|
||||||
slots: make([]*coalesceSlot, 0, initialSlots),
|
slots: make([]*coalesceSlot, 0, initialSlots),
|
||||||
openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
|
openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
|
||||||
pool: make([]*coalesceSlot, 0, initialSlots),
|
pool: make([]*coalesceSlot, 0, initialSlots),
|
||||||
backing: make([]byte, 0, initialSlots*65535),
|
arena: arena,
|
||||||
l: l,
|
l: l,
|
||||||
}
|
}
|
||||||
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
|
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
|
||||||
@@ -174,7 +178,7 @@ func (p parsedTCP) coalesceable() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *TCPCoalescer) Reserve(sz int) []byte {
|
func (c *TCPCoalescer) Reserve(sz int) []byte {
|
||||||
return reserveFromBacking(&c.backing, sz)
|
return c.arena.Reserve(sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush,
|
// Commit borrows pkt. The caller must keep pkt valid until the next Flush,
|
||||||
@@ -274,7 +278,7 @@ func (c *TCPCoalescer) Flush() error {
|
|||||||
clear(c.openSlots)
|
clear(c.openSlots)
|
||||||
c.lastSlot = nil
|
c.lastSlot = nil
|
||||||
|
|
||||||
c.backing = c.backing[:0]
|
c.arena.Reset()
|
||||||
return first
|
return first
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ func buildICMPv4() []byte {
|
|||||||
// between batches, and reports per-packet cost.
|
// between batches, and reports per-packet cost.
|
||||||
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||||
b.Helper()
|
b.Helper()
|
||||||
c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger())
|
c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0))
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.SetBytes(int64(len(pkts[0])))
|
b.SetBytes(int64(len(pkts[0])))
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
@@ -140,7 +140,7 @@ func BenchmarkCommitNonCoalesceableTCP(b *testing.B) {
|
|||||||
// is the bench that shows the savings of skipping the lane's re-parse.
|
// is the bench that shows the savings of skipping the lane's re-parse.
|
||||||
func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||||
b.Helper()
|
b.Helper()
|
||||||
m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), true, true)
|
m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0), true, true)
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.SetBytes(int64(len(pkts[0])))
|
b.SetBytes(int64(len(pkts[0])))
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|||||||
@@ -128,7 +128,7 @@ const (
|
|||||||
|
|
||||||
func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: false}
|
w := &fakeTunWriter{gsoEnabled: false}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
|
pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -147,7 +147,7 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pkt := make([]byte, 28)
|
pkt := make([]byte, 28)
|
||||||
pkt[0] = 0x45
|
pkt[0] = 0x45
|
||||||
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
||||||
@@ -167,7 +167,7 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
|
pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -194,7 +194,7 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -234,7 +234,7 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsSeqGap(t *testing.T) {
|
func TestCoalescerRejectsSeqGap(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -253,7 +253,7 @@ func TestCoalescerRejectsSeqGap(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -274,7 +274,7 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsFIN(t *testing.T) {
|
func TestCoalescerRejectsFIN(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
|
fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
|
||||||
if err := c.Commit(fin); err != nil {
|
if err := c.Commit(fin); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -290,7 +290,7 @@ func TestCoalescerRejectsFIN(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
full := make([]byte, 1200)
|
full := make([]byte, 1200)
|
||||||
half := make([]byte, 500)
|
half := make([]byte, 500)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
|
||||||
@@ -325,7 +325,7 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -355,7 +355,7 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
|||||||
// coalescer drops it the sender's push signal never reaches the receiver.
|
// coalescer drops it the sender's push signal never reaches the receiver.
|
||||||
func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Seed has no PSH; second segment carries PSH and seals the chain.
|
// Seed has no PSH; second segment carries PSH and seals the chain.
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
@@ -383,7 +383,7 @@ func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
p1 := buildTCPv4(1000, tcpAck, pay)
|
p1 := buildTCPv4(1000, tcpAck, pay)
|
||||||
p2 := buildTCPv4(2200, tcpAck, pay)
|
p2 := buildTCPv4(2200, tcpAck, pay)
|
||||||
@@ -405,7 +405,7 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsIPOptions(t *testing.T) {
|
func TestCoalescerRejectsIPOptions(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 500)
|
pay := make([]byte, 500)
|
||||||
pkt := buildTCPv4(1000, tcpAck, pay)
|
pkt := buildTCPv4(1000, tcpAck, pay)
|
||||||
// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
|
// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
|
||||||
@@ -425,7 +425,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerCapBySegments(t *testing.T) {
|
func TestCoalescerCapBySegments(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 512)
|
pay := make([]byte, 512)
|
||||||
seq := uint32(1000)
|
seq := uint32(1000)
|
||||||
for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
|
for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
|
||||||
@@ -449,7 +449,7 @@ func TestCoalescerCapBySegments(t *testing.T) {
|
|||||||
// flows coalesce independently in a single Flush.
|
// flows coalesce independently in a single Flush.
|
||||||
func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
|
|
||||||
// Flow A: sport 1000. Flow B: sport 3000.
|
// Flow A: sport 1000. Flow B: sport 3000.
|
||||||
@@ -506,7 +506,7 @@ func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
|||||||
// writing passthrough packets synchronously.
|
// writing passthrough packets synchronously.
|
||||||
func TestCoalescerPreservesArrivalOrder(t *testing.T) {
|
func TestCoalescerPreservesArrivalOrder(t *testing.T) {
|
||||||
w := &orderedFakeWriter{gsoEnabled: true}
|
w := &orderedFakeWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
|
// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
|
||||||
// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
|
// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
@@ -574,7 +574,7 @@ func stringSliceEq(a, b []string) bool {
|
|||||||
// packet (SYN) mid-flow only flushes its own flow, not others.
|
// packet (SYN) mid-flow only flushes its own flow, not others.
|
||||||
func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
|
func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
|
|
||||||
// Flow A two segments.
|
// Flow A two segments.
|
||||||
@@ -679,7 +679,7 @@ func buildTCPv6(tcLow byte, seq uint32, flags byte, payload []byte) []byte {
|
|||||||
// retains ECE on the wire.
|
// retains ECE on the wire.
|
||||||
func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
flags := byte(tcpAck | tcpEce)
|
flags := byte(tcpAck | tcpEce)
|
||||||
if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil {
|
||||||
@@ -708,7 +708,7 @@ func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
|||||||
// in-flow segment seeds a new slot rather than extending the prior burst.
|
// in-flow segment seeds a new slot rather than extending the prior burst.
|
||||||
func TestCoalescerCwrSealsFlow(t *testing.T) {
|
func TestCoalescerCwrSealsFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -741,7 +741,7 @@ func TestCoalescerCwrSealsFlow(t *testing.T) {
|
|||||||
// a CE-echoing window or none.
|
// a CE-echoing window or none.
|
||||||
func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -766,7 +766,7 @@ func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
|||||||
// CE-marked packet still coalesces, and the merged superpacket carries CE.
|
// CE-marked packet still coalesces, and the merged superpacket carries CE.
|
||||||
func TestCoalescerMergesCEMark(t *testing.T) {
|
func TestCoalescerMergesCEMark(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -797,7 +797,7 @@ func TestCoalescerMergesCEMark(t *testing.T) {
|
|||||||
// headersMatch did not also relax DSCP — different DSCP must still split.
|
// headersMatch did not also relax DSCP — different DSCP must still split.
|
||||||
func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits).
|
// Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits).
|
||||||
tosA := byte(0x10<<2) | ecnNotECT
|
tosA := byte(0x10<<2) | ecnNotECT
|
||||||
@@ -820,7 +820,7 @@ func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
|||||||
// TestCoalescerCoalescesEceFlow.
|
// TestCoalescerCoalescesEceFlow.
|
||||||
func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
flags := byte(tcpAck | tcpEce)
|
flags := byte(tcpAck | tcpEce)
|
||||||
if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil {
|
if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil {
|
||||||
@@ -851,7 +851,7 @@ func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
|||||||
// seen had the wire never reordered.
|
// seen had the wire never reordered.
|
||||||
func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot
|
// Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot
|
||||||
// because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot
|
// because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot
|
||||||
@@ -887,7 +887,7 @@ func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
|||||||
// without any cross-flow contamination.
|
// without any cross-flow contamination.
|
||||||
func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700.
|
// Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700.
|
||||||
// Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered.
|
// Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered.
|
||||||
@@ -938,7 +938,7 @@ func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
|||||||
// boundary by an arbitrary number of segments.
|
// boundary by an arbitrary number of segments.
|
||||||
func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set.
|
// Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set.
|
||||||
// Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without
|
// Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without
|
||||||
@@ -966,7 +966,7 @@ func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
|||||||
// is sorted/merged independently.
|
// is sorted/merged independently.
|
||||||
func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// First two segments seed S1 (then a 3400 reorder seeds S2).
|
// First two segments seed S1 (then a 3400 reorder seeds S2).
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
@@ -999,7 +999,7 @@ func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
|||||||
// TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30.
|
// TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30.
|
||||||
func TestCoalescerIPv6MergesCEMark(t *testing.T) {
|
func TestCoalescerIPv6MergesCEMark(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w, test.NewLogger())
|
c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those.
|
// tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those.
|
||||||
if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
||||||
|
|||||||
@@ -66,7 +66,8 @@ type UDPCoalescer struct {
|
|||||||
openSlots map[flowKey]*udpSlot
|
openSlots map[flowKey]*udpSlot
|
||||||
pool []*udpSlot
|
pool []*udpSlot
|
||||||
|
|
||||||
backing []byte
|
// arena is injected; see TCPCoalescer.arena for the contract.
|
||||||
|
arena *Arena
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUDPCoalescer wraps w. The caller is responsible for only constructing
|
// NewUDPCoalescer wraps w. The caller is responsible for only constructing
|
||||||
@@ -74,13 +75,13 @@ type UDPCoalescer struct {
|
|||||||
// the kernel may reject GSO_UDP_L4 writes. If w does not implement
|
// the kernel may reject GSO_UDP_L4 writes. If w does not implement
|
||||||
// tio.GSOWriter at all (single-packet Queue), the coalescer degrades to
|
// tio.GSOWriter at all (single-packet Queue), the coalescer degrades to
|
||||||
// plain Writes — same defensive shape as the TCP coalescer.
|
// plain Writes — same defensive shape as the TCP coalescer.
|
||||||
func NewUDPCoalescer(w io.Writer) *UDPCoalescer {
|
func NewUDPCoalescer(w io.Writer, arena *Arena) *UDPCoalescer {
|
||||||
c := &UDPCoalescer{
|
c := &UDPCoalescer{
|
||||||
plainW: w,
|
plainW: w,
|
||||||
slots: make([]*udpSlot, 0, initialSlots),
|
slots: make([]*udpSlot, 0, initialSlots),
|
||||||
openSlots: make(map[flowKey]*udpSlot, initialSlots),
|
openSlots: make(map[flowKey]*udpSlot, initialSlots),
|
||||||
pool: make([]*udpSlot, 0, initialSlots),
|
pool: make([]*udpSlot, 0, initialSlots),
|
||||||
backing: make([]byte, 0, initialSlots*udpCoalesceBufSize),
|
arena: arena,
|
||||||
}
|
}
|
||||||
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoUDP); ok {
|
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoUDP); ok {
|
||||||
c.gsoW = gw
|
c.gsoW = gw
|
||||||
@@ -126,7 +127,7 @@ func parseUDP(pkt []byte) (parsedUDP, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *UDPCoalescer) Reserve(sz int) []byte {
|
func (c *UDPCoalescer) Reserve(sz int) []byte {
|
||||||
return reserveFromBacking(&c.backing, sz)
|
return c.arena.Reserve(sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
|
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
|
||||||
@@ -183,7 +184,7 @@ func (c *UDPCoalescer) Flush() error {
|
|||||||
clear(c.slots)
|
clear(c.slots)
|
||||||
c.slots = c.slots[:0]
|
c.slots = c.slots[:0]
|
||||||
clear(c.openSlots)
|
clear(c.openSlots)
|
||||||
c.backing = c.backing[:0]
|
c.arena.Reset()
|
||||||
return first
|
return first
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ func buildUDPv6(sport, dport uint16, payload []byte) []byte {
|
|||||||
|
|
||||||
func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: false}
|
w := &fakeTunWriter{gsoEnabled: false}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pkt := buildUDPv4(1000, 53, make([]byte, 100))
|
pkt := buildUDPv4(1000, 53, make([]byte, 100))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -78,7 +78,7 @@ func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
|||||||
|
|
||||||
func TestUDPCoalescerNonUDPPassthrough(t *testing.T) {
|
func TestUDPCoalescerNonUDPPassthrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
// ICMP packet
|
// ICMP packet
|
||||||
pkt := make([]byte, 28)
|
pkt := make([]byte, 28)
|
||||||
pkt[0] = 0x45
|
pkt[0] = 0x45
|
||||||
@@ -99,7 +99,7 @@ func TestUDPCoalescerNonUDPPassthrough(t *testing.T) {
|
|||||||
|
|
||||||
func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) {
|
func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pkt := buildUDPv4(1000, 53, make([]byte, 800))
|
pkt := buildUDPv4(1000, 53, make([]byte, 800))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -116,7 +116,7 @@ func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) {
|
|||||||
|
|
||||||
func TestUDPCoalescerCoalescesEqualSized(t *testing.T) {
|
func TestUDPCoalescerCoalescesEqualSized(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||||
@@ -156,7 +156,7 @@ func TestUDPCoalescerCoalescesEqualSized(t *testing.T) {
|
|||||||
// Last segment may be shorter, sealing the chain.
|
// Last segment may be shorter, sealing the chain.
|
||||||
func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) {
|
func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
full := make([]byte, 1200)
|
full := make([]byte, 1200)
|
||||||
tail := make([]byte, 600)
|
tail := make([]byte, 600)
|
||||||
if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
|
if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
|
||||||
@@ -189,7 +189,7 @@ func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) {
|
|||||||
// A larger-than-gsoSize packet cannot extend the slot — it reseeds.
|
// A larger-than-gsoSize packet cannot extend the slot — it reseeds.
|
||||||
func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) {
|
func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -207,7 +207,7 @@ func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) {
|
|||||||
// Different 5-tuples must not coalesce.
|
// Different 5-tuples must not coalesce.
|
||||||
func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) {
|
func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 800)
|
pay := make([]byte, 800)
|
||||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -238,7 +238,7 @@ func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) {
|
|||||||
// Caps at udpCoalesceMaxSegs.
|
// Caps at udpCoalesceMaxSegs.
|
||||||
func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) {
|
func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 100)
|
pay := make([]byte, 100)
|
||||||
for i := 0; i < udpCoalesceMaxSegs+5; i++ {
|
for i := 0; i < udpCoalesceMaxSegs+5; i++ {
|
||||||
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
|
||||||
@@ -264,7 +264,7 @@ func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) {
|
|||||||
// CE marks on appended segments must be merged into the seed's IP TOS.
|
// CE marks on appended segments must be merged into the seed's IP TOS.
|
||||||
func TestUDPCoalescerMergesCEMark(t *testing.T) {
|
func TestUDPCoalescerMergesCEMark(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 800)
|
pay := make([]byte, 800)
|
||||||
pkt0 := buildUDPv4(1000, 53, pay) // ECN=00
|
pkt0 := buildUDPv4(1000, 53, pay) // ECN=00
|
||||||
pkt1 := buildUDPv4(1000, 53, pay)
|
pkt1 := buildUDPv4(1000, 53, pay)
|
||||||
@@ -293,7 +293,7 @@ func TestUDPCoalescerMergesCEMark(t *testing.T) {
|
|||||||
// IPv6 path: same flow, equal-sized → coalesced.
|
// IPv6 path: same flow, equal-sized → coalesced.
|
||||||
func TestUDPCoalescerIPv6Coalesces(t *testing.T) {
|
func TestUDPCoalescerIPv6Coalesces(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
if err := c.Commit(buildUDPv6(1000, 53, pay)); err != nil {
|
if err := c.Commit(buildUDPv6(1000, 53, pay)); err != nil {
|
||||||
@@ -329,7 +329,7 @@ func TestUDPCoalescerIPv6Coalesces(t *testing.T) {
|
|||||||
// DSCP differences must reseed (headers don't match outside ECN).
|
// DSCP differences must reseed (headers don't match outside ECN).
|
||||||
func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) {
|
func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pay := make([]byte, 800)
|
pay := make([]byte, 800)
|
||||||
pkt0 := buildUDPv4(1000, 53, pay)
|
pkt0 := buildUDPv4(1000, 53, pay)
|
||||||
pkt1 := buildUDPv4(1000, 53, pay)
|
pkt1 := buildUDPv4(1000, 53, pay)
|
||||||
@@ -351,7 +351,7 @@ func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) {
|
|||||||
// Fragmented IPv4 must not be coalesced.
|
// Fragmented IPv4 must not be coalesced.
|
||||||
func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) {
|
func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
||||||
binary.BigEndian.PutUint16(pkt[6:8], 0x2000) // MF=1
|
binary.BigEndian.PutUint16(pkt[6:8], 0x2000) // MF=1
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
@@ -368,7 +368,7 @@ func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) {
|
|||||||
// IPv4 with options is not admissible (we require IHL=5).
|
// IPv4 with options is not admissible (we require IHL=5).
|
||||||
func TestUDPCoalescerIPv4WithOptionsPassesThrough(t *testing.T) {
|
func TestUDPCoalescerIPv4WithOptionsPassesThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewUDPCoalescer(w)
|
c := NewUDPCoalescer(w, NewArena(0))
|
||||||
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
pkt := buildUDPv4(1000, 53, make([]byte, 200))
|
||||||
pkt[0] = 0x46 // IHL = 6 (24-byte IPv4 header — has options)
|
pkt[0] = 0x46 // IHL = 6 (24-byte IPv4 header — has options)
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user