diff --git a/overlay/batch/multi_coalesce.go b/overlay/batch/multi_coalesce.go index f6a8316c..8f187549 100644 --- a/overlay/batch/multi_coalesce.go +++ b/overlay/batch/multi_coalesce.go @@ -3,6 +3,7 @@ package batch import ( "errors" "io" + "log/slog" ) // MultiCoalescer fans plaintext packets out to lane-specific batchers based @@ -36,13 +37,13 @@ type MultiCoalescer struct { // opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled // likewise gates UDP coalescing (only enable when USO was negotiated). // Either lane disabled redirects its traffic into the passthrough lane. -func NewMultiCoalescer(w io.Writer, tcpEnabled, udpEnabled bool) *MultiCoalescer { +func NewMultiCoalescer(w io.Writer, l *slog.Logger, tcpEnabled, udpEnabled bool) *MultiCoalescer { m := &MultiCoalescer{ pt: NewPassthrough(w), backing: make([]byte, 0, initialSlots*65535), } if tcpEnabled { - m.tcp = NewTCPCoalescer(w) + m.tcp = NewTCPCoalescer(w, l) } if udpEnabled { m.udp = NewUDPCoalescer(w) diff --git a/overlay/batch/multi_coalesce_test.go b/overlay/batch/multi_coalesce_test.go index 9d718ecf..655cb708 100644 --- a/overlay/batch/multi_coalesce_test.go +++ b/overlay/batch/multi_coalesce_test.go @@ -2,6 +2,8 @@ package batch import ( "testing" + + "github.com/slackhq/nebula/test" ) // TestMultiCoalescerRoutesByProto confirms TCP/UDP/other land in the right @@ -9,7 +11,7 @@ import ( // else (ICMP here) falls through to plain Write. func TestMultiCoalescerRoutesByProto(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, true, true) + m := NewMultiCoalescer(w, test.NewLogger(), true, true) tcpPay := make([]byte, 1200) udpPay := make([]byte, 1200) @@ -51,7 +53,7 @@ func TestMultiCoalescerRoutesByProto(t *testing.T) { // the kernel via the passthrough lane rather than being lost. 
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, true, false) // TSO on, USO off + m := NewMultiCoalescer(w, test.NewLogger(), true, false) // TSO on, USO off if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil { t.Fatal(err) @@ -73,7 +75,7 @@ func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) { // TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case. func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, false, true) // TSO off, USO on + m := NewMultiCoalescer(w, test.NewLogger(), false, true) // TSO off, USO on pay := make([]byte, 1200) if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { diff --git a/overlay/batch/tcp_coalesce.go b/overlay/batch/tcp_coalesce.go index 59d46bd5..c85e6190 100644 --- a/overlay/batch/tcp_coalesce.go +++ b/overlay/batch/tcp_coalesce.go @@ -2,6 +2,7 @@ package batch import ( "bytes" + "context" "encoding/binary" "io" "log/slog" @@ -84,15 +85,17 @@ type TCPCoalescer struct { pool []*coalesceSlot // free list for reuse backing []byte + l *slog.Logger } -func NewTCPCoalescer(w io.Writer) *TCPCoalescer { +func NewTCPCoalescer(w io.Writer, l *slog.Logger) *TCPCoalescer { c := &TCPCoalescer{ plainW: w, slots: make([]*coalesceSlot, 0, initialSlots), openSlots: make(map[flowKey]*coalesceSlot, initialSlots), pool: make([]*coalesceSlot, 0, initialSlots), backing: make([]byte, 0, initialSlots*65535), + l: l, } if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok { c.gsoW = gw @@ -488,22 +491,25 @@ func (c *TCPCoalescer) reorderForFlush() { // the operator can quantify how often it happens; the data // itself still emits in seq order, kernel TCP handles the // gap via its OOO queue. 
- if prev.nextSeq != slotSeedSeq(s) { - logged = true - gap := int64(slotSeedSeq(s)) - int64(prev.nextSeq) - slog.Default().Warn("tcp coalesce: cross-slot seq gap", - "src", flowKeyAddr(s.fk, false), - "dst", flowKeyAddr(s.fk, true), - "sport", s.fk.sport, - "dport", s.fk.dport, - "prev_seed_seq", slotSeedSeq(prev), - "prev_next_seq", prev.nextSeq, - "this_seed_seq", slotSeedSeq(s), - "gap_bytes", gap, - "prev_seg_count", prev.numSeg, - "prev_total_pay", prev.totalPay, - ) + if c.l.Enabled(context.Background(), slog.LevelDebug) { + if prev.nextSeq != slotSeedSeq(s) { + logged = true + gap := int64(slotSeedSeq(s)) - int64(prev.nextSeq) + c.l.Debug("tcp coalesce: cross-slot seq gap", + "src", flowKeyAddr(s.fk, false), + "dst", flowKeyAddr(s.fk, true), + "sport", s.fk.sport, + "dport", s.fk.dport, + "prev_seed_seq", slotSeedSeq(prev), + "prev_next_seq", prev.nextSeq, + "this_seed_seq", slotSeedSeq(s), + "gap_bytes", gap, + "prev_seg_count", prev.numSeg, + "prev_total_pay", prev.totalPay, + ) + } } + if canMergeSlots(prev, s) { mergeSlots(prev, s) c.release(s) @@ -514,7 +520,7 @@ func (c *TCPCoalescer) reorderForFlush() { out = append(out, s) } if logged { - slog.Default().Warn("==== end of batch ====") + c.l.Debug("==== end of batch ====") } c.slots = out } diff --git a/overlay/batch/tcp_coalesce_bench_test.go b/overlay/batch/tcp_coalesce_bench_test.go index 41aafebd..833a11e3 100644 --- a/overlay/batch/tcp_coalesce_bench_test.go +++ b/overlay/batch/tcp_coalesce_bench_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/slackhq/nebula/overlay/tio" + "github.com/slackhq/nebula/test" ) // nopTunWriter is a zero-alloc tio.GSOWriter for benchmarks. Discards @@ -70,7 +71,7 @@ func buildICMPv4() []byte { // between batches, and reports per-packet cost. 
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) { b.Helper() - c := NewTCPCoalescer(nopTunWriter{}) + c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger()) b.ReportAllocs() b.SetBytes(int64(len(pkts[0]))) b.ResetTimer() @@ -139,7 +140,7 @@ func BenchmarkCommitNonCoalesceableTCP(b *testing.B) { // is the bench that shows the savings of skipping the lane's re-parse. func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) { b.Helper() - m := NewMultiCoalescer(nopTunWriter{}, true, true) + m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), true, true) b.ReportAllocs() b.SetBytes(int64(len(pkts[0]))) b.ResetTimer() diff --git a/overlay/batch/tcp_coalesce_test.go b/overlay/batch/tcp_coalesce_test.go index a6179983..1687653e 100644 --- a/overlay/batch/tcp_coalesce_test.go +++ b/overlay/batch/tcp_coalesce_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/slackhq/nebula/overlay/tio" + "github.com/slackhq/nebula/test" ) // fakeTunWriter records plain Writes and WriteGSO calls without touching a @@ -127,7 +128,7 @@ const ( func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { w := &fakeTunWriter{gsoEnabled: false} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pkt := buildTCPv4(1000, tcpAck, []byte("hello")) if err := c.Commit(pkt); err != nil { t.Fatal(err) @@ -146,7 +147,7 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { func TestCoalescerNonTCPPassthrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pkt := make([]byte, 28) pkt[0] = 0x45 binary.BigEndian.PutUint16(pkt[2:4], 28) @@ -166,7 +167,7 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) { func TestCoalescerSeedThenFlushAlone(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000)) if err := c.Commit(pkt); err != nil { 
t.Fatal(err) @@ -193,7 +194,7 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) { func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -233,7 +234,7 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { func TestCoalescerRejectsSeqGap(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -252,7 +253,7 @@ func TestCoalescerRejectsSeqGap(t *testing.T) { func TestCoalescerRejectsFlagMismatch(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -273,7 +274,7 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) { func TestCoalescerRejectsFIN(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x")) if err := c.Commit(fin); err != nil { t.Fatal(err) @@ -289,7 +290,7 @@ func TestCoalescerRejectsFIN(t *testing.T) { func TestCoalescerShortLastSegmentClosesChain(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) full := make([]byte, 1200) half := make([]byte, 500) if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil { @@ -324,7 +325,7 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) { func TestCoalescerPSHFinalizesChain(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := 
c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -354,7 +355,7 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) { // coalescer drops it the sender's push signal never reaches the receiver. func TestCoalescerPropagatesPSHFromAppended(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Seed has no PSH; second segment carries PSH and seals the chain. if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { @@ -382,7 +383,7 @@ func TestCoalescerPropagatesPSHFromAppended(t *testing.T) { func TestCoalescerRejectsDifferentFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) p1 := buildTCPv4(1000, tcpAck, pay) p2 := buildTCPv4(2200, tcpAck, pay) @@ -404,7 +405,7 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) { func TestCoalescerRejectsIPOptions(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 500) pkt := buildTCPv4(1000, tcpAck, pay) // Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add @@ -424,7 +425,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) { func TestCoalescerCapBySegments(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 512) seq := uint32(1000) for i := 0; i < tcpCoalesceMaxSegs+5; i++ { @@ -448,7 +449,7 @@ func TestCoalescerCapBySegments(t *testing.T) { // flows coalesce independently in a single Flush. func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Flow A: sport 1000. Flow B: sport 3000. 
@@ -505,7 +506,7 @@ func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) { // writing passthrough packets synchronously. func TestCoalescerPreservesArrivalOrder(t *testing.T) { w := &orderedFakeWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) // Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on // a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y). pay := make([]byte, 1200) @@ -573,7 +574,7 @@ func stringSliceEq(a, b []string) bool { // packet (SYN) mid-flow only flushes its own flow, not others. func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Flow A two segments. @@ -678,7 +679,7 @@ func buildTCPv6(tcLow byte, seq uint32, flags byte, payload []byte) []byte { // retains ECE on the wire. func TestCoalescerCoalescesEceFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) flags := byte(tcpAck | tcpEce) if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil { @@ -707,7 +708,7 @@ func TestCoalescerCoalescesEceFlow(t *testing.T) { // in-flow segment seeds a new slot rather than extending the prior burst. func TestCoalescerCwrSealsFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -740,7 +741,7 @@ func TestCoalescerCwrSealsFlow(t *testing.T) { // a CE-echoing window or none. 
func TestCoalescerEceMismatchReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil { t.Fatal(err) @@ -765,7 +766,7 @@ func TestCoalescerEceMismatchReseeds(t *testing.T) { // CE-marked packet still coalesces, and the merged superpacket carries CE. func TestCoalescerMergesCEMark(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -796,7 +797,7 @@ func TestCoalescerMergesCEMark(t *testing.T) { // headersMatch did not also relax DSCP — different DSCP must still split. func TestCoalescerDscpMismatchReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits). tosA := byte(0x10<<2) | ecnNotECT @@ -819,7 +820,7 @@ func TestCoalescerDscpMismatchReseeds(t *testing.T) { // TestCoalescerCoalescesEceFlow. func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) flags := byte(tcpAck | tcpEce) if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil { @@ -850,7 +851,7 @@ func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) { // seen had the wire never reordered. func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Arrival order: seq 1000, 3400, 2200. 
The 3400 seeds a separate slot // because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot @@ -886,7 +887,7 @@ func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) { // without any cross-flow contamination. func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700. // Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered. @@ -937,7 +938,7 @@ func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) { // boundary by an arbitrary number of segments. func TestCoalescerSortKeepsPSHBoundary(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set. // Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without @@ -965,7 +966,7 @@ func TestCoalescerSortKeepsPSHBoundary(t *testing.T) { // is sorted/merged independently. func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // First two segments seed S1 (then a 3400 reorder seeds S2). if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { @@ -998,7 +999,7 @@ func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) { // TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30. func TestCoalescerIPv6MergesCEMark(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w) + c := NewTCPCoalescer(w, test.NewLogger()) pay := make([]byte, 1200) // tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those. 
if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil { diff --git a/overlay/batch/tx_batch.go b/overlay/batch/tx_batch.go index 0b12b67e..38f86b25 100644 --- a/overlay/batch/tx_batch.go +++ b/overlay/batch/tx_batch.go @@ -22,13 +22,14 @@ type SendBatch struct { backing []byte } -func NewSendBatch(out batchWriter, batchCap, slotCap int) *SendBatch { +// NewSendBatch makes a SendBatch with batchCap slots and an arenaSize byte buffer for slices to back those slots +func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch { return &SendBatch{ out: out, bufs: make([][]byte, 0, batchCap), dsts: make([]netip.AddrPort, 0, batchCap), ecns: make([]byte, 0, batchCap), - backing: make([]byte, 0, batchCap*slotCap), + backing: make([]byte, 0, arenaSize), } }