mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
drop in a logger
This commit is contained in:
@@ -3,6 +3,7 @@ package batch
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MultiCoalescer fans plaintext packets out to lane-specific batchers based
|
// MultiCoalescer fans plaintext packets out to lane-specific batchers based
|
||||||
@@ -36,13 +37,13 @@ type MultiCoalescer struct {
|
|||||||
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
|
// opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled
|
||||||
// likewise gates UDP coalescing (only enable when USO was negotiated).
|
// likewise gates UDP coalescing (only enable when USO was negotiated).
|
||||||
// Either lane disabled redirects its traffic into the passthrough lane.
|
// Either lane disabled redirects its traffic into the passthrough lane.
|
||||||
func NewMultiCoalescer(w io.Writer, tcpEnabled, udpEnabled bool) *MultiCoalescer {
|
func NewMultiCoalescer(w io.Writer, l *slog.Logger, tcpEnabled, udpEnabled bool) *MultiCoalescer {
|
||||||
m := &MultiCoalescer{
|
m := &MultiCoalescer{
|
||||||
pt: NewPassthrough(w),
|
pt: NewPassthrough(w),
|
||||||
backing: make([]byte, 0, initialSlots*65535),
|
backing: make([]byte, 0, initialSlots*65535),
|
||||||
}
|
}
|
||||||
if tcpEnabled {
|
if tcpEnabled {
|
||||||
m.tcp = NewTCPCoalescer(w)
|
m.tcp = NewTCPCoalescer(w, l)
|
||||||
}
|
}
|
||||||
if udpEnabled {
|
if udpEnabled {
|
||||||
m.udp = NewUDPCoalescer(w)
|
m.udp = NewUDPCoalescer(w)
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package batch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/slackhq/nebula/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestMultiCoalescerRoutesByProto confirms TCP/UDP/other land in the right
|
// TestMultiCoalescerRoutesByProto confirms TCP/UDP/other land in the right
|
||||||
@@ -9,7 +11,7 @@ import (
|
|||||||
// else (ICMP here) falls through to plain Write.
|
// else (ICMP here) falls through to plain Write.
|
||||||
func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, true, true)
|
m := NewMultiCoalescer(w, test.NewLogger(), true, true)
|
||||||
|
|
||||||
tcpPay := make([]byte, 1200)
|
tcpPay := make([]byte, 1200)
|
||||||
udpPay := make([]byte, 1200)
|
udpPay := make([]byte, 1200)
|
||||||
@@ -51,7 +53,7 @@ func TestMultiCoalescerRoutesByProto(t *testing.T) {
|
|||||||
// the kernel via the passthrough lane rather than being lost.
|
// the kernel via the passthrough lane rather than being lost.
|
||||||
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, true, false) // TSO on, USO off
|
m := NewMultiCoalescer(w, test.NewLogger(), true, false) // TSO on, USO off
|
||||||
|
|
||||||
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -73,7 +75,7 @@ func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) {
|
|||||||
// TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
|
// TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case.
|
||||||
func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
|
func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
m := NewMultiCoalescer(w, false, true) // TSO off, USO on
|
m := NewMultiCoalescer(w, test.NewLogger(), false, true) // TSO off, USO on
|
||||||
|
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package batch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
@@ -84,15 +85,17 @@ type TCPCoalescer struct {
|
|||||||
pool []*coalesceSlot // free list for reuse
|
pool []*coalesceSlot // free list for reuse
|
||||||
|
|
||||||
backing []byte
|
backing []byte
|
||||||
|
l *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTCPCoalescer(w io.Writer) *TCPCoalescer {
|
func NewTCPCoalescer(w io.Writer, l *slog.Logger) *TCPCoalescer {
|
||||||
c := &TCPCoalescer{
|
c := &TCPCoalescer{
|
||||||
plainW: w,
|
plainW: w,
|
||||||
slots: make([]*coalesceSlot, 0, initialSlots),
|
slots: make([]*coalesceSlot, 0, initialSlots),
|
||||||
openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
|
openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
|
||||||
pool: make([]*coalesceSlot, 0, initialSlots),
|
pool: make([]*coalesceSlot, 0, initialSlots),
|
||||||
backing: make([]byte, 0, initialSlots*65535),
|
backing: make([]byte, 0, initialSlots*65535),
|
||||||
|
l: l,
|
||||||
}
|
}
|
||||||
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
|
if gw, ok := tio.SupportsGSO(w, tio.GSOProtoTCP); ok {
|
||||||
c.gsoW = gw
|
c.gsoW = gw
|
||||||
@@ -488,22 +491,25 @@ func (c *TCPCoalescer) reorderForFlush() {
|
|||||||
// the operator can quantify how often it happens; the data
|
// the operator can quantify how often it happens; the data
|
||||||
// itself still emits in seq order, kernel TCP handles the
|
// itself still emits in seq order, kernel TCP handles the
|
||||||
// gap via its OOO queue.
|
// gap via its OOO queue.
|
||||||
if prev.nextSeq != slotSeedSeq(s) {
|
if c.l.Enabled(context.Background(), slog.LevelDebug) {
|
||||||
logged = true
|
if prev.nextSeq != slotSeedSeq(s) {
|
||||||
gap := int64(slotSeedSeq(s)) - int64(prev.nextSeq)
|
logged = true
|
||||||
slog.Default().Warn("tcp coalesce: cross-slot seq gap",
|
gap := int64(slotSeedSeq(s)) - int64(prev.nextSeq)
|
||||||
"src", flowKeyAddr(s.fk, false),
|
c.l.Debug("tcp coalesce: cross-slot seq gap",
|
||||||
"dst", flowKeyAddr(s.fk, true),
|
"src", flowKeyAddr(s.fk, false),
|
||||||
"sport", s.fk.sport,
|
"dst", flowKeyAddr(s.fk, true),
|
||||||
"dport", s.fk.dport,
|
"sport", s.fk.sport,
|
||||||
"prev_seed_seq", slotSeedSeq(prev),
|
"dport", s.fk.dport,
|
||||||
"prev_next_seq", prev.nextSeq,
|
"prev_seed_seq", slotSeedSeq(prev),
|
||||||
"this_seed_seq", slotSeedSeq(s),
|
"prev_next_seq", prev.nextSeq,
|
||||||
"gap_bytes", gap,
|
"this_seed_seq", slotSeedSeq(s),
|
||||||
"prev_seg_count", prev.numSeg,
|
"gap_bytes", gap,
|
||||||
"prev_total_pay", prev.totalPay,
|
"prev_seg_count", prev.numSeg,
|
||||||
)
|
"prev_total_pay", prev.totalPay,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if canMergeSlots(prev, s) {
|
if canMergeSlots(prev, s) {
|
||||||
mergeSlots(prev, s)
|
mergeSlots(prev, s)
|
||||||
c.release(s)
|
c.release(s)
|
||||||
@@ -514,7 +520,7 @@ func (c *TCPCoalescer) reorderForFlush() {
|
|||||||
out = append(out, s)
|
out = append(out, s)
|
||||||
}
|
}
|
||||||
if logged {
|
if logged {
|
||||||
slog.Default().Warn("==== end of batch ====")
|
c.l.Warn("==== end of batch ====")
|
||||||
}
|
}
|
||||||
c.slots = out
|
c.slots = out
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/slackhq/nebula/overlay/tio"
|
"github.com/slackhq/nebula/overlay/tio"
|
||||||
|
"github.com/slackhq/nebula/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nopTunWriter is a zero-alloc tio.GSOWriter for benchmarks. Discards
|
// nopTunWriter is a zero-alloc tio.GSOWriter for benchmarks. Discards
|
||||||
@@ -70,7 +71,7 @@ func buildICMPv4() []byte {
|
|||||||
// between batches, and reports per-packet cost.
|
// between batches, and reports per-packet cost.
|
||||||
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||||
b.Helper()
|
b.Helper()
|
||||||
c := NewTCPCoalescer(nopTunWriter{})
|
c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger())
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.SetBytes(int64(len(pkts[0])))
|
b.SetBytes(int64(len(pkts[0])))
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
@@ -139,7 +140,7 @@ func BenchmarkCommitNonCoalesceableTCP(b *testing.B) {
|
|||||||
// is the bench that shows the savings of skipping the lane's re-parse.
|
// is the bench that shows the savings of skipping the lane's re-parse.
|
||||||
func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) {
|
||||||
b.Helper()
|
b.Helper()
|
||||||
m := NewMultiCoalescer(nopTunWriter{}, true, true)
|
m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), true, true)
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.SetBytes(int64(len(pkts[0])))
|
b.SetBytes(int64(len(pkts[0])))
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/slackhq/nebula/overlay/tio"
|
"github.com/slackhq/nebula/overlay/tio"
|
||||||
|
"github.com/slackhq/nebula/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
// fakeTunWriter records plain Writes and WriteGSO calls without touching a
|
// fakeTunWriter records plain Writes and WriteGSO calls without touching a
|
||||||
@@ -127,7 +128,7 @@ const (
|
|||||||
|
|
||||||
func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: false}
|
w := &fakeTunWriter{gsoEnabled: false}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
|
pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -146,7 +147,7 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pkt := make([]byte, 28)
|
pkt := make([]byte, 28)
|
||||||
pkt[0] = 0x45
|
pkt[0] = 0x45
|
||||||
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
binary.BigEndian.PutUint16(pkt[2:4], 28)
|
||||||
@@ -166,7 +167,7 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
|
pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
|
||||||
if err := c.Commit(pkt); err != nil {
|
if err := c.Commit(pkt); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -193,7 +194,7 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -233,7 +234,7 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsSeqGap(t *testing.T) {
|
func TestCoalescerRejectsSeqGap(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -252,7 +253,7 @@ func TestCoalescerRejectsSeqGap(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -273,7 +274,7 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsFIN(t *testing.T) {
|
func TestCoalescerRejectsFIN(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
|
fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
|
||||||
if err := c.Commit(fin); err != nil {
|
if err := c.Commit(fin); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -289,7 +290,7 @@ func TestCoalescerRejectsFIN(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
full := make([]byte, 1200)
|
full := make([]byte, 1200)
|
||||||
half := make([]byte, 500)
|
half := make([]byte, 500)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
|
||||||
@@ -324,7 +325,7 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -354,7 +355,7 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
|
|||||||
// coalescer drops it the sender's push signal never reaches the receiver.
|
// coalescer drops it the sender's push signal never reaches the receiver.
|
||||||
func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Seed has no PSH; second segment carries PSH and seals the chain.
|
// Seed has no PSH; second segment carries PSH and seals the chain.
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
@@ -382,7 +383,7 @@ func TestCoalescerPropagatesPSHFromAppended(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
p1 := buildTCPv4(1000, tcpAck, pay)
|
p1 := buildTCPv4(1000, tcpAck, pay)
|
||||||
p2 := buildTCPv4(2200, tcpAck, pay)
|
p2 := buildTCPv4(2200, tcpAck, pay)
|
||||||
@@ -404,7 +405,7 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerRejectsIPOptions(t *testing.T) {
|
func TestCoalescerRejectsIPOptions(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 500)
|
pay := make([]byte, 500)
|
||||||
pkt := buildTCPv4(1000, tcpAck, pay)
|
pkt := buildTCPv4(1000, tcpAck, pay)
|
||||||
// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
|
// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
|
||||||
@@ -424,7 +425,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
|
|||||||
|
|
||||||
func TestCoalescerCapBySegments(t *testing.T) {
|
func TestCoalescerCapBySegments(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 512)
|
pay := make([]byte, 512)
|
||||||
seq := uint32(1000)
|
seq := uint32(1000)
|
||||||
for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
|
for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
|
||||||
@@ -448,7 +449,7 @@ func TestCoalescerCapBySegments(t *testing.T) {
|
|||||||
// flows coalesce independently in a single Flush.
|
// flows coalesce independently in a single Flush.
|
||||||
func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
|
|
||||||
// Flow A: sport 1000. Flow B: sport 3000.
|
// Flow A: sport 1000. Flow B: sport 3000.
|
||||||
@@ -505,7 +506,7 @@ func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
|
|||||||
// writing passthrough packets synchronously.
|
// writing passthrough packets synchronously.
|
||||||
func TestCoalescerPreservesArrivalOrder(t *testing.T) {
|
func TestCoalescerPreservesArrivalOrder(t *testing.T) {
|
||||||
w := &orderedFakeWriter{gsoEnabled: true}
|
w := &orderedFakeWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
|
// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
|
||||||
// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
|
// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
@@ -573,7 +574,7 @@ func stringSliceEq(a, b []string) bool {
|
|||||||
// packet (SYN) mid-flow only flushes its own flow, not others.
|
// packet (SYN) mid-flow only flushes its own flow, not others.
|
||||||
func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
|
func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
|
|
||||||
// Flow A two segments.
|
// Flow A two segments.
|
||||||
@@ -678,7 +679,7 @@ func buildTCPv6(tcLow byte, seq uint32, flags byte, payload []byte) []byte {
|
|||||||
// retains ECE on the wire.
|
// retains ECE on the wire.
|
||||||
func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
flags := byte(tcpAck | tcpEce)
|
flags := byte(tcpAck | tcpEce)
|
||||||
if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil {
|
||||||
@@ -707,7 +708,7 @@ func TestCoalescerCoalescesEceFlow(t *testing.T) {
|
|||||||
// in-flow segment seeds a new slot rather than extending the prior burst.
|
// in-flow segment seeds a new slot rather than extending the prior burst.
|
||||||
func TestCoalescerCwrSealsFlow(t *testing.T) {
|
func TestCoalescerCwrSealsFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -740,7 +741,7 @@ func TestCoalescerCwrSealsFlow(t *testing.T) {
|
|||||||
// a CE-echoing window or none.
|
// a CE-echoing window or none.
|
||||||
func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -765,7 +766,7 @@ func TestCoalescerEceMismatchReseeds(t *testing.T) {
|
|||||||
// CE-marked packet still coalesces, and the merged superpacket carries CE.
|
// CE-marked packet still coalesces, and the merged superpacket carries CE.
|
||||||
func TestCoalescerMergesCEMark(t *testing.T) {
|
func TestCoalescerMergesCEMark(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -796,7 +797,7 @@ func TestCoalescerMergesCEMark(t *testing.T) {
|
|||||||
// headersMatch did not also relax DSCP — different DSCP must still split.
|
// headersMatch did not also relax DSCP — different DSCP must still split.
|
||||||
func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits).
|
// Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits).
|
||||||
tosA := byte(0x10<<2) | ecnNotECT
|
tosA := byte(0x10<<2) | ecnNotECT
|
||||||
@@ -819,7 +820,7 @@ func TestCoalescerDscpMismatchReseeds(t *testing.T) {
|
|||||||
// TestCoalescerCoalescesEceFlow.
|
// TestCoalescerCoalescesEceFlow.
|
||||||
func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
flags := byte(tcpAck | tcpEce)
|
flags := byte(tcpAck | tcpEce)
|
||||||
if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil {
|
if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil {
|
||||||
@@ -850,7 +851,7 @@ func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) {
|
|||||||
// seen had the wire never reordered.
|
// seen had the wire never reordered.
|
||||||
func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot
|
// Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot
|
||||||
// because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot
|
// because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot
|
||||||
@@ -886,7 +887,7 @@ func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) {
|
|||||||
// without any cross-flow contamination.
|
// without any cross-flow contamination.
|
||||||
func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700.
|
// Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700.
|
||||||
// Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered.
|
// Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered.
|
||||||
@@ -937,7 +938,7 @@ func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) {
|
|||||||
// boundary by an arbitrary number of segments.
|
// boundary by an arbitrary number of segments.
|
||||||
func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set.
|
// Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set.
|
||||||
// Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without
|
// Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without
|
||||||
@@ -965,7 +966,7 @@ func TestCoalescerSortKeepsPSHBoundary(t *testing.T) {
|
|||||||
// is sorted/merged independently.
|
// is sorted/merged independently.
|
||||||
func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// First two segments seed S1 (then a 3400 reorder seeds S2).
|
// First two segments seed S1 (then a 3400 reorder seeds S2).
|
||||||
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
|
||||||
@@ -998,7 +999,7 @@ func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) {
|
|||||||
// TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30.
|
// TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30.
|
||||||
func TestCoalescerIPv6MergesCEMark(t *testing.T) {
|
func TestCoalescerIPv6MergesCEMark(t *testing.T) {
|
||||||
w := &fakeTunWriter{gsoEnabled: true}
|
w := &fakeTunWriter{gsoEnabled: true}
|
||||||
c := NewTCPCoalescer(w)
|
c := NewTCPCoalescer(w, test.NewLogger())
|
||||||
pay := make([]byte, 1200)
|
pay := make([]byte, 1200)
|
||||||
// tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those.
|
// tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those.
|
||||||
if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil {
|
||||||
|
|||||||
@@ -22,13 +22,14 @@ type SendBatch struct {
|
|||||||
backing []byte
|
backing []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSendBatch(out batchWriter, batchCap, slotCap int) *SendBatch {
|
// NewSendBatch makes a SendBatch with batchCap slots and an arenaSize byte buffer for slices to back those slots
|
||||||
|
func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch {
|
||||||
return &SendBatch{
|
return &SendBatch{
|
||||||
out: out,
|
out: out,
|
||||||
bufs: make([][]byte, 0, batchCap),
|
bufs: make([][]byte, 0, batchCap),
|
||||||
dsts: make([]netip.AddrPort, 0, batchCap),
|
dsts: make([]netip.AddrPort, 0, batchCap),
|
||||||
ecns: make([]byte, 0, batchCap),
|
ecns: make([]byte, 0, batchCap),
|
||||||
backing: make([]byte, 0, batchCap*slotCap),
|
backing: make([]byte, 0, arenaSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user