grr heap usage!
This commit is contained in:
JackDoan
2026-05-06 16:06:36 -05:00
parent 40b4ae7fb4
commit 6cb00c613c
3 changed files with 123 additions and 14 deletions

View File

@@ -6,7 +6,7 @@ import (
"io" "io"
"log/slog" "log/slog"
"net/netip" "net/netip"
"sort" "slices"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
) )
@@ -543,13 +543,23 @@ func (c *TCPCoalescer) sortRun(run []*coalesceSlot) {
if len(run) <= 1 { if len(run) <= 1 {
return return
} }
sort.SliceStable(run, func(i, j int) bool { // slices.SortStableFunc with a free, non-capturing comparator avoids the
a, b := run[i], run[j] // reflection + closure-escape allocations that sort.SliceStable forces.
if cmp := flowKeyCompare(a.fk, b.fk); cmp != 0 { slices.SortStableFunc(run, compareCoalesceSlots)
return cmp < 0 }
}
return tcpSeqLess(slotSeedSeq(a), slotSeedSeq(b)) func compareCoalesceSlots(a, b *coalesceSlot) int {
}) if cmp := flowKeyCompare(a.fk, b.fk); cmp != 0 {
return cmp
}
aSeq, bSeq := slotSeedSeq(a), slotSeedSeq(b)
if aSeq == bSeq {
return 0
}
if tcpSeqLess(aSeq, bSeq) {
return -1
}
return 1
} }
// slotSeedSeq returns the TCP seq of the slot's seed (first segment). // slotSeedSeq returns the TCP seq of the slot's seed (first segment).
@@ -571,12 +581,11 @@ func tcpSeqLess(a, b uint32) bool {
// is irrelevant — only that same-flow slots cluster together so the // is irrelevant — only that same-flow slots cluster together so the
// post-sort sweep can merge contiguous pairs. // post-sort sweep can merge contiguous pairs.
func flowKeyCompare(a, b flowKey) int { func flowKeyCompare(a, b flowKey) int {
if c := bytes.Compare(a.src[:], b.src[:]); c != 0 { // Cheap scalar fields first so most non-matching keys short-circuit
return c // without ever calling bytes.Compare. sport is the ephemeral port on
} // egress flows and discriminates fastest. For matching keys (same
if c := bytes.Compare(a.dst[:], b.dst[:]); c != 0 { // flow), array equality on src/dst inlines to word-sized compares,
return c // so we only pay bytes.Compare when the arrays actually differ.
}
if a.sport != b.sport { if a.sport != b.sport {
if a.sport < b.sport { if a.sport < b.sport {
return -1 return -1
@@ -589,6 +598,12 @@ func flowKeyCompare(a, b flowKey) int {
} }
return 1 return 1
} }
if a.dst != b.dst {
return bytes.Compare(a.dst[:], b.dst[:])
}
if a.src != b.src {
return bytes.Compare(a.src[:], b.src[:])
}
if a.isV6 != b.isV6 { if a.isV6 != b.isV6 {
if !a.isV6 { if !a.isV6 {
return -1 return -1

View File

@@ -2,6 +2,7 @@ package batch
import ( import (
"encoding/binary" "encoding/binary"
"runtime"
"testing" "testing"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
@@ -171,3 +172,68 @@ func BenchmarkMultiCommitInterleaved4(b *testing.B) {
pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200) pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200)
runMultiCommitBench(b, pkts, len(pkts)) runMultiCommitBench(b, pkts, len(pkts))
} }
// flowKeyPair is one comparison input for the flowKeyCompare bench.
type flowKeyPair struct{ a, b flowKey }
// makeFlowKey builds an IPv4 flowKey from compact inputs.
func makeFlowKey(srcLow, dstLow uint32, sport, dport uint16) flowKey {
var fk flowKey
binary.BigEndian.PutUint32(fk.src[12:16], srcLow)
binary.BigEndian.PutUint32(fk.dst[12:16], dstLow)
fk.sport = sport
fk.dport = dport
return fk
}
// flowKeyCases are the workload mixes flowKeyCompare sees in practice.
// - sameFlow: equal keys; tests the equal-path cost (sort runs hit this
// repeatedly when many segments share a flow).
// - sportDiffers: same src/dst/dport, different sport — the typical
// "sibling flows from one host to one server" pattern.
// - dstDiffers: same src/sport/dport, different dst — outbound to many
// servers from a fixed local port.
// - allDiffer: every field differs; worst case for short-circuiting.
func flowKeyCases() map[string][]flowKeyPair {
const n = 64
cases := map[string][]flowKeyPair{
"sameFlow": make([]flowKeyPair, n),
"sportDiffers": make([]flowKeyPair, n),
"dstDiffers": make([]flowKeyPair, n),
"allDiffer": make([]flowKeyPair, n),
}
for i := range n {
base := makeFlowKey(0x0a000001, 0x0a000002, 40000, 443)
cases["sameFlow"][i] = flowKeyPair{a: base, b: base}
cases["sportDiffers"][i] = flowKeyPair{
a: base,
b: makeFlowKey(0x0a000001, 0x0a000002, uint16(40001+i), 443),
}
cases["dstDiffers"][i] = flowKeyPair{
a: base,
b: makeFlowKey(0x0a000001, uint32(0x0a000002+i+1), 40000, 443),
}
cases["allDiffer"][i] = flowKeyPair{
a: makeFlowKey(uint32(0x0a000001+i), uint32(0x0a000002+i), uint16(40000+i), uint16(80+i)),
b: makeFlowKey(uint32(0x0b000001+i), uint32(0x0b000002+i), uint16(50000+i), uint16(443+i)),
}
}
return cases
}
// BenchmarkFlowKeyCompare measures flowKeyCompare across the workloads
// the sort step actually sees. Use this to compare reorderings.
func BenchmarkFlowKeyCompare(b *testing.B) {
for name, pairs := range flowKeyCases() {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
var sink int
for i := 0; i < b.N; i++ {
p := pairs[i&(len(pairs)-1)]
sink += flowKeyCompare(p.a, p.b)
}
runtime.KeepAlive(sink)
})
}
}

View File

@@ -1020,3 +1020,31 @@ func TestCoalescerIPv6MergesCEMark(t *testing.T) {
t.Errorf("seed v6 ECN=0x%02x want CE 0x%02x", got, ecnCE) t.Errorf("seed v6 ECN=0x%02x want CE 0x%02x", got, ecnCE)
} }
} }
func TestSortRunZeroAllocs(t *testing.T) {
c := &TCPCoalescer{}
mk := func(srcByte byte, seq uint32, pay int) *coalesceSlot {
s := &coalesceSlot{nextSeq: seq + uint32(pay), totalPay: pay}
s.fk.src[0] = srcByte
return s
}
run := []*coalesceSlot{
mk(3, 5000, 100),
mk(1, 1000, 50),
mk(2, 2000, 75),
mk(1, 900, 50),
mk(3, 4900, 100),
mk(2, 1925, 75),
mk(1, 1050, 50),
mk(3, 5100, 100),
}
allocs := testing.AllocsPerRun(100, func() {
// Re-shuffle so each run actually does sorting work.
run[0], run[1], run[2], run[3] = run[3], run[2], run[1], run[0]
c.sortRun(run)
})
if allocs != 0 {
t.Fatalf("sortRun allocates %v times per run; want 0", allocs)
}
}