From c256f2cfbb39a7fba3e09ff46200aed6309ae91a Mon Sep 17 00:00:00 2001
From: JackDoan
Date: Mon, 11 May 2026 17:30:56 -0500
Subject: [PATCH] overlay/batch: share one arena across coalescer lanes to use less RAM

---
 interface.go                             |  6 ++-
 overlay/batch/coalesce_core.go           | 51 ++++++++++++++++------
 overlay/batch/multi_coalesce.go          | 37 +++++++++-------
 overlay/batch/multi_coalesce_test.go     |  6 +--
 overlay/batch/passthrough.go             | 36 ++++++++--------
 overlay/batch/tcp_coalesce.go            | 16 ++++---
 overlay/batch/tcp_coalesce_bench_test.go |  4 +-
 overlay/batch/tcp_coalesce_test.go       | 54 ++++++++++++------------
 overlay/batch/udp_coalesce.go            | 11 ++---
 overlay/batch/udp_coalesce_test.go       | 26 ++++++------
 10 files changed, 142 insertions(+), 105 deletions(-)

diff --git a/interface.go b/interface.go
index 422cb333..01efd82b 100644
--- a/interface.go
+++ b/interface.go
@@ -285,9 +285,11 @@ func (f *Interface) activate() error {
 			// is on, everything else (and either lane disabled) falls
 			// through to passthrough so non-IP / non-TCP-UDP traffic still
 			// reaches the TUN.
-			f.batchers[i] = batch.NewMultiCoalescer(f.readers[i], f.l, caps.TSO, caps.USO)
+			arena := batch.NewArena(batch.DefaultMultiArenaCap)
+			f.batchers[i] = batch.NewMultiCoalescer(f.readers[i], f.l, arena, caps.TSO, caps.USO)
 		} else {
-			f.batchers[i] = batch.NewPassthrough(f.readers[i])
+			arena := batch.NewArena(batch.DefaultPassthroughArenaCap)
+			f.batchers[i] = batch.NewPassthrough(f.readers[i], arena)
 		}
 	}
 
diff --git a/overlay/batch/coalesce_core.go b/overlay/batch/coalesce_core.go
index 377881b2..d46addc3 100644
--- a/overlay/batch/coalesce_core.go
+++ b/overlay/batch/coalesce_core.go
@@ -148,16 +148,43 @@ func mergeECNIntoSeed(seedHdr, pktHdr []byte, isV6 bool) {
 	}
 }
 
-// reserveFromBacking implements the Reserve half of the RxBatcher contract
-// shared by TCP and UDP coalescers. The backing slice grows on demand;
-// already-committed slices reference the old array and remain valid until
-// Flush resets backing.
-func reserveFromBacking(backing *[]byte, sz int) []byte {
-	if len(*backing)+sz > cap(*backing) {
-		newCap := max(cap(*backing)*2, sz)
-		*backing = make([]byte, 0, newCap)
-	}
-	start := len(*backing)
-	*backing = (*backing)[:start+sz]
-	return (*backing)[start : start+sz : start+sz]
-}
+// Arena is an injectable byte-slab that hands out non-overlapping borrowed
+// slices via Reserve and releases them in bulk via Reset. Coalescers take
+// an *Arena at construction so the caller controls the slab lifetime and
+// can share one slab across multiple coalescers (MultiCoalescer hands the
+// same *Arena to every lane so the lanes don't carry their own backings).
+//
+// Reserve borrows; the slice is valid until the next Reset. The slab grows
+// (by allocating a fresh, larger backing array) if a Reserve doesn't fit;
+// pre-size the arena via NewArena to keep that growth off the hot path.
+type Arena struct {
+	buf []byte
+}
+
+// NewArena returns an Arena with a pre-allocated backing of the given
+// capacity. Pass 0 if you don't intend to call Reserve (e.g. a test that
+// only feeds the coalescer pre-made []byte packets via Commit).
+func NewArena(capacity int) *Arena {
+	return &Arena{buf: make([]byte, 0, capacity)}
+}
+
+// Reserve hands out a non-overlapping sz-byte slice from the arena. If the
+// request doesn't fit the current backing, a fresh, larger backing is
+// allocated; already-borrowed slices reference the old backing and remain
+// valid until Reset.
+func (a *Arena) Reserve(sz int) []byte {
+	if len(a.buf)+sz > cap(a.buf) {
+		newCap := max(cap(a.buf)*2, sz)
+		a.buf = make([]byte, 0, newCap)
+	}
+	start := len(a.buf)
+	a.buf = a.buf[:start+sz]
+	return a.buf[start : start+sz : start+sz]
+}
+
+// Reset releases every slice handed out since the last Reset. Callers must
+// not use any previously-borrowed slice after this returns. The underlying
+// backing array is retained so subsequent Reserves don't re-allocate.
+func (a *Arena) Reset() {
+	a.buf = a.buf[:0]
+}
diff --git a/overlay/batch/multi_coalesce.go b/overlay/batch/multi_coalesce.go
index 8f187549..b6f76fe6 100644
--- a/overlay/batch/multi_coalesce.go
+++ b/overlay/batch/multi_coalesce.go
@@ -28,37 +28,43 @@ type MultiCoalescer struct {
 	udp *UDPCoalescer
 	pt  *Passthrough
 
-	// arena shared across all lanes so a single Reserve grows one backing
-	// slice; lane Commit calls borrow into this same arena.
-	backing []byte
+	// arena is shared across every lane (constructor hands the same
+	// *Arena to TCP, UDP, and Passthrough), so there's exactly one
+	// backing slab per MultiCoalescer instance. Each lane's Flush calls
+	// Reset; the resets are idempotent because Multi.Flush drains lanes
+	// sequentially and never Reserves in between, so a later lane's
+	// slots stay readable across an earlier lane's Reset (the underlying
+	// bytes are still alive — Reset only re-slices len to 0).
+	arena *Arena
 }
 
+// DefaultMultiArenaCap is the recommended arena capacity for a Multi-lane
+// batcher: 64 slots × 65535 bytes ≈ 4 MiB, enough to hold one recvmmsg
+// burst worth of MTU-sized packets without the arena growing.
+const DefaultMultiArenaCap = initialSlots * 65535
+
 // NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller
 // opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
 // likewise gates UDP coalescing (only enable when USO was negotiated).
 // Either lane disabled redirects its traffic into the passthrough lane.
-func NewMultiCoalescer(w io.Writer, l *slog.Logger, tcpEnabled, udpEnabled bool) *MultiCoalescer {
+// arena is the single backing slab shared across every lane; the caller
+// pre-sizes it via NewArena so the hot path never allocates.
+func NewMultiCoalescer(w io.Writer, l *slog.Logger, arena *Arena, tcpEnabled, udpEnabled bool) *MultiCoalescer {
 	m := &MultiCoalescer{
-		pt:      NewPassthrough(w),
-		backing: make([]byte, 0, initialSlots*65535),
+		pt:    NewPassthrough(w, arena),
+		arena: arena,
 	}
 	if tcpEnabled {
-		m.tcp = NewTCPCoalescer(w, l)
+		m.tcp = NewTCPCoalescer(w, l, arena)
 	}
 	if udpEnabled {
-		m.udp = NewUDPCoalescer(w)
+		m.udp = NewUDPCoalescer(w, arena)
 	}
 	return m
 }
 
 func (m *MultiCoalescer) Reserve(sz int) []byte {
-	if len(m.backing)+sz > cap(m.backing) {
-		newCap := max(cap(m.backing)*2, sz)
-		m.backing = make([]byte, 0, newCap)
-	}
-	start := len(m.backing)
-	m.backing = m.backing[:start+sz]
-	return m.backing[start : start+sz : start+sz]
+	return m.arena.Reserve(sz)
 }
 
 // Commit dispatches pkt to the appropriate lane based on IP version + L4
@@ -129,6 +135,5 @@ func (m *MultiCoalescer) Flush() error {
 	if err := m.pt.Flush(); err != nil {
 		errs = append(errs, err)
 	}
-	m.backing = m.backing[:0]
 	return errors.Join(errs...)
 }
diff --git a/overlay/batch/multi_coalesce_test.go b/overlay/batch/multi_coalesce_test.go
index 655cb708..f837510f 100644
--- a/overlay/batch/multi_coalesce_test.go
+++ b/overlay/batch/multi_coalesce_test.go
@@ -11,7 +11,7 @@ import (
 // else (ICMP here) falls through to plain Write.
 func TestMultiCoalescerRoutesByProto(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	m := NewMultiCoalescer(w, test.NewLogger(), true, true)
+	m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, true)
 
 	tcpPay := make([]byte, 1200)
 	udpPay := make([]byte, 1200)
@@ -53,7 +53,7 @@ func TestMultiCoalescerRoutesByProto(t *testing.T) {
 // the kernel via the passthrough lane rather than being lost.
 func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	m := NewMultiCoalescer(w, test.NewLogger(), true, false) // TSO on, USO off
+	m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, false) // TSO on, USO off
 
 	if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
 		t.Fatal(err)
 	}
@@ -75,7 +75,7 @@ func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
 // TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
 func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	m := NewMultiCoalescer(w, test.NewLogger(), false, true) // TSO off, USO on
+	m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), false, true) // TSO off, USO on
 
 	pay := make([]byte, 1200)
 	if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
diff --git a/overlay/batch/passthrough.go b/overlay/batch/passthrough.go
index c7676ccd..6b216005 100644
--- a/overlay/batch/passthrough.go
+++ b/overlay/batch/passthrough.go
@@ -8,31 +8,29 @@ import (
 // Passthrough is a RxBatcher that doesn't batch anything, it just accumulates and then sends packets.
 type Passthrough struct {
-	out     io.Writer
-	slots   [][]byte
-	backing []byte
-	cursor  int
+	out    io.Writer
+	slots  [][]byte
+	// arena is injected; see TCPCoalescer.arena for the contract.
+	arena  *Arena
+	cursor int
 }
 
-func NewPassthrough(w io.Writer) *Passthrough {
-	const baseNumSlots = 128
+const passthroughBaseNumSlots = 128
+
+// DefaultPassthroughArenaCap is the recommended arena capacity for a
+// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB.
+const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU
+
+func NewPassthrough(w io.Writer, arena *Arena) *Passthrough {
 	return &Passthrough{
-		out:     w,
-		slots:   make([][]byte, 0, baseNumSlots),
-		backing: make([]byte, 0, baseNumSlots*udp.MTU),
+		out:   w,
+		slots: make([][]byte, 0, passthroughBaseNumSlots),
+		arena: arena,
 	}
 }
 
 func (p *Passthrough) Reserve(sz int) []byte {
-	if len(p.backing)+sz > cap(p.backing) {
-		// Grow: allocate a fresh backing. Already-committed slices still
-		// reference the old array and remain valid until Flush drops them.
-		newCap := max(cap(p.backing)*2, sz)
-		p.backing = make([]byte, 0, newCap)
-	}
-	start := len(p.backing)
-	p.backing = p.backing[:start+sz]
-	return p.backing[start : start+sz : start+sz] //return zero length, sz-cap slice
+	return p.arena.Reserve(sz)
 }
 
 func (p *Passthrough) Commit(pkt []byte) error {
@@ -50,6 +48,6 @@ func (p *Passthrough) Flush() error {
 	}
 	clear(p.slots)
 	p.slots = p.slots[:0]
-	p.backing = p.backing[:0]
+	p.arena.Reset()
 	return firstErr
 }
diff --git a/overlay/batch/tcp_coalesce.go b/overlay/batch/tcp_coalesce.go
index c85e6190..c50667e0 100644
--- a/overlay/batch/tcp_coalesce.go
+++ b/overlay/batch/tcp_coalesce.go
@@ -84,17 +84,21 @@ type TCPCoalescer struct {
 	lastSlot *coalesceSlot
 	pool     []*coalesceSlot // free list for reuse
 
-	backing []byte
-	l       *slog.Logger
+	// arena is injected; the coalescer borrows slices from it via Reserve
+	// and tells it to release them via Reset on Flush. When wrapped in
+	// MultiCoalescer the same *Arena is shared with the other lanes so
+	// there's exactly one backing slab per Multi instance.
+	arena *Arena
+	l     *slog.Logger
 }
 
-func NewTCPCoalescer(w io.Writer, l *slog.Logger) *TCPCoalescer {
+func NewTCPCoalescer(w io.Writer, l *slog.Logger, arena *Arena) *TCPCoalescer {
 	c := &TCPCoalescer{
 		plainW:    w,
 		slots:     make([]*coalesceSlot, 0, initialSlots),
 		openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
 		pool:      make([]*coalesceSlot, 0, initialSlots),
-		backing:   make([]byte, 0, initialSlots*65535),
+		arena:     arena,
 		l:         l,
 	}
 	if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
@@ -174,7 +178,7 @@ func (p parsedTCP) coalesceable() bool {
 }
 
 func (c *TCPCoalescer) Reserve(sz int) []byte {
-	return reserveFromBacking(&c.backing, sz)
+	return c.arena.Reserve(sz)
 }
 
 // Commit borrows pkt. The caller must keep pkt valid until the next Flush,
@@ -274,7 +278,7 @@ func (c *TCPCoalescer) Flush() error {
 
 	clear(c.openSlots)
 	c.lastSlot = nil
-	c.backing = c.backing[:0]
+	c.arena.Reset()
 	return first
 }
diff --git a/overlay/batch/tcp_coalesce_bench_test.go b/overlay/batch/tcp_coalesce_bench_test.go
index 833a11e3..f4ebfcdf 100644
--- a/overlay/batch/tcp_coalesce_bench_test.go
+++ b/overlay/batch/tcp_coalesce_bench_test.go
@@ -71,7 +71,7 @@ func buildICMPv4() []byte {
 // between batches, and reports per-packet cost.
 func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
 	b.Helper()
-	c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger())
+	c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0))
 	b.ReportAllocs()
 	b.SetBytes(int64(len(pkts[0])))
 	b.ResetTimer()
@@ -140,7 +140,7 @@ func BenchmarkCommitNonCoalesceableTCP(b *testing.B) {
 // is the bench that shows the savings of skipping the lane's re-parse.
 func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
 	b.Helper()
-	m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), true, true)
+	m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0), true, true)
 	b.ReportAllocs()
 	b.SetBytes(int64(len(pkts[0])))
 	b.ResetTimer()
diff --git a/overlay/batch/tcp_coalesce_test.go b/overlay/batch/tcp_coalesce_test.go
index 1687653e..6da60861 100644
--- a/overlay/batch/tcp_coalesce_test.go
+++ b/overlay/batch/tcp_coalesce_test.go
@@ -128,7 +128,7 @@ const (
 
 func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: false}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
 	if err := c.Commit(pkt); err != nil {
 		t.Fatal(err)
 	}
@@ -147,7 +147,7 @@
 
 func TestCoalescerNonTCPPassthrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pkt := make([]byte, 28)
 	pkt[0] = 0x45
 	binary.BigEndian.PutUint16(pkt[2:4], 28)
@@ -167,7 +167,7 @@
 
 func TestCoalescerSeedThenFlushAlone(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
 	if err := c.Commit(pkt); err != nil {
 		t.Fatal(err)
 	}
@@ -194,7 +194,7 @@
 
 func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -234,7 +234,7 @@
 
 func TestCoalescerRejectsSeqGap(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -253,7 +253,7 @@
 
 func TestCoalescerRejectsFlagMismatch(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -274,7 +274,7 @@
 
 func TestCoalescerRejectsFIN(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
 	if err := c.Commit(fin); err != nil {
 		t.Fatal(err)
 	}
@@ -290,7 +290,7 @@
 
 func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	full := make([]byte, 1200)
 	half := make([]byte, 500)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
@@ -325,7 +325,7 @@
 
 func TestCoalescerPSHFinalizesChain(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -355,7 +355,7 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
 // coalescer drops it the sender's push signal never reaches the receiver.
 func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// Seed has no PSH; second segment carries PSH and seals the chain.
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
@@ -383,7 +383,7 @@
 
 func TestCoalescerRejectsDifferentFlow(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	p1 := buildTCPv4(1000, tcpAck, pay)
 	p2 := buildTCPv4(2200, tcpAck, pay)
@@ -405,7 +405,7 @@
 
 func TestCoalescerRejectsIPOptions(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 500)
 	pkt := buildTCPv4(1000, tcpAck, pay)
 	// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
@@ -425,7 +425,7 @@
 
 func TestCoalescerCapBySegments(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 512)
 	seq := uint32(1000)
 	for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
@@ -449,7 +449,7 @@
 // flows coalesce independently in a single Flush.
 func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 
 	// Flow A: sport 1000. Flow B: sport 3000.
@@ -506,7 +506,7 @@
 // writing passthrough packets synchronously.
 func TestCoalescerPreservesArrivalOrder(t *testing.T) {
 	w := &orderedFakeWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
 	// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
 	pay := make([]byte, 1200)
@@ -574,7 +574,7 @@ func stringSliceEq(a, b []string) bool {
 // packet (SYN) mid-flow only flushes its own flow, not others.
 func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 
 	// Flow A two segments.
@@ -679,7 +679,7 @@ func buildTCPv6(tcLow byte, seq uint32, flags byte, payload []byte) []byte {
 // retains ECE on the wire.
 func TestCoalescerCoalescesEceFlow(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	flags := byte(tcpAck | tcpEce)
 	if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil {
@@ -708,7 +708,7 @@
 // in-flow segment seeds a new slot rather than extending the prior burst.
 func TestCoalescerCwrSealsFlow(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -741,7 +741,7 @@
 // a CE-echoing window or none.
 func TestCoalescerEceMismatchReseeds(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -766,7 +766,7 @@
 // CE-marked packet still coalesces, and the merged superpacket carries CE.
 func TestCoalescerMergesCEMark(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -797,7 +797,7 @@
 // headersMatch did not also relax DSCP — different DSCP must still split.
 func TestCoalescerDscpMismatchReseeds(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits).
 	tosA := byte(0x10<<2) | ecnNotECT
@@ -820,7 +820,7 @@
 // TestCoalescerCoalescesEceFlow.
 func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	flags := byte(tcpAck | tcpEce)
 	if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil {
@@ -851,7 +851,7 @@
 // seen had the wire never reordered.
 func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot
 	// because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot
@@ -887,7 +887,7 @@
 // without any cross-flow contamination.
 func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700.
 	// Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered.
@@ -938,7 +938,7 @@
 // boundary by an arbitrary number of segments.
 func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set.
 	// Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without
@@ -966,7 +966,7 @@
 // is sorted/merged independently.
 func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// First two segments seed S1 (then a 3400 reorder seeds S2).
 	if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
@@ -999,7 +999,7 @@
 // TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30.
 func TestCoalescerIPv6MergesCEMark(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewTCPCoalescer(w, test.NewLogger())
+	c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0))
 	pay := make([]byte, 1200)
 	// tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those.
 	if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil {
diff --git a/overlay/batch/udp_coalesce.go b/overlay/batch/udp_coalesce.go
index 25b44613..410d2982 100644
--- a/overlay/batch/udp_coalesce.go
+++ b/overlay/batch/udp_coalesce.go
@@ -66,7 +66,8 @@ type UDPCoalescer struct {
 	openSlots map[flowKey]*udpSlot
 	pool      []*udpSlot
 
-	backing []byte
+	// arena is injected; see TCPCoalescer.arena for the contract.
+	arena *Arena
 }
 
 // NewUDPCoalescer wraps w. The caller is responsible for only constructing
@@ -74,13 +75,13 @@ type UDPCoalescer struct {
 // the kernel may reject GSO_UDP_L4 writes. If w does not implement
 // tio.GSOWriter at all (single-packet Queue), the coalescer degrades to
 // plain Writes — same defensive shape as the TCP coalescer.
-func NewUDPCoalescer(w io.Writer) *UDPCoalescer {
+func NewUDPCoalescer(w io.Writer, arena *Arena) *UDPCoalescer {
 	c := &UDPCoalescer{
 		plainW:    w,
 		slots:     make([]*udpSlot, 0, initialSlots),
 		openSlots: make(map[flowKey]*udpSlot, initialSlots),
 		pool:      make([]*udpSlot, 0, initialSlots),
-		backing:   make([]byte, 0, initialSlots*udpCoalesceBufSize),
+		arena:     arena,
 	}
 	if gw, ok := tio.SupportsGSO(w, tio.GSOProtoUDP); ok {
 		c.gsoW = gw
@@ -126,7 +127,7 @@ func parseUDP(pkt []byte) (parsedUDP, bool) {
 }
 
 func (c *UDPCoalescer) Reserve(sz int) []byte {
-	return reserveFromBacking(&c.backing, sz)
+	return c.arena.Reserve(sz)
 }
 
 // Commit borrows pkt. The caller must keep pkt valid until the next Flush.
@@ -183,7 +184,7 @@ func (c *UDPCoalescer) Flush() error {
 	clear(c.slots)
 	c.slots = c.slots[:0]
 	clear(c.openSlots)
-	c.backing = c.backing[:0]
+	c.arena.Reset()
 	return first
 }
diff --git a/overlay/batch/udp_coalesce_test.go b/overlay/batch/udp_coalesce_test.go
index 7eefc41a..368afafc 100644
--- a/overlay/batch/udp_coalesce_test.go
+++ b/overlay/batch/udp_coalesce_test.go
@@ -60,7 +60,7 @@ func buildUDPv6(sport, dport uint16, payload []byte) []byte {
 
 func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: false}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pkt := buildUDPv4(1000, 53, make([]byte, 100))
 	if err := c.Commit(pkt); err != nil {
 		t.Fatal(err)
 	}
@@ -78,7 +78,7 @@
 
 func TestUDPCoalescerNonUDPPassthrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	// ICMP packet
 	pkt := make([]byte, 28)
 	pkt[0] = 0x45
@@ -99,7 +99,7 @@
 
 func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pkt := buildUDPv4(1000, 53, make([]byte, 800))
 	if err := c.Commit(pkt); err != nil {
 		t.Fatal(err)
 	}
@@ -116,7 +116,7 @@
 
 func TestUDPCoalescerCoalescesEqualSized(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 1200)
 	for i := 0; i < 3; i++ {
 		if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
@@ -156,7 +156,7 @@
 // Last segment may be shorter, sealing the chain.
 func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	full := make([]byte, 1200)
 	tail := make([]byte, 600)
 	if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil {
@@ -189,7 +189,7 @@
 // A larger-than-gsoSize packet cannot extend the slot — it reseeds.
 func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
 		t.Fatal(err)
 	}
@@ -207,7 +207,7 @@
 // Different 5-tuples must not coalesce.
 func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 800)
 	if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
 		t.Fatal(err)
 	}
@@ -238,7 +238,7 @@
 // Caps at udpCoalesceMaxSegs.
 func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 100)
 	for i := 0; i < udpCoalesceMaxSegs+5; i++ {
 		if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil {
@@ -264,7 +264,7 @@
 // CE marks on appended segments must be merged into the seed's IP TOS.
 func TestUDPCoalescerMergesCEMark(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 800)
 	pkt0 := buildUDPv4(1000, 53, pay) // ECN=00
 	pkt1 := buildUDPv4(1000, 53, pay)
@@ -293,7 +293,7 @@
 // IPv6 path: same flow, equal-sized → coalesced.
 func TestUDPCoalescerIPv6Coalesces(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 1200)
 	for i := 0; i < 3; i++ {
 		if err := c.Commit(buildUDPv6(1000, 53, pay)); err != nil {
@@ -329,7 +329,7 @@
 // DSCP differences must reseed (headers don't match outside ECN).
 func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pay := make([]byte, 800)
 	pkt0 := buildUDPv4(1000, 53, pay)
 	pkt1 := buildUDPv4(1000, 53, pay)
@@ -351,7 +351,7 @@
 // Fragmented IPv4 must not be coalesced.
 func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pkt := buildUDPv4(1000, 53, make([]byte, 200))
 	binary.BigEndian.PutUint16(pkt[6:8], 0x2000) // MF=1
 	if err := c.Commit(pkt); err != nil {
@@ -368,7 +368,7 @@
 // IPv4 with options is not admissible (we require IHL=5).
 func TestUDPCoalescerIPv4WithOptionsPassesThrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
-	c := NewUDPCoalescer(w)
+	c := NewUDPCoalescer(w, NewArena(0))
 	pkt := buildUDPv4(1000, 53, make([]byte, 200))
 	pkt[0] = 0x46 // IHL = 6 (24-byte IPv4 header — has options)
 	if err := c.Commit(pkt); err != nil {
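
A minimal usage sketch of the Arena contract this patch introduces, not part of the diff above: it assumes it compiles inside the overlay/batch package, uses io.Discard in place of the TUN writer (so every lane degrades to plain writes), and uses a zero-filled buffer in place of a decrypted packet. The function name is illustrative only.

// Sketch (editorial example, assumes the overlay/batch package above): one
// pre-sized Arena is shared by every lane of a MultiCoalescer, Reserve
// borrows from it per packet, and Flush both writes the batch and Resets
// the slab so the next burst reuses the same backing.
package batch

import (
	"io"
	"log/slog"
)

func arenaUsageSketch() error {
	// One slab for all lanes; pre-sized so the hot path never re-allocates.
	arena := NewArena(DefaultMultiArenaCap)
	// io.Discard stands in for the TUN writer; with a plain io.Writer the
	// lanes fall back to ordinary writes, which is fine for a sketch.
	m := NewMultiCoalescer(io.Discard, slog.Default(), arena, true, true)

	for i := 0; i < 64; i++ {
		// Reserve borrows sz bytes from the shared arena; the slice stays
		// valid until the arena is Reset (which happens inside Flush).
		buf := m.Reserve(1300)
		// A real caller would decrypt an inbound packet into buf here.
		if err := m.Commit(buf); err != nil {
			return err
		}
	}

	// Flush writes out each lane and Resets the arena; none of the buf
	// slices handed out above may be touched after this returns.
	return m.Flush()
}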