mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
faster
grr heap usage!
This commit is contained in:
@@ -6,7 +6,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"sort"
|
"slices"
|
||||||
|
|
||||||
"github.com/slackhq/nebula/overlay/tio"
|
"github.com/slackhq/nebula/overlay/tio"
|
||||||
)
|
)
|
||||||
@@ -543,13 +543,23 @@ func (c *TCPCoalescer) sortRun(run []*coalesceSlot) {
|
|||||||
if len(run) <= 1 {
|
if len(run) <= 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sort.SliceStable(run, func(i, j int) bool {
|
// slices.SortStableFunc with a free, non-capturing comparator avoids the
|
||||||
a, b := run[i], run[j]
|
// reflection + closure-escape allocations that sort.SliceStable forces.
|
||||||
if cmp := flowKeyCompare(a.fk, b.fk); cmp != 0 {
|
slices.SortStableFunc(run, compareCoalesceSlots)
|
||||||
return cmp < 0
|
|
||||||
}
|
}
|
||||||
return tcpSeqLess(slotSeedSeq(a), slotSeedSeq(b))
|
|
||||||
})
|
func compareCoalesceSlots(a, b *coalesceSlot) int {
|
||||||
|
if cmp := flowKeyCompare(a.fk, b.fk); cmp != 0 {
|
||||||
|
return cmp
|
||||||
|
}
|
||||||
|
aSeq, bSeq := slotSeedSeq(a), slotSeedSeq(b)
|
||||||
|
if aSeq == bSeq {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if tcpSeqLess(aSeq, bSeq) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// slotSeedSeq returns the TCP seq of the slot's seed (first segment).
|
// slotSeedSeq returns the TCP seq of the slot's seed (first segment).
|
||||||
@@ -571,12 +581,11 @@ func tcpSeqLess(a, b uint32) bool {
|
|||||||
// is irrelevant — only that same-flow slots cluster together so the
|
// is irrelevant — only that same-flow slots cluster together so the
|
||||||
// post-sort sweep can merge contiguous pairs.
|
// post-sort sweep can merge contiguous pairs.
|
||||||
func flowKeyCompare(a, b flowKey) int {
|
func flowKeyCompare(a, b flowKey) int {
|
||||||
if c := bytes.Compare(a.src[:], b.src[:]); c != 0 {
|
// Cheap scalar fields first so most non-matching keys short-circuit
|
||||||
return c
|
// without ever calling bytes.Compare. sport is the ephemeral port on
|
||||||
}
|
// egress flows and discriminates fastest. For matching keys (same
|
||||||
if c := bytes.Compare(a.dst[:], b.dst[:]); c != 0 {
|
// flow), array equality on src/dst inlines to word-sized compares,
|
||||||
return c
|
// so we only pay bytes.Compare when the arrays actually differ.
|
||||||
}
|
|
||||||
if a.sport != b.sport {
|
if a.sport != b.sport {
|
||||||
if a.sport < b.sport {
|
if a.sport < b.sport {
|
||||||
return -1
|
return -1
|
||||||
@@ -589,6 +598,12 @@ func flowKeyCompare(a, b flowKey) int {
|
|||||||
}
|
}
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
if a.dst != b.dst {
|
||||||
|
return bytes.Compare(a.dst[:], b.dst[:])
|
||||||
|
}
|
||||||
|
if a.src != b.src {
|
||||||
|
return bytes.Compare(a.src[:], b.src[:])
|
||||||
|
}
|
||||||
if a.isV6 != b.isV6 {
|
if a.isV6 != b.isV6 {
|
||||||
if !a.isV6 {
|
if !a.isV6 {
|
||||||
return -1
|
return -1
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package batch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/slackhq/nebula/overlay/tio"
|
"github.com/slackhq/nebula/overlay/tio"
|
||||||
@@ -171,3 +172,68 @@ func BenchmarkMultiCommitInterleaved4(b *testing.B) {
|
|||||||
pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200)
|
pkts := buildTCPv4Interleaved(4, tcpCoalesceMaxSegs, 1200)
|
||||||
runMultiCommitBench(b, pkts, len(pkts))
|
runMultiCommitBench(b, pkts, len(pkts))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flowKeyPair is one comparison input for the flowKeyCompare bench.
|
||||||
|
type flowKeyPair struct{ a, b flowKey }
|
||||||
|
|
||||||
|
// makeFlowKey builds an IPv4 flowKey from compact inputs.
|
||||||
|
func makeFlowKey(srcLow, dstLow uint32, sport, dport uint16) flowKey {
|
||||||
|
var fk flowKey
|
||||||
|
binary.BigEndian.PutUint32(fk.src[12:16], srcLow)
|
||||||
|
binary.BigEndian.PutUint32(fk.dst[12:16], dstLow)
|
||||||
|
fk.sport = sport
|
||||||
|
fk.dport = dport
|
||||||
|
return fk
|
||||||
|
}
|
||||||
|
|
||||||
|
// flowKeyCases are the workload mixes flowKeyCompare sees in practice.
|
||||||
|
// - sameFlow: equal keys; tests the equal-path cost (sort runs hit this
|
||||||
|
// repeatedly when many segments share a flow).
|
||||||
|
// - sportDiffers: same src/dst/dport, different sport — the typical
|
||||||
|
// "sibling flows from one host to one server" pattern.
|
||||||
|
// - dstDiffers: same src/sport/dport, different dst — outbound to many
|
||||||
|
// servers from a fixed local port.
|
||||||
|
// - allDiffer: every field differs; worst case for short-circuiting.
|
||||||
|
func flowKeyCases() map[string][]flowKeyPair {
|
||||||
|
const n = 64
|
||||||
|
cases := map[string][]flowKeyPair{
|
||||||
|
"sameFlow": make([]flowKeyPair, n),
|
||||||
|
"sportDiffers": make([]flowKeyPair, n),
|
||||||
|
"dstDiffers": make([]flowKeyPair, n),
|
||||||
|
"allDiffer": make([]flowKeyPair, n),
|
||||||
|
}
|
||||||
|
for i := range n {
|
||||||
|
base := makeFlowKey(0x0a000001, 0x0a000002, 40000, 443)
|
||||||
|
cases["sameFlow"][i] = flowKeyPair{a: base, b: base}
|
||||||
|
cases["sportDiffers"][i] = flowKeyPair{
|
||||||
|
a: base,
|
||||||
|
b: makeFlowKey(0x0a000001, 0x0a000002, uint16(40001+i), 443),
|
||||||
|
}
|
||||||
|
cases["dstDiffers"][i] = flowKeyPair{
|
||||||
|
a: base,
|
||||||
|
b: makeFlowKey(0x0a000001, uint32(0x0a000002+i+1), 40000, 443),
|
||||||
|
}
|
||||||
|
cases["allDiffer"][i] = flowKeyPair{
|
||||||
|
a: makeFlowKey(uint32(0x0a000001+i), uint32(0x0a000002+i), uint16(40000+i), uint16(80+i)),
|
||||||
|
b: makeFlowKey(uint32(0x0b000001+i), uint32(0x0b000002+i), uint16(50000+i), uint16(443+i)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cases
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkFlowKeyCompare measures flowKeyCompare across the workloads
|
||||||
|
// the sort step actually sees. Use this to compare reorderings.
|
||||||
|
func BenchmarkFlowKeyCompare(b *testing.B) {
|
||||||
|
for name, pairs := range flowKeyCases() {
|
||||||
|
b.Run(name, func(b *testing.B) {
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
var sink int
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
p := pairs[i&(len(pairs)-1)]
|
||||||
|
sink += flowKeyCompare(p.a, p.b)
|
||||||
|
}
|
||||||
|
runtime.KeepAlive(sink)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1020,3 +1020,31 @@ func TestCoalescerIPv6MergesCEMark(t *testing.T) {
|
|||||||
t.Errorf("seed v6 ECN=0x%02x want CE 0x%02x", got, ecnCE)
|
t.Errorf("seed v6 ECN=0x%02x want CE 0x%02x", got, ecnCE)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSortRunZeroAllocs(t *testing.T) {
|
||||||
|
c := &TCPCoalescer{}
|
||||||
|
mk := func(srcByte byte, seq uint32, pay int) *coalesceSlot {
|
||||||
|
s := &coalesceSlot{nextSeq: seq + uint32(pay), totalPay: pay}
|
||||||
|
s.fk.src[0] = srcByte
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
run := []*coalesceSlot{
|
||||||
|
mk(3, 5000, 100),
|
||||||
|
mk(1, 1000, 50),
|
||||||
|
mk(2, 2000, 75),
|
||||||
|
mk(1, 900, 50),
|
||||||
|
mk(3, 4900, 100),
|
||||||
|
mk(2, 1925, 75),
|
||||||
|
mk(1, 1050, 50),
|
||||||
|
mk(3, 5100, 100),
|
||||||
|
}
|
||||||
|
|
||||||
|
allocs := testing.AllocsPerRun(100, func() {
|
||||||
|
// Re-shuffle so each run actually does sorting work.
|
||||||
|
run[0], run[1], run[2], run[3] = run[3], run[2], run[1], run[0]
|
||||||
|
c.sortRun(run)
|
||||||
|
})
|
||||||
|
if allocs != 0 {
|
||||||
|
t.Fatalf("sortRun allocates %v times per run; want 0", allocs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user