diff --git a/inside.go b/inside.go index 319d887e..2a85e618 100644 --- a/inside.go +++ b/inside.go @@ -64,7 +64,7 @@ func (f *Interface) consumeInsidePacket(pkt wire.TunPacket, fwPacket *firewall.P } hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) { - // borrowed: SegmentSuperpacket builds each segment in the kernel-supplied pkt + // borrowed: PerSegment builds each segment in the kernel-supplied pkt // bytes underneath. cachePacket explicitly copies its argument (handshake_manager.go cachePacket), // so retaining segments past the loop is safe. err := pkt.PerSegment(func(seg []byte) error { @@ -500,8 +500,9 @@ func (f *Interface) prepareSendVia(via *HostInfo, // via is the HostInfo through which the message is relayed. // ad is the plaintext data to authenticate, but not encrypt // nb is a buffer used to store the nonce value, re-used for performance reasons. -// out is a buffer used to store the result of the Encrypt operation -// q indicates which writer to use to send the packet. +// out is a buffer used to store the result of the Encrypt operation. +// The write goes through writers[0] — SendVia is called from contexts +// without a per-queue index (handshake, async control paths). func (f *Interface) SendVia(via *HostInfo, relay *Relay, ad, diff --git a/overlay/batch/multi_coalesce.go b/overlay/batch/multi_coalesce.go index 5eef43f2..8e3aea13 100644 --- a/overlay/batch/multi_coalesce.go +++ b/overlay/batch/multi_coalesce.go @@ -40,11 +40,6 @@ type MultiCoalescer struct { arena *util.Arena } -// DefaultMultiArenaCap is the recommended arena capacity for a Multi-lane -// batcher: 64 slots × 65535 bytes ≈ 4 MiB, enough to hold one recvmmsg -// burst worth of MTU-sized packets without the arena growing. -const DefaultMultiArenaCap = initialSlots * 65535 - // NewMultiCoalescer builds a multi-lane batcher. tcpEnabled lets the caller // opt out of TCP coalescing (e.g. when the queue can't do TSO); udpEnabled // likewise gates UDP coalescing (only enable when USO was negotiated). diff --git a/overlay/batch/multi_coalesce_test.go b/overlay/batch/multi_coalesce_test.go index f837510f..7688e367 100644 --- a/overlay/batch/multi_coalesce_test.go +++ b/overlay/batch/multi_coalesce_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/slackhq/nebula/test" + "github.com/slackhq/nebula/util" ) // TestMultiCoalescerRoutesByProto confirms TCP/UDP/other land in the right @@ -11,7 +12,7 @@ import ( // else (ICMP here) falls through to plain Write. func TestMultiCoalescerRoutesByProto(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, true) + m := NewMultiCoalescer(w, test.NewLogger(), util.NewArena(0), true, true) tcpPay := make([]byte, 1200) udpPay := make([]byte, 1200) @@ -53,7 +54,7 @@ func TestMultiCoalescerRoutesByProto(t *testing.T) { // the kernel via the passthrough lane rather than being lost. func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), true, false) // TSO on, USO off + m := NewMultiCoalescer(w, test.NewLogger(), util.NewArena(0), true, false) // TSO on, USO off if err := m.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil { t.Fatal(err) @@ -75,7 +76,7 @@ func TestMultiCoalescerDisabledUDPFallsThrough(t *testing.T) { // TestMultiCoalescerDisabledTCPFallsThrough mirrors the TSO=off case. func TestMultiCoalescerDisabledTCPFallsThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - m := NewMultiCoalescer(w, test.NewLogger(), NewArena(0), false, true) // TSO off, USO on + m := NewMultiCoalescer(w, test.NewLogger(), util.NewArena(0), false, true) // TSO off, USO on pay := make([]byte, 1200) if err := m.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { diff --git a/overlay/batch/passthrough.go b/overlay/batch/passthrough.go index 39aaca2e..82246dfa 100644 --- a/overlay/batch/passthrough.go +++ b/overlay/batch/passthrough.go @@ -3,7 +3,6 @@ package batch import ( "io" - "github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/util" ) @@ -16,12 +15,6 @@ type Passthrough struct { cursor int } -const passthroughBaseNumSlots = 128 - -// DefaultPassthroughArenaCap is the recommended arena capacity for a -// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB. -const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU - func NewPassthrough(w io.Writer, slots int, arena *util.Arena) *Passthrough { return &Passthrough{ out: w, diff --git a/overlay/batch/tcp_coalesce_bench_test.go b/overlay/batch/tcp_coalesce_bench_test.go index f4ebfcdf..417a60de 100644 --- a/overlay/batch/tcp_coalesce_bench_test.go +++ b/overlay/batch/tcp_coalesce_bench_test.go @@ -7,14 +7,18 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/test" + "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) // nopTunWriter is a zero-alloc tio.GSOWriter for benchmarks. Discards // everything but satisfies the interface the coalescer detects. type nopTunWriter struct{} -func (nopTunWriter) Write(p []byte) (int, error) { return len(p), nil } -func (nopTunWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ tio.GSOProto) error { +func (nopTunWriter) Write(p []byte) (int, error) { return len(p), nil } +func (nopTunWriter) Read(_ []wire.TunPacket, _ []byte) (int, error) { return 0, nil } +func (nopTunWriter) Close() error { return nil } +func (nopTunWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ wire.GSOProto) error { return nil } func (nopTunWriter) Capabilities() tio.Capabilities { @@ -71,7 +75,7 @@ func buildICMPv4() []byte { // between batches, and reports per-packet cost. func runCommitBench(b *testing.B, pkts [][]byte, batchSize int) { b.Helper() - c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(nopTunWriter{}, test.NewLogger(), util.NewArena(0)) b.ReportAllocs() b.SetBytes(int64(len(pkts[0]))) b.ResetTimer() @@ -140,7 +144,7 @@ func BenchmarkCommitNonCoalesceableTCP(b *testing.B) { // is the bench that shows the savings of skipping the lane's re-parse. func runMultiCommitBench(b *testing.B, pkts [][]byte, batchSize int) { b.Helper() - m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), NewArena(0), true, true) + m := NewMultiCoalescer(nopTunWriter{}, test.NewLogger(), util.NewArena(0), true, true) b.ReportAllocs() b.SetBytes(int64(len(pkts[0]))) b.ResetTimer() diff --git a/overlay/batch/tcp_coalesce_test.go b/overlay/batch/tcp_coalesce_test.go index 6da60861..9e6bad35 100644 --- a/overlay/batch/tcp_coalesce_test.go +++ b/overlay/batch/tcp_coalesce_test.go @@ -6,6 +6,8 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/test" + "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) // fakeTunWriter records plain Writes and WriteGSO calls without touching a @@ -53,7 +55,12 @@ func (w *fakeTunWriter) Write(p []byte) (int, error) { return len(p), nil } -func (w *fakeTunWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ tio.GSOProto) error { +// Read and Close exist solely to satisfy tio.Queue; coalescer tests never +// invoke them. +func (w *fakeTunWriter) Read(_ []wire.TunPacket, _ []byte) (int, error) { return 0, nil } +func (w *fakeTunWriter) Close() error { return nil } + +func (w *fakeTunWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ wire.GSOProto) error { hcopy := make([]byte, len(hdr)+len(transportHdr)) copy(hcopy, hdr) copy(hcopy[len(hdr):], transportHdr) @@ -128,7 +135,7 @@ const ( func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { w := &fakeTunWriter{gsoEnabled: false} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pkt := buildTCPv4(1000, tcpAck, []byte("hello")) if err := c.Commit(pkt); err != nil { t.Fatal(err) @@ -147,7 +154,7 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { func TestCoalescerNonTCPPassthrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pkt := make([]byte, 28) pkt[0] = 0x45 binary.BigEndian.PutUint16(pkt[2:4], 28) @@ -167,7 +174,7 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) { func TestCoalescerSeedThenFlushAlone(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000)) if err := c.Commit(pkt); err != nil { t.Fatal(err) @@ -194,7 +201,7 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) { func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -234,7 +241,7 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { func TestCoalescerRejectsSeqGap(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -253,7 +260,7 @@ func TestCoalescerRejectsSeqGap(t *testing.T) { func TestCoalescerRejectsFlagMismatch(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -274,7 +281,7 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) { func TestCoalescerRejectsFIN(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x")) if err := c.Commit(fin); err != nil { t.Fatal(err) @@ -290,7 +297,7 @@ func TestCoalescerRejectsFIN(t *testing.T) { func TestCoalescerShortLastSegmentClosesChain(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) full := make([]byte, 1200) half := make([]byte, 500) if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil { @@ -325,7 +332,7 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) { func TestCoalescerPSHFinalizesChain(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -355,7 +362,7 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) { // coalescer drops it the sender's push signal never reaches the receiver. func TestCoalescerPropagatesPSHFromAppended(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Seed has no PSH; second segment carries PSH and seals the chain. if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { @@ -383,7 +390,7 @@ func TestCoalescerPropagatesPSHFromAppended(t *testing.T) { func TestCoalescerRejectsDifferentFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) p1 := buildTCPv4(1000, tcpAck, pay) p2 := buildTCPv4(2200, tcpAck, pay) @@ -405,7 +412,7 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) { func TestCoalescerRejectsIPOptions(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 500) pkt := buildTCPv4(1000, tcpAck, pay) // Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add @@ -425,7 +432,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) { func TestCoalescerCapBySegments(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 512) seq := uint32(1000) for i := 0; i < tcpCoalesceMaxSegs+5; i++ { @@ -449,7 +456,7 @@ func TestCoalescerCapBySegments(t *testing.T) { // flows coalesce independently in a single Flush. func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Flow A: sport 1000. Flow B: sport 3000. @@ -506,7 +513,7 @@ func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) { // writing passthrough packets synchronously. func TestCoalescerPreservesArrivalOrder(t *testing.T) { w := &orderedFakeWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) // Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on // a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y). pay := make([]byte, 1200) @@ -549,7 +556,12 @@ func (w *orderedFakeWriter) Write(p []byte) (int, error) { return len(p), nil } -func (w *orderedFakeWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ tio.GSOProto) error { +// Read and Close exist solely to satisfy tio.Queue; order tests never +// invoke them. +func (w *orderedFakeWriter) Read(_ []wire.TunPacket, _ []byte) (int, error) { return 0, nil } +func (w *orderedFakeWriter) Close() error { return nil } + +func (w *orderedFakeWriter) WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, _ wire.GSOProto) error { w.events = append(w.events, "gso") return nil } @@ -574,7 +586,7 @@ func stringSliceEq(a, b []string) bool { // packet (SYN) mid-flow only flushes its own flow, not others. func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Flow A two segments. @@ -679,7 +691,7 @@ func buildTCPv6(tcLow byte, seq uint32, flags byte, payload []byte) []byte { // retains ECE on the wire. func TestCoalescerCoalescesEceFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) flags := byte(tcpAck | tcpEce) if err := c.Commit(buildTCPv4(1000, flags, pay)); err != nil { @@ -708,7 +720,7 @@ func TestCoalescerCoalescesEceFlow(t *testing.T) { // in-flow segment seeds a new slot rather than extending the prior burst. func TestCoalescerCwrSealsFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -741,7 +753,7 @@ func TestCoalescerCwrSealsFlow(t *testing.T) { // a CE-echoing window or none. func TestCoalescerEceMismatchReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4(1000, tcpAck|tcpEce, pay)); err != nil { t.Fatal(err) @@ -766,7 +778,7 @@ func TestCoalescerEceMismatchReseeds(t *testing.T) { // CE-marked packet still coalesces, and the merged superpacket carries CE. func TestCoalescerMergesCEMark(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) if err := c.Commit(buildTCPv4WithToS(ecnECT0, 1000, tcpAck, pay)); err != nil { t.Fatal(err) @@ -797,7 +809,7 @@ func TestCoalescerMergesCEMark(t *testing.T) { // headersMatch did not also relax DSCP — different DSCP must still split. func TestCoalescerDscpMismatchReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Same ECN (Not-ECT), different DSCP (0x10 vs 0x20 in upper 6 bits). tosA := byte(0x10<<2) | ecnNotECT @@ -820,7 +832,7 @@ func TestCoalescerDscpMismatchReseeds(t *testing.T) { // TestCoalescerCoalescesEceFlow. func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) flags := byte(tcpAck | tcpEce) if err := c.Commit(buildTCPv6(0, 1000, flags, pay)); err != nil { @@ -851,7 +863,7 @@ func TestCoalescerIPv6CoalescesEceFlow(t *testing.T) { // seen had the wire never reordered. func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Arrival order: seq 1000, 3400, 2200. The 3400 seeds a separate slot // because 3400 != nextSeq=2200, then 2200 fails to extend the 3400 slot @@ -887,7 +899,7 @@ func TestCoalescerSortsReorderedSeedsAndMerges(t *testing.T) { // without any cross-flow contamination. func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Flow A (sport 1000) seq 100, 1300; flow B (sport 3000) seq 500, 1700. // Arrival: A.1300, B.1700, A.100, B.500 — every flow reordered. @@ -938,7 +950,7 @@ func TestCoalescerSortAcrossFlowsMergesEachIndependently(t *testing.T) { // boundary by an arbitrary number of segments. func TestCoalescerSortKeepsPSHBoundary(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // Seq 1000 (no PSH) + 2200 (PSH) → seal one slot with PSH set. // Seq 3400 (no PSH) is contiguous to 3400 from seq 2200+1200; without @@ -966,7 +978,7 @@ func TestCoalescerSortKeepsPSHBoundary(t *testing.T) { // is sorted/merged independently. func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // First two segments seed S1 (then a 3400 reorder seeds S2). if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil { @@ -999,7 +1011,7 @@ func TestCoalescerSortKeepsPassthroughBarrier(t *testing.T) { // TestCoalescerMergesCEMark. ECN bits live in TC[1:0] = byte 1 mask 0x30. func TestCoalescerIPv6MergesCEMark(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewTCPCoalescer(w, test.NewLogger(), NewArena(0)) + c := NewTCPCoalescer(w, test.NewLogger(), util.NewArena(0)) pay := make([]byte, 1200) // tcLow is the low 4 bits of TC; ECN occupies the bottom 2 of those. if err := c.Commit(buildTCPv6(ecnECT0, 1000, tcpAck, pay)); err != nil { diff --git a/overlay/batch/udp_coalesce_test.go b/overlay/batch/udp_coalesce_test.go index 368afafc..068a668c 100644 --- a/overlay/batch/udp_coalesce_test.go +++ b/overlay/batch/udp_coalesce_test.go @@ -3,6 +3,8 @@ package batch import ( "encoding/binary" "testing" + + "github.com/slackhq/nebula/util" ) // buildUDPv4 builds a minimal IPv4+UDP packet with the given payload and ports. @@ -60,7 +62,7 @@ func buildUDPv6(sport, dport uint16, payload []byte) []byte { func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { w := &fakeTunWriter{gsoEnabled: false} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pkt := buildUDPv4(1000, 53, make([]byte, 100)) if err := c.Commit(pkt); err != nil { t.Fatal(err) @@ -78,7 +80,7 @@ func TestUDPCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { func TestUDPCoalescerNonUDPPassthrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) // ICMP packet pkt := make([]byte, 28) pkt[0] = 0x45 @@ -99,7 +101,7 @@ func TestUDPCoalescerNonUDPPassthrough(t *testing.T) { func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pkt := buildUDPv4(1000, 53, make([]byte, 800)) if err := c.Commit(pkt); err != nil { t.Fatal(err) @@ -116,7 +118,7 @@ func TestUDPCoalescerSeedThenFlushAlone(t *testing.T) { func TestUDPCoalescerCoalescesEqualSized(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 1200) for i := 0; i < 3; i++ { if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil { @@ -156,7 +158,7 @@ func TestUDPCoalescerCoalescesEqualSized(t *testing.T) { // Last segment may be shorter, sealing the chain. func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) full := make([]byte, 1200) tail := make([]byte, 600) if err := c.Commit(buildUDPv4(1000, 53, full)); err != nil { @@ -189,7 +191,7 @@ func TestUDPCoalescerShortLastSegmentSeals(t *testing.T) { // A larger-than-gsoSize packet cannot extend the slot — it reseeds. func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) if err := c.Commit(buildUDPv4(1000, 53, make([]byte, 800))); err != nil { t.Fatal(err) } @@ -207,7 +209,7 @@ func TestUDPCoalescerLargerThanSeedReseeds(t *testing.T) { // Different 5-tuples must not coalesce. func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 800) if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil { t.Fatal(err) @@ -238,7 +240,7 @@ func TestUDPCoalescerDifferentFlowsKeepSeparate(t *testing.T) { // Caps at udpCoalesceMaxSegs. func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 100) for i := 0; i < udpCoalesceMaxSegs+5; i++ { if err := c.Commit(buildUDPv4(1000, 53, pay)); err != nil { @@ -264,7 +266,7 @@ func TestUDPCoalescerCapsAtMaxSegs(t *testing.T) { // CE marks on appended segments must be merged into the seed's IP TOS. func TestUDPCoalescerMergesCEMark(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 800) pkt0 := buildUDPv4(1000, 53, pay) // ECN=00 pkt1 := buildUDPv4(1000, 53, pay) @@ -293,7 +295,7 @@ func TestUDPCoalescerMergesCEMark(t *testing.T) { // IPv6 path: same flow, equal-sized → coalesced. func TestUDPCoalescerIPv6Coalesces(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 1200) for i := 0; i < 3; i++ { if err := c.Commit(buildUDPv6(1000, 53, pay)); err != nil { @@ -329,7 +331,7 @@ func TestUDPCoalescerIPv6Coalesces(t *testing.T) { // DSCP differences must reseed (headers don't match outside ECN). func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pay := make([]byte, 800) pkt0 := buildUDPv4(1000, 53, pay) pkt1 := buildUDPv4(1000, 53, pay) @@ -351,7 +353,7 @@ func TestUDPCoalescerDSCPMismatchReseeds(t *testing.T) { // Fragmented IPv4 must not be coalesced. func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pkt := buildUDPv4(1000, 53, make([]byte, 200)) binary.BigEndian.PutUint16(pkt[6:8], 0x2000) // MF=1 if err := c.Commit(pkt); err != nil { @@ -368,7 +370,7 @@ func TestUDPCoalescerFragmentedIPv4PassesThrough(t *testing.T) { // IPv4 with options is not admissible (we require IHL=5). func TestUDPCoalescerIPv4WithOptionsPassesThrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} - c := NewUDPCoalescer(w, NewArena(0)) + c := NewUDPCoalescer(w, util.NewArena(0)) pkt := buildUDPv4(1000, 53, make([]byte, 200)) pkt[0] = 0x46 // IHL = 6 (24-byte IPv4 header — has options) if err := c.Commit(pkt); err != nil { diff --git a/overlay/tio/queueset_gso_linux.go b/overlay/tio/queueset_gso_linux.go index 4914df88..fba008b8 100644 --- a/overlay/tio/queueset_gso_linux.go +++ b/overlay/tio/queueset_gso_linux.go @@ -62,6 +62,10 @@ func (c *offloadQueueSet) wakeForShutdown() error { } func (c *offloadQueueSet) Close() error { + if c.shutdownFd < 0 { + return nil + } + errs := []error{} // Signal all readers blocked in poll to wake up and exit @@ -75,5 +79,12 @@ func (c *offloadQueueSet) Close() error { } } + // All Offloads reference shutdownFd in their pollfd arrays, so close it + // only after every Offload.Close has returned. + if err := unix.Close(c.shutdownFd); err != nil { + errs = append(errs, err) + } + c.shutdownFd = -1 + return errors.Join(errs...) } diff --git a/overlay/tio/segment_bench_test.go b/overlay/tio/segment_bench_test.go index 13713010..a992613b 100644 --- a/overlay/tio/segment_bench_test.go +++ b/overlay/tio/segment_bench_test.go @@ -2,7 +2,11 @@ package tio -import "testing" +import ( + "testing" + + "github.com/slackhq/nebula/wire" +) // fakeBatch stands in for batch.TxBatcher inside the bench — same shape // of pointer-capturing closure that sendInsideMessage builds. @@ -21,27 +25,27 @@ type fakeIface struct { } // BenchmarkSegmentSuperpacketAllocsTSO measures allocation per -// SegmentSuperpacket call when a closure captures pointer-bearing -// receivers — the realistic shape of sendInsideMessage's closure. +// PerSegment call when a closure captures pointer-bearing receivers — the +// realistic shape of sendInsideMessage's closure. func BenchmarkSegmentSuperpacketAllocsTSO(b *testing.B) { const mss = 1400 const numSeg = 32 pkt := buildTSOv6(mss*numSeg, mss) - gso := GSOInfo{ + gso := wire.GSOInfo{ Size: mss, HdrLen: 60, // 40 (IPv6) + 20 (TCP) CsumStart: 40, - Proto: GSOProtoTCP, + Proto: wire.GSOProtoTCP, } - p := Packet{Bytes: pkt, GSO: gso} + p := wire.TunPacket{Bytes: pkt, Meta: gso} hi := &fakeHostInfo{remoteIndexId: 0xdeadbeef} f := &fakeIface{rebindCount: 7, hi: hi} fb := &fakeBatch{} - // SegmentSuperpacket consumes pkt destructively; refresh from a master - // copy each iter (matches the production pattern where every TUN read - // hands the segmenter a fresh kernel-supplied buffer). + // PerSegment consumes pkt destructively; refresh from a master copy + // each iter (matches the production pattern where every TUN read hands + // the segmenter a fresh kernel-supplied buffer). master := append([]byte(nil), pkt...) work := make([]byte, len(pkt)) p.Bytes = work @@ -50,7 +54,7 @@ func BenchmarkSegmentSuperpacketAllocsTSO(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { copy(work, master) - err := SegmentSuperpacket(p, func(seg []byte) error { + err := p.PerSegment(func(seg []byte) error { out := fb.Reserve(16 + len(seg) + 16) out[0] = byte(f.rebindCount) out[1] = byte(hi.counter) @@ -59,7 +63,7 @@ func BenchmarkSegmentSuperpacketAllocsTSO(b *testing.B) { return nil }) if err != nil { - b.Fatalf("SegmentSuperpacket: %v", err) + b.Fatalf("PerSegment: %v", err) } } } diff --git a/overlay/tio/segment_other.go b/overlay/tio/segment_other.go deleted file mode 100644 index 6bae0030..00000000 --- a/overlay/tio/segment_other.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:build !linux || android || e2e_testing - -package tio - -import "fmt" - -func protoFromGSOType(_ uint8) (GSOProto, error) { - return 0, fmt.Errorf("GSO unsupported") -} - -// SegmentSuperpacket invokes fn once per segment of pkt. On non-Linux -// builds (and Android/e2e_testing) this package does not provide a Queue -// implementation, so any caller that does construct a Packet here can only -// be operating on non-superpacket bytes and the stub forwards them -// directly. A non-zero GSO field is a programming error from the caller -// and returns an explicit error rather than silently misbehaving. -func SegmentSuperpacket(pkt Packet, fn func(seg []byte) error) error { - if pkt.GSO.IsSuperpacket() { - return fmt.Errorf("tio: GSO superpacket on platform without segmentation support") - } - return fn(pkt.Bytes) -} diff --git a/overlay/tio/tio.go b/overlay/tio/tio.go index 08c5060c..97d9b91a 100644 --- a/overlay/tio/tio.go +++ b/overlay/tio/tio.go @@ -63,18 +63,16 @@ type Queue interface { // in pays except possibly the last is exactly the same size. proto picks // the L4 protocol so the writer knows which GSOType / CsumOffset to set. // -// Callers should also consult CapsProvider (via SupportsGSO or -// QueueCapabilities) for the per-protocol negotiated capability; an -// implementation of GSOWriter is necessary but not sufficient since USO -// may not have been negotiated even when TSO was. +// Callers should also consult Queue.Capabilities (via SupportsGSO) for +// the per-protocol negotiated capability; an implementation of GSOWriter +// is necessary but not sufficient since USO may not have been negotiated +// even when TSO was. type GSOWriter interface { WriteGSO(hdr []byte, transportHdr []byte, pays [][]byte, proto wire.GSOProto) error } // SupportsGSO reports whether w implements GSOWriter and the underlying -// queue advertises the negotiated capability for `want`. A writer that -// implements GSOWriter but not CapsProvider is treated as permissive -// (used by tests and fakes that don't negotiate). +// queue advertises the negotiated capability for `want` via Capabilities. func SupportsGSO(w Queue, want wire.GSOProto) (GSOWriter, bool) { gw, ok := w.(GSOWriter) if !ok { diff --git a/overlay/tio/tio_gso_linux.go b/overlay/tio/tio_gso_linux.go index 1a0b7b95..798f2c50 100644 --- a/overlay/tio/tio_gso_linux.go +++ b/overlay/tio/tio_gso_linux.go @@ -16,26 +16,14 @@ import ( "github.com/slackhq/nebula/overlay/tio/virtio" ) -// tunRxBufSize is the per-Read worst-case footprint inside rxBuf: one -// kernel-supplied packet body, which is at most ~64 KiB (tunReadBufSize). -// Segmentation happens at encrypt time on a per-routine MTU-sized scratch -// (see SegmentSuperpacket), so rxBuf only holds raw kernel-supplied bytes. -// We round up to give comfortable margin for the drain headroom check -// below. +// tunRxBufSize is the per-Read worst-case footprint for one kernel-supplied +// packet body, which is at most ~64 KiB (tunReadBufSize). Segmentation +// happens at encrypt time via wire.TunPacket.PerSegment on a per-routine +// MTU-sized scratch, so the caller-supplied read buffer only holds raw +// kernel-supplied bytes. Used by Read's drain loop to gate further reads +// on whether the remaining buffer can still hold one worst-case packet. const tunRxBufSize = 64 * 1024 -// tunRxBufCap is the total size we allocate for the per-reader rx -// buffer. With reads landing directly in rxBuf, each drain iteration -// consumes up to tunRxBufSize of headroom for the kernel-supplied bytes. -// Sized to two such iterations so the initial blocking read plus one -// drain read both fit without partial-drop. -const tunRxBufCap = tunRxBufSize * 2 - -// tunDrainCap caps how many packets a single Read will accumulate via -// the post-wake drain loop. Sized to soak up a burst of small ACKs while -// bounding how much work a single caller holds before handing off. -const tunDrainCap = 64 - // gsoMaxIovs caps the iovec budget WriteGSO assembles per call: 3 fixed // entries (virtio_net_hdr, IP hdr, transport hdr) plus up to gsoMaxIovs-3 // payload fragments. Sized comfortably above the typical kernel GSO @@ -50,7 +38,7 @@ const gsoMaxIovs = 256 // CHECKSUM_UNNECESSARY so the receiving network stack skips L4 checksum // verification. All packets that reach the plain Write paths already carry // a valid L4 checksum (either supplied by a remote peer whose ciphertext we -// AEAD-authenticated, produced by segmentTCPYield/segmentUDPYield during +// AEAD-authenticated, produced by virtio.SegmentTCP/SegmentUDP during // superpacket segmentation, or built locally by CreateRejectPacket), so // trusting them is safe. var validVnetHdr = [virtio.Size]byte{unix.VIRTIO_NET_HDR_F_DATA_VALID} @@ -71,12 +59,12 @@ type Offload struct { // readVnetScratch holds the 10-byte virtio_net_hdr split off the front of // every TUN read via readv(2). Decoupling the header from the packet body - // lets us read the body directly into rxBuf at the current rxOff with - // no userspace copy on the GSO_NONE fast path. + // lets us read the body directly into the caller-supplied mem at the + // current rxOff with no userspace copy on the GSO_NONE fast path. readVnetScratch [virtio.Size]byte // readIovs is the readv(2) iovec scratch wired once at construction — // iovec[0] points at readVnetScratch; iovec[1].Base/Len is updated per - // read to address the current rxBuf slot. + // read to address the caller-supplied mem slot. readIovs [2]unix.Iovec // usoEnabled records whether the kernel agreed to TUN_F_USO* on this FD, @@ -120,7 +108,8 @@ func newOffload(fd int, shutdownFd int, usoEnabled bool) (*Offload, error) { out.gsoIovs[0].SetLen(virtio.Size) // readIovs[0] is wired once to the virtio_net_hdr scratch; per-read we - // only repoint readIovs[1] at the next rxBuf slot (see readPacket). + // only repoint readIovs[1] at the next caller-supplied mem slot + // (see readPacket). out.readIovs[0].Base = &out.readVnetScratch[0] out.readIovs[0].SetLen(virtio.Size) @@ -182,17 +171,16 @@ func (r *Offload) blockOnWrite() error { } // readPacket issues a single readv(2) splitting the virtio_net_hdr off -// into readVnetScratch and reading the packet body directly into rxBuf at -// the current rxOff. Returns the body length (zero virtio header bytes, -// just the IP packet/superpacket). block controls whether EAGAIN is -// retried via poll: the initial read of a drain blocks; subsequent drain -// reads do not. +// into readVnetScratch and reading the packet body directly into mem. +// Returns the body length (zero virtio header bytes, just the IP +// packet/superpacket). block controls whether EAGAIN is retried via poll: +// the initial read of a drain blocks; subsequent drain reads do not. // -// The body iovec capacity is always tunReadBufSize; callers (the Read -// drain loop) gate entry on tunRxBufCap-rxOff >= tunRxBufSize, sized to -// hold one worst-case kernel-supplied packet body. Without that gate the -// body iovec could be smaller than the next inbound packet and the -// kernel would truncate. +// The body iovec capacity is always tunReadBufSize; the Read drain loop +// gates entry on len(mem)-rxOff >= tunRxBufSize, sized to hold one +// worst-case kernel-supplied packet body. Without that gate the body +// iovec could be smaller than the next inbound packet and the kernel +// would truncate. func (r *Offload) readPacket(mem []byte, block bool) (int, error) { for { r.readIovs[1].Base = &mem[0] @@ -223,16 +211,16 @@ func (r *Offload) readPacket(mem []byte, block bool) (int, error) { } } -// Read returns one or more packets from the tun. Each Packet either -// carries a single ready-to-use IP datagram (GSO zero) or a TSO/USO -// superpacket plus the GSOInfo a caller needs to segment it (see -// SegmentSuperpacket). The first read blocks via poll; once the fd is -// known readable we drain additional packets non-blocking until the -// kernel queue is empty (EAGAIN), we've collected tunDrainCap packets, -// or we're out of rxBuf headroom. This amortizes the poll wake over -// bursts of small packets (e.g. TCP ACKs). Packet.Bytes slices point -// into the Offload's internal buffer and are only valid until the next -// Read or Close on this Queue. +// Read returns one or more packets from the tun. Each wire.TunPacket +// either carries a single ready-to-use IP datagram (GSO zero) or a TSO/USO +// superpacket plus the wire.GSOInfo a caller needs to segment it (see +// wire.TunPacket.PerSegment). The first read blocks via poll; once the fd +// is known readable we drain additional packets non-blocking until the +// kernel queue is empty (EAGAIN), p is full, or mem no longer has room +// for another worst-case packet (tunRxBufSize). This amortizes the poll +// wake over bursts of small packets (e.g. TCP ACKs). The Bytes slices on +// returned packets point into the caller-supplied mem and are only valid +// until the next Read or Close on this Queue. func (r *Offload) Read(p []wire.TunPacket, mem []byte) (int, error) { maxP := len(p) maxM := len(mem) @@ -255,9 +243,9 @@ func (r *Offload) Read(p []wire.TunPacket, mem []byte) (int, error) { break } - // Drain: non-blocking reads until the kernel queue is empty, the drain - // cap is reached, or rxBuf no longer has room for another worst-case - // kernel-supplied packet (tunRxBufSize). + // Drain: non-blocking reads until the kernel queue is empty, p is full, + // or mem no longer has room for another worst-case kernel-supplied + // packet (tunRxBufSize). for len(p) < maxP && maxM-rxOff >= tunRxBufSize { n, err := r.readPacket(mem[rxOff:], false) if err != nil { @@ -279,13 +267,12 @@ func (r *Offload) Read(p []wire.TunPacket, mem []byte) (int, error) { return len(p), nil } -// decodeRead processes the packet sitting in rxBuf at rxOff (length -// pktLen). The bytes stay in rxBuf — for GSO_NONE we slice them as a -// regular IP datagram (running finishChecksum if NEEDS_CSUM is set); -// for TSO/USO superpackets we attach the corrected GSO metadata so the -// caller can segment lazily at encrypt time. rxOff advances past the -// kernel-supplied body and nothing else, since segmentation no longer -// writes back into rxBuf. +// decodeRead processes the packet sitting at mem[:pktLen]. The bytes stay +// in mem — for GSO_NONE we slice them as a regular IP datagram (running +// finishChecksum if NEEDS_CSUM is set); for TSO/USO superpackets we attach +// the corrected GSO metadata so the caller can segment lazily at encrypt +// time. The caller advances its own rxOff past the kernel-supplied body +// and nothing else, since segmentation no longer writes back into mem. func (r *Offload) decodeRead(p []wire.TunPacket, mem []byte, pktLen int) ([]wire.TunPacket, error) { if pktLen <= 0 { return p, fmt.Errorf("short tun read: %d", pktLen) @@ -307,8 +294,8 @@ func (r *Offload) decodeRead(p []wire.TunPacket, mem []byte, pktLen int) ([]wire // GSO superpacket: validate, fix the kernel-supplied HdrLen on the // FORWARD path (CorrectHdrLen), pick the L4 protocol, and attach - // the metadata. The bytes stay in rxBuf untouched, segmentation - // happens in SegmentSuperpacket at encrypt time. + // the metadata. The bytes stay in mem untouched; segmentation + // happens in wire.TunPacket.PerSegment at encrypt time. if err := virtio.CheckValid(body, hdr); err != nil { return p, err } diff --git a/overlay/tio/tun_linux_offload_test.go b/overlay/tio/tun_linux_offload_test.go index 1cf64925..caf24d83 100644 --- a/overlay/tio/tun_linux_offload_test.go +++ b/overlay/tio/tun_linux_offload_test.go @@ -12,6 +12,7 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/checksum" "github.com/slackhq/nebula/overlay/tio/virtio" + "github.com/slackhq/nebula/wire" ) // testSegScratchSize is a generous segmentation scratch sized to fit any @@ -26,12 +27,11 @@ func verifyChecksum(b []byte, pseudo uint16) bool { } // segmentForTest is the test-only counterpart to the production -// SegmentSuperpacket path. It handles GSO_NONE (with optional +// wire.TunPacket.PerSegment path. It handles GSO_NONE (with optional // finishChecksum) inline and dispatches GSO superpackets through -// SegmentSuperpacket, draining each yielded segment into a -// freshly-copied [][]byte slot so callers can iterate after the call -// returns. Tests pre-set hdr.HdrLen correctly, so correctHdrLen is not -// invoked here. +// PerSegment, draining each yielded segment into a freshly-copied [][]byte +// slot so callers can iterate after the call returns. Tests pre-set +// hdr.HdrLen correctly, so correctHdrLen is not invoked here. func segmentForTest(pkt []byte, hdr virtio.Hdr, out *[][]byte, scratch []byte) error { if hdr.GSOType == unix.VIRTIO_NET_HDR_GSO_NONE { cp := append([]byte(nil), pkt...) @@ -47,13 +47,16 @@ func segmentForTest(pkt []byte, hdr virtio.Hdr, out *[][]byte, scratch []byte) e if err != nil { return err } - gso := GSOInfo{ - Size: hdr.GSOSize, - HdrLen: hdr.HdrLen, - CsumStart: hdr.CsumStart, - Proto: proto, + p := wire.TunPacket{ + Bytes: pkt, + Meta: wire.GSOInfo{ + Size: hdr.GSOSize, + HdrLen: hdr.HdrLen, + CsumStart: hdr.CsumStart, + Proto: proto, + }, } - return SegmentSuperpacket(Packet{Bytes: pkt, GSO: gso}, func(seg []byte) error { + return p.PerSegment(func(seg []byte) error { *out = append(*out, append([]byte(nil), seg...)) return nil }) @@ -592,8 +595,8 @@ func BenchmarkSegmentTCPv4(b *testing.B) { scratch := make([]byte, testSegScratchSize) out := make([][]byte, 0, 64) - // SegmentSuperpacket consumes its input destructively; restore - // pkt from a master copy each iteration. The restore mirrors the + // PerSegment consumes its input destructively; restore pkt from + // a master copy each iteration. The restore mirrors the // kernel→userspace copy that hands a fresh GSO blob to the // segmenter in production, so it's representative cost rather // than bench overhead. @@ -673,24 +676,21 @@ func buildTSOv6(payLen, gso int) []byte { return pkt } -// TestDecodeReadFitsMaxTSOAtDrainThreshold proves the rxBuf sizing is -// correct: when rxOff is at the maximum value the drain headroom check -// allows, decodeRead must still be able to absorb a worst-case 64KiB -// TSO superpacket without dropping the burst. With segmentation deferred -// to encrypt time, decodeRead writes only the kernel-supplied bytes into -// rxBuf, so the size requirement is just "fit one worst-case input." +// TestDecodeReadFitsMaxTSO proves decodeRead can absorb a worst-case +// 64KiB TSO superpacket without dropping it. With segmentation deferred to +// encrypt time, decodeRead writes nothing — it just slices the +// caller-supplied mem and attaches GSO metadata — so the size requirement +// is just "fit one worst-case input." // // Regression history: in a prior layout the rx buffer doubled as the // segmentation output, a near-threshold drain read returned "scratch too // small", the whole 45-segment TSO burst was dropped, and the remote's TCP -// fast-retransmit collapsed cwnd. Keeping this test in the new layout -// guards against re-introducing a drain headroom shortfall. -func TestDecodeReadFitsMaxTSOAtDrainThreshold(t *testing.T) { +// fast-retransmit collapsed cwnd. Keeping this test guards against +// re-introducing per-call sizing assumptions inside decodeRead. +func TestDecodeReadFitsMaxTSO(t *testing.T) { const ipv6HdrLen = 40 const tcpHdrLen = 20 const headerLen = ipv6HdrLen + tcpHdrLen - // Maximum TUN read body. The tunReadBufSize cap on readv's body iovec - // is what bounds the kernel's superpacket length. pktLen := tunReadBufSize payLen := pktLen - headerLen const targetSegs = 64 @@ -701,16 +701,12 @@ func TestDecodeReadFitsMaxTSOAtDrainThreshold(t *testing.T) { t.Fatalf("buildTSOv6 produced %d bytes, want %d", len(pkt), pktLen) } - o := &Offload{ - rxBuf: make([]byte, tunRxBufCap), - } - // rxOff at the maximum value the drain headroom check permits before - // it would refuse another read. Any drain-time read up to this - // threshold MUST still process correctly. - o.rxOff = tunRxBufCap - tunRxBufSize - - // Stage the body in rxBuf as if readv(2) just placed it there. - copy(o.rxBuf[o.rxOff:], pkt) + o := &Offload{} + // mem is sized exactly to one worst-case packet — the caller-side + // invariant the drain loop in Read enforces. decodeRead must process + // the burst within that window. + mem := make([]byte, pktLen) + copy(mem, pkt) // Encode the matching virtio_net_hdr. hdr := virtio.Hdr{ @@ -723,50 +719,42 @@ func TestDecodeReadFitsMaxTSOAtDrainThreshold(t *testing.T) { } hdr.Encode(o.readVnetScratch[:]) - startRxOff := o.rxOff - if err := o.decodeRead(pktLen); err != nil { - t.Fatalf("decodeRead at drain threshold returned %v — rxBuf sizing regression: "+ + var pkts []wire.TunPacket + pkts, err := o.decodeRead(pkts, mem, pktLen) + if err != nil { + t.Fatalf("decodeRead returned %v — sizing regression: "+ "tunRxBufSize=%d must hold one worst-case input (%d)", err, tunRxBufSize, pktLen) } - if len(o.pending) != 1 { - t.Fatalf("got %d packets, want 1 superpacket entry", len(o.pending)) + if len(pkts) != 1 { + t.Fatalf("got %d packets, want 1 superpacket entry", len(pkts)) } - got := o.pending[0] - if !got.GSO.IsSuperpacket() { - t.Fatalf("expected superpacket GSO metadata, got %+v", got.GSO) + got := pkts[0] + if !got.Meta.IsSuperpacket() { + t.Fatalf("expected superpacket GSO metadata, got %+v", got.Meta) } - if got.GSO.Proto != GSOProtoTCP { - t.Errorf("GSO.Proto=%d want TCP", got.GSO.Proto) + if got.Meta.Proto != wire.GSOProtoTCP { + t.Errorf("Meta.Proto=%d want TCP", got.Meta.Proto) } - if got.GSO.Size != uint16(gsoSize) { - t.Errorf("GSO.Size=%d want %d", got.GSO.Size, gsoSize) + if got.Meta.Size != uint16(gsoSize) { + t.Errorf("Meta.Size=%d want %d", got.Meta.Size, gsoSize) } - if got.GSO.HdrLen != uint16(headerLen) { - t.Errorf("GSO.HdrLen=%d want %d", got.GSO.HdrLen, headerLen) + if got.Meta.HdrLen != uint16(headerLen) { + t.Errorf("Meta.HdrLen=%d want %d", got.Meta.HdrLen, headerLen) } - if got.GSO.CsumStart != uint16(ipv6HdrLen) { - t.Errorf("GSO.CsumStart=%d want %d", got.GSO.CsumStart, ipv6HdrLen) + if got.Meta.CsumStart != uint16(ipv6HdrLen) { + t.Errorf("Meta.CsumStart=%d want %d", got.Meta.CsumStart, ipv6HdrLen) } if len(got.Bytes) != pktLen { t.Errorf("len(Bytes)=%d want %d", len(got.Bytes), pktLen) } - // rxOff advances exactly by the kernel-supplied body length — no - // segmentation output to account for any more. - if o.rxOff != startRxOff+pktLen { - t.Errorf("rxOff=%d want %d", o.rxOff, startRxOff+pktLen) - } - if o.rxOff > tunRxBufCap { - t.Fatalf("rxOff=%d overran rxBuf (cap=%d)", o.rxOff, tunRxBufCap) - } - // Validate that segmenting the returned superpacket reproduces the // expected per-segment IPv6 payload length and TCP checksum. wantSegs := (payLen + gsoSize - 1) / gsoSize gotSegs := 0 - if err := SegmentSuperpacket(got, func(seg []byte) error { + if err := got.PerSegment(func(seg []byte) error { defer func() { gotSegs++ }() if len(seg) < headerLen+1 { t.Errorf("seg %d too short: %d", gotSegs, len(seg)) @@ -786,7 +774,7 @@ func TestDecodeReadFitsMaxTSOAtDrainThreshold(t *testing.T) { } return nil }); err != nil { - t.Fatalf("SegmentSuperpacket: %v", err) + t.Fatalf("PerSegment: %v", err) } if gotSegs != wantSegs { t.Fatalf("got %d segments, want %d", gotSegs, wantSegs) diff --git a/util/arena.go b/util/arena.go index 0f7328de..2b4211b7 100644 --- a/util/arena.go +++ b/util/arena.go @@ -1,158 +1,7 @@ package util -import ( - "bytes" - "encoding/binary" -) - -// flowKey identifies a transport flow by {src, dst, sport, dport, family}. -// Comparable, so map lookups and linear scans over the slot list stay tight. -// Shared by the TCP and UDP coalescers; each coalescer keeps its own -// openSlots map, so a TCP and UDP flow on the same 5-tuple-without-proto -// never alias. -type flowKey struct { - src, dst [16]byte - sport, dport uint16 - isV6 bool -} - -// initialSlots is the starting capacity of the slot pool. One flow per -// packet is the worst case so this matches a typical carrier-side -// recvmmsg batch on the encrypted UDP socket. -const initialSlots = 64 - -// parsedIP is the IP-level result of parseIPPrologue. The caller layers -// L4-specific parsing (TCP / UDP) on top. -type parsedIP struct { - fk flowKey - ipHdrLen int - // pkt is the original buffer trimmed to the IP-declared total length. - // Anything below the IP layer (transport parsers) should slice into - // pkt rather than the unbounded original. - pkt []byte -} - -// parseIPPrologue extracts the IP-level fields the coalescers care about: -// IHL/payload length, version, src/dst addresses, and the L4 protocol byte. -// Returns ok=false for malformed input, IPv4 with options or fragmentation, -// or IPv6 with extension headers (all rejected by both coalescers in -// identical ways before this refactor). -// -// On success, p.pkt is len-trimmed to the IP-declared length so callers -// don't have to repeat the trim. wantProto is the IANA protocol number to -// require (6 for TCP, 17 for UDP); ok=false for any other value. -func parseIPPrologue(pkt []byte, wantProto byte) (parsedIP, bool) { - var p parsedIP - if len(pkt) < 20 { - return p, false - } - v := pkt[0] >> 4 - switch v { - case 4: - ihl := int(pkt[0]&0x0f) * 4 - if ihl != 20 { - return p, false - } - if pkt[9] != wantProto { - return p, false - } - // Reject actual fragmentation (MF or non-zero frag offset). - if binary.BigEndian.Uint16(pkt[6:8])&0x3fff != 0 { - return p, false - } - totalLen := int(binary.BigEndian.Uint16(pkt[2:4])) - if totalLen > len(pkt) || totalLen < ihl { - return p, false - } - p.ipHdrLen = 20 - p.fk.isV6 = false - copy(p.fk.src[:4], pkt[12:16]) - copy(p.fk.dst[:4], pkt[16:20]) - p.pkt = pkt[:totalLen] - case 6: - if len(pkt) < 40 { - return p, false - } - if pkt[6] != wantProto { - return p, false - } - payloadLen := int(binary.BigEndian.Uint16(pkt[4:6])) - if 40+payloadLen > len(pkt) { - return p, false - } - p.ipHdrLen = 40 - p.fk.isV6 = true - copy(p.fk.src[:], pkt[8:24]) - copy(p.fk.dst[:], pkt[24:40]) - p.pkt = pkt[:40+payloadLen] - default: - return p, false - } - return p, true -} - -// ipHeadersMatch compares the IP portion of two packet header prefixes for -// byte-for-byte equality on every field that must be identical across -// coalesced segments. Size/IPID/IPCsum and the 2-bit IP-level ECN field are -// masked out — the appendPayload step merges CE into the seed. -// -// The transport (L4) portion of the header is checked separately by the -// per-protocol matcher. -func ipHeadersMatch(a, b []byte, isV6 bool) bool { - if isV6 { - // IPv6: byte 0 = version/TC[7:4], byte 1 = TC[3:0]/flow[19:16], - // bytes [2:4] = flow[15:0], [6:8] = next_hdr/hop, [8:40] = src+dst. - // ECN lives in TC[1:0] = byte 1 mask 0x30. Skip [4:6] payload_len. - if a[0] != b[0] { - return false - } - if a[1]&^0x30 != b[1]&^0x30 { - return false - } - if !bytes.Equal(a[2:4], b[2:4]) { - return false - } - if !bytes.Equal(a[6:40], b[6:40]) { - return false - } - return true - } - // IPv4: byte 0 = version/IHL, byte 1 = DSCP(6)|ECN(2), - // [6:10] flags/fragoff/TTL/proto, [12:20] src+dst. - // Skip [2:4] total len, [4:6] id, [10:12] csum. - if a[0] != b[0] { - return false - } - if a[1]&^0x03 != b[1]&^0x03 { - return false - } - if !bytes.Equal(a[6:10], b[6:10]) { - return false - } - if !bytes.Equal(a[12:20], b[12:20]) { - return false - } - return true -} - -// mergeECNIntoSeed ORs the 2-bit IP-level ECN field of pkt's IP header -// onto the seed's IP header, so a CE mark on any coalesced segment -// propagates to the final superpacket. (CE is 0b11; ORing yields CE if -// any segment carried it.) Used by both TCP and UDP coalescers, so the -// invariant lives in one place. -func mergeECNIntoSeed(seedHdr, pktHdr []byte, isV6 bool) { - if isV6 { - seedHdr[1] |= pktHdr[1] & 0x30 - } else { - seedHdr[1] |= pktHdr[1] & 0x03 - } -} - // Arena is an injectable byte-slab that hands out non-overlapping borrowed -// slices via Reserve and releases them in bulk via Reset. Coalescers take -// an *Arena at construction so the caller controls the slab lifetime and -// can share one slab across multiple coalescers (MultiCoalescer hands the -// same *Arena to every lane so the lanes don't carry their own backings). +// slices via Reserve and releases them in bulk via Reset. // // Arena is not safe for concurrent use. // diff --git a/wire/wire_linux.go b/wire/wire_linux.go index eab006df..234e7714 100644 --- a/wire/wire_linux.go +++ b/wire/wire_linux.go @@ -6,17 +6,15 @@ import ( "github.com/slackhq/nebula/overlay/tio/virtio" ) -// PerSegment invokes fn once per segment of pkt. For non-GSO pkts -// fn is called once with pkt.Bytes (no segmentation, no copy). For GSO/USO -// superpackets fn is called once per segment with a slice of pkt.Bytes +// PerSegment invokes fn once per segment of t. For non-GSO packets fn is +// called once with t.Bytes (no segmentation, no copy). For GSO/USO +// superpackets fn is called once per segment with a slice of t.Bytes // holding that segment's plaintext (a freshly-patched L3+L4 header sliced -// in front of the original payload chunk). The slide is destructive: pkt is +// in front of the original payload chunk). The slide is destructive: t is // consumed by this call and its bytes are in an undefined state when -// PerSegment returns. Callers must not retain pkt or any earlier -// seg slice past fn's return for that segment. The scratch parameter is -// unused on the destructive path and kept only for cross-platform -// signature compatibility. Aborts and returns the first error from fn or -// from per-segment construction. +// PerSegment returns. Callers must not retain t or any earlier seg slice +// past fn's return for that segment. Aborts and returns the first error +// from fn or from per-segment construction. func (t *TunPacket) PerSegment(fn func(seg []byte) error) error { if !t.Meta.IsSuperpacket() { return fn(t.Bytes)