diff --git a/control_tester.go b/control_tester.go index f927140b..728ac649 100644 --- a/control_tester.go +++ b/control_tester.go @@ -5,8 +5,6 @@ package nebula import ( "net/netip" - "github.com/google/gopacket" - "github.com/google/gopacket/layers" "github.com/slackhq/nebula/header" "github.com/slackhq/nebula/overlay" "github.com/slackhq/nebula/udp" @@ -22,7 +20,9 @@ func (c *Control) WaitForType(msgType header.MessageType, subType header.Message panic(err) } pipeTo.InjectUDPPacket(p) - if h.Type == msgType && h.Subtype == subType { + match := h.Type == msgType && h.Subtype == subType + p.Release() + if match { return } } @@ -38,7 +38,9 @@ func (c *Control) WaitForTypeByIndex(toIndex uint32, msgType header.MessageType, panic(err) } pipeTo.InjectUDPPacket(p) - if h.RemoteIndex == toIndex && h.Type == msgType && h.Subtype == subType { + match := h.RemoteIndex == toIndex && h.Type == msgType && h.Subtype == subType + p.Release() + if match { return } } @@ -90,65 +92,15 @@ func (c *Control) GetTunTxChan() <-chan []byte { return c.f.inside.(*overlay.TestTun).TxPackets } -// InjectUDPPacket will inject a packet into the udp side of nebula +// InjectUDPPacket injects a packet into the udp side. We copy internally so the caller keeps ownership of p. +// The copy comes from the freelist so steady-state alloc is zero. func (c *Control) InjectUDPPacket(p *udp.Packet) { - c.f.outside.(*udp.TesterConn).Send(p) + c.f.outside.(*udp.TesterConn).Send(p.Copy()) } -// InjectTunUDPPacket puts a udp packet on the tun interface. Using UDP here because it's a simpler protocol -func (c *Control) InjectTunUDPPacket(toAddr netip.Addr, toPort uint16, fromAddr netip.Addr, fromPort uint16, data []byte) { - serialize := make([]gopacket.SerializableLayer, 0) - var netLayer gopacket.NetworkLayer - if toAddr.Is6() { - if !fromAddr.Is6() { - panic("Cant send ipv6 to ipv4") - } - ip := &layers.IPv6{ - Version: 6, - NextHeader: layers.IPProtocolUDP, - SrcIP: fromAddr.Unmap().AsSlice(), - DstIP: toAddr.Unmap().AsSlice(), - } - serialize = append(serialize, ip) - netLayer = ip - } else { - if !fromAddr.Is4() { - panic("Cant send ipv4 to ipv6") - } - - ip := &layers.IPv4{ - Version: 4, - TTL: 64, - Protocol: layers.IPProtocolUDP, - SrcIP: fromAddr.Unmap().AsSlice(), - DstIP: toAddr.Unmap().AsSlice(), - } - serialize = append(serialize, ip) - netLayer = ip - } - - udp := layers.UDP{ - SrcPort: layers.UDPPort(fromPort), - DstPort: layers.UDPPort(toPort), - } - err := udp.SetNetworkLayerForChecksum(netLayer) - if err != nil { - panic(err) - } - - buffer := gopacket.NewSerializeBuffer() - opt := gopacket.SerializeOptions{ - ComputeChecksums: true, - FixLengths: true, - } - - serialize = append(serialize, &udp, gopacket.Payload(data)) - err = gopacket.SerializeLayers(buffer, opt, serialize...) - if err != nil { - panic(err) - } - - c.f.inside.(*overlay.TestTun).Send(buffer.Bytes()) +// InjectTunPacket pushes an IP packet onto the tun interface. +func (c *Control) InjectTunPacket(packet []byte) { + c.f.inside.(*overlay.TestTun).Send(packet) } func (c *Control) GetVpnAddrs() []netip.Addr { diff --git a/e2e/handshake_manager_test.go b/e2e/handshake_manager_test.go index 1c6ebacc..b06564d1 100644 --- a/e2e/handshake_manager_test.go +++ b/e2e/handshake_manager_test.go @@ -47,7 +47,7 @@ func TestHandshakeRetransmitDuplicate(t *testing.T) { defer r.RenderFlow() t.Log("Trigger handshake from me to them") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) t.Log("Grab my msg1") msg1 := myControl.GetFromUDP(true) @@ -97,7 +97,7 @@ func TestHandshakeTruncatedPacketRecovery(t *testing.T) { defer r.RenderFlow() t.Log("Trigger handshake") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) t.Log("Get msg1 and deliver to responder") msg1 := myControl.GetFromUDP(true) @@ -146,7 +146,7 @@ func TestHandshakeOrphanedMsg2Dropped(t *testing.T) { defer r.RenderFlow() t.Log("Complete a normal handshake") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) r.RouteForAllUntilTxTun(theirControl) assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) @@ -248,7 +248,7 @@ func TestHandshakeLateResponse(t *testing.T) { theirControl.Start() t.Log("Trigger handshake from me") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) t.Log("Grab msg1 but don't deliver") msg1 := myControl.GetFromUDP(true) @@ -292,7 +292,7 @@ func TestHandshakeSelfConnectionRejected(t *testing.T) { myControl.Start() t.Log("Trigger handshake from me") - myControl.InjectTunUDPPacket(netip.MustParseAddr("10.128.0.2"), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(netip.MustParseAddr("10.128.0.2"), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) msg1 := myControl.GetFromUDP(true) t.Log("Drain any handshake retransmits before injecting") @@ -375,7 +375,7 @@ func TestHandshakeRemoteAllowList(t *testing.T) { defer r.RenderFlow() t.Log("Trigger handshake from them") - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi"))) msg1 := theirControl.GetFromUDP(true) t.Log("Rewrite the source to a blocked IP and inject") @@ -426,7 +426,7 @@ func TestHandshakeAlreadySeenPreferredRemote(t *testing.T) { defer r.RenderFlow() t.Log("Complete a normal handshake via the router") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi"))) r.RouteForAllUntilTxTun(theirControl) assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) @@ -437,7 +437,7 @@ func TestHandshakeAlreadySeenPreferredRemote(t *testing.T) { originalRemote := hi.CurrentRemote t.Log("Re-trigger traffic to cause a new handshake attempt (ErrAlreadySeen)") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("roam")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("roam"))) r.RouteForAllUntilTxTun(theirControl) t.Log("Verify tunnel still works") @@ -475,8 +475,8 @@ func TestHandshakeWrongResponderPacketStore(t *testing.T) { evilControl.Start() t.Log("Send multiple packets to them (cached during handshake)") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet1")) - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet2")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet1"))) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet2"))) t.Log("Route until evil tunnel is closed") h := &header.H{} @@ -540,7 +540,7 @@ func TestHandshakeRelayComplete(t *testing.T) { theirControl.Start() t.Log("Trigger handshake via relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi via relay")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi via relay"))) p := r.RouteForAllUntilTxTun(theirControl) assertUdpPacket(t, []byte("Hi via relay"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) @@ -568,7 +568,7 @@ func TestHandshakeRelayComplete(t *testing.T) { } // NOTE: Relay V1 cert + IPv6 rejection is not tested here because -// InjectTunUDPPacket from a V4 node to a V6 address panics in the test +// BuildTunUDPPacket from a V4 node to a V6 address panics in the test // framework. The check is in handshake_manager.go handleOutbound relay // logic (lines ~304-313): if the relay host has a V1 cert and either // address is IPv6, the relay is skipped. diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go index 43fa72f2..d0b9543c 100644 --- a/e2e/handshakes_test.go +++ b/e2e/handshakes_test.go @@ -16,6 +16,7 @@ import ( "github.com/slackhq/nebula/cert_test" "github.com/slackhq/nebula/e2e/router" "github.com/slackhq/nebula/header" + "github.com/slackhq/nebula/overlay" "github.com/slackhq/nebula/udp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -39,11 +40,22 @@ func BenchmarkHotPath(b *testing.B) { r.CancelFlowLogs() assertTunnel(b, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) + + // Pre-build the IP packet bytes once so the bench measures the data plane, + // not gopacket SerializeLayers overhead. + prebuilt := BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + + // EnableFanIn switches the router to a 0-alloc routing path. Required + // for hot-path benchmarks; would conflict with GetFromUDP-using tests. + r.EnableFanIn() + b.ResetTimer() for n := 0; n < b.N; n++ { - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - _ = r.RouteForAllUntilTxTun(theirControl) + myControl.InjectTunPacket(prebuilt) + // Release the TUN-side bytes back to the harness freelist; the bench + // just confirms a packet arrived, the contents aren't inspected. + overlay.ReleaseTunBuf(r.RouteForAllUntilTxTun(theirControl)) } myControl.Stop() @@ -71,11 +83,15 @@ func BenchmarkHotPathRelay(b *testing.B) { theirControl.Start() assertTunnel(b, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), theirControl, myControl, r) + + prebuilt := BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + r.EnableFanIn() + b.ResetTimer() for n := 0; n < b.N; n++ { - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - _ = r.RouteForAllUntilTxTun(theirControl) + myControl.InjectTunPacket(prebuilt) + overlay.ReleaseTunBuf(r.RouteForAllUntilTxTun(theirControl)) } myControl.Stop() @@ -97,7 +113,7 @@ func TestGoodHandshake(t *testing.T) { theirControl.Start() t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) t.Log("Have them consume my stage 0 packet. They have a tunnel now") theirControl.InjectUDPPacket(myControl.GetFromUDP(true)) @@ -191,7 +207,7 @@ func TestWrongResponderHandshake(t *testing.T) { evilControl.Start() t.Log("Start the handshake process, we will route until we see the evil tunnel closed") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) h := &header.H{} r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { @@ -273,7 +289,7 @@ func TestWrongResponderHandshakeStaticHostMap(t *testing.T) { evilControl.Start() t.Log("Start the handshake process, we will route until we see the evil tunnel closed") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) h := &header.H{} r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { @@ -352,8 +368,8 @@ func TestStage1Race(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake to start on both me and them") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) t.Log("Get both stage 1 handshake packets") myHsForThem := myControl.GetFromUDP(true) @@ -430,7 +446,7 @@ func TestUncleanShutdownRaceLoser(t *testing.T) { theirControl.Start() r.Log("Trigger a handshake from me to them") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) @@ -441,7 +457,7 @@ func TestUncleanShutdownRaceLoser(t *testing.T) { myHostmap.Indexes = map[uint32]*nebula.HostInfo{} myHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{} - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me again")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me again"))) p = r.RouteForAllUntilTxTun(theirControl) assertUdpPacket(t, []byte("Hi from me again"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) @@ -480,7 +496,7 @@ func TestUncleanShutdownRaceWinner(t *testing.T) { theirControl.Start() r.Log("Trigger a handshake from me to them") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) @@ -492,7 +508,7 @@ func TestUncleanShutdownRaceWinner(t *testing.T) { theirHostmap.Indexes = map[uint32]*nebula.HostInfo{} theirHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{} - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them again")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them again"))) p = r.RouteForAllUntilTxTun(myControl) assertUdpPacket(t, []byte("Hi from them again"), p, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), 80, 80) r.RenderHostmaps("Derp hostmaps", myControl, theirControl) @@ -535,7 +551,7 @@ func TestRelays(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") @@ -565,7 +581,7 @@ func TestRelaysDontCareAboutIps(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") @@ -595,14 +611,14 @@ func TestReestablishRelays(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) t.Log("Ensure packet traversal from them to me via the relay") - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) p = r.RouteForAllUntilTxTun(myControl) r.Log("Assert the tunnel works") @@ -617,7 +633,7 @@ func TestReestablishRelays(t *testing.T) { for curIndexes >= start { curIndexes = len(myControl.GetHostmap().Indexes) r.Logf("Wait for the dead index to go away:start=%v indexes, current=%v indexes", start, curIndexes) - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me should fail")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me should fail"))) r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { return router.RouteAndExit @@ -634,7 +650,7 @@ func TestReestablishRelays(t *testing.T) { myControl.InjectLightHouseAddr(relayVpnIpNet[0].Addr(), relayUdpAddr) myControl.InjectRelays(theirVpnIpNet[0].Addr(), []netip.Addr{relayVpnIpNet[0].Addr()}) relayControl.InjectLightHouseAddr(theirVpnIpNet[0].Addr(), theirUdpAddr) - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p = r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") @@ -669,7 +685,7 @@ func TestReestablishRelays(t *testing.T) { t.Log("Assert the tunnel works the other way, too") for { t.Log("RouteForAllUntilTxTun") - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) p = r.RouteForAllUntilTxTun(myControl) r.Log("Assert the tunnel works") @@ -739,8 +755,8 @@ func TestStage1RaceRelays(t *testing.T) { assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r) r.Log("Trigger a handshake from both them and me via relay to them and me") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) r.Log("Wait for a packet from them to me") p := r.RouteForAllUntilTxTun(myControl) @@ -787,8 +803,8 @@ func TestStage1RaceRelays2(t *testing.T) { assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r) r.Log("Trigger a handshake from both them and me via relay to them and me") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) //r.RouteUntilAfterMsgType(myControl, header.Control, header.MessageNone) //r.RouteUntilAfterMsgType(theirControl, header.Control, header.MessageNone) @@ -852,7 +868,7 @@ func TestRehandshakingRelays(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") @@ -957,7 +973,7 @@ func TestRehandshakingRelaysPrimary(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") @@ -1259,8 +1275,8 @@ func TestRaceRegression(t *testing.T) { //them rx stage:2 initiatorIndex=120607833 responderIndex=4209862089 t.Log("Start both handshakes") - myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))) t.Log("Get both stage 1") myStage1ForThem := myControl.GetFromUDP(true) @@ -1476,7 +1492,7 @@ func TestGoodHandshakeUnsafeDest(t *testing.T) { theirControl.Start() t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side") - myControl.InjectTunUDPPacket(spookyDest, 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(spookyDest, 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))) t.Log("Have them consume my stage 0 packet. They have a tunnel now") theirControl.InjectUDPPacket(myControl.GetFromUDP(true)) @@ -1504,7 +1520,7 @@ func TestGoodHandshakeUnsafeDest(t *testing.T) { assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIpNet[0].Addr(), spookyDest, 80, 80) //reply - theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, spookyDest, 80, []byte("Hi from the spookyman")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, spookyDest, 80, []byte("Hi from the spookyman"))) //wait for reply theirControl.WaitForType(1, 0, myControl) theirCachedPacket := myControl.GetFromTun(true) diff --git a/e2e/helpers_test.go b/e2e/helpers_test.go index 381ae897..b555fbc4 100644 --- a/e2e/helpers_test.go +++ b/e2e/helpers_test.go @@ -294,12 +294,12 @@ func deadline(t *testing.T, seconds time.Duration) doneCb { func assertTunnel(t testing.TB, vpnIpA, vpnIpB netip.Addr, controlA, controlB *nebula.Control, r *router.R) { // Send a packet from them to me - controlB.InjectTunUDPPacket(vpnIpA, 80, vpnIpB, 90, []byte("Hi from B")) + controlB.InjectTunPacket(BuildTunUDPPacket(vpnIpA, 80, vpnIpB, 90, []byte("Hi from B"))) bPacket := r.RouteForAllUntilTxTun(controlA) assertUdpPacket(t, []byte("Hi from B"), bPacket, vpnIpB, vpnIpA, 90, 80) // And once more from me to them - controlA.InjectTunUDPPacket(vpnIpB, 80, vpnIpA, 90, []byte("Hello from A")) + controlA.InjectTunPacket(BuildTunUDPPacket(vpnIpB, 80, vpnIpA, 90, []byte("Hello from A"))) aPacket := r.RouteForAllUntilTxTun(controlB) assertUdpPacket(t, []byte("Hello from A"), aPacket, vpnIpA, vpnIpB, 90, 80) } @@ -408,3 +408,58 @@ func testLogLevelName() string { } return "info" } + +// BuildTunUDPPacket assembles an IP+UDP packet suitable for Control.InjectTunPacket. +// Using UDP here because it's a simpler protocol. +func BuildTunUDPPacket(toAddr netip.Addr, toPort uint16, fromAddr netip.Addr, fromPort uint16, data []byte) []byte { + serialize := make([]gopacket.SerializableLayer, 0) + var netLayer gopacket.NetworkLayer + if toAddr.Is6() { + if !fromAddr.Is6() { + panic("Cant send ipv6 to ipv4") + } + ip := &layers.IPv6{ + Version: 6, + NextHeader: layers.IPProtocolUDP, + SrcIP: fromAddr.Unmap().AsSlice(), + DstIP: toAddr.Unmap().AsSlice(), + } + serialize = append(serialize, ip) + netLayer = ip + } else { + if !fromAddr.Is4() { + panic("Cant send ipv4 to ipv6") + } + + ip := &layers.IPv4{ + Version: 4, + TTL: 64, + Protocol: layers.IPProtocolUDP, + SrcIP: fromAddr.Unmap().AsSlice(), + DstIP: toAddr.Unmap().AsSlice(), + } + serialize = append(serialize, ip) + netLayer = ip + } + + udp := layers.UDP{ + SrcPort: layers.UDPPort(fromPort), + DstPort: layers.UDPPort(toPort), + } + if err := udp.SetNetworkLayerForChecksum(netLayer); err != nil { + panic(err) + } + + buffer := gopacket.NewSerializeBuffer() + opt := gopacket.SerializeOptions{ + ComputeChecksums: true, + FixLengths: true, + } + + serialize = append(serialize, &udp, gopacket.Payload(data)) + if err := gopacket.SerializeLayers(buffer, opt, serialize...); err != nil { + panic(err) + } + + return buffer.Bytes() +} diff --git a/e2e/router/router.go b/e2e/router/router.go index c8264ab7..72012073 100644 --- a/e2e/router/router.go +++ b/e2e/router/router.go @@ -13,6 +13,7 @@ import ( "regexp" "sort" "sync" + "sync/atomic" "testing" "time" @@ -24,6 +25,19 @@ import ( "golang.org/x/exp/maps" ) +// outNatKey is the (from, to) pair used by outNat. Comparable struct, so it works as a map key without the +// allocation cost of a string-concat key. +type outNatKey struct { + from, to netip.AddrPort +} + +// fannedPacket pairs a UDP TX packet with its source control so the router can route it after popping from +// the fan-in channel. +type fannedPacket struct { + from *nebula.Control + pkt *udp.Packet +} + type R struct { // Simple map of the ip:port registered on a control to the control // Basically a router, right? @@ -34,12 +48,28 @@ type R struct { // A last used map, if an inbound packet hit the inNat map then // all return packets should use the same last used inbound address for the outbound sender - // map[from address + ":" + to address] => ip:port to rewrite in the udp packet to receiver - outNat map[string]netip.AddrPort + outNat map[outNatKey]netip.AddrPort // A map of vpn ip to the nebula control it belongs to vpnControls map[netip.Addr]*nebula.Control + // Cached select infrastructure for RouteForAllUntilTxTun. + // The controls map is immutable after NewR so the cases are good for the test lifetime. + // We only rebuild if a different receiver is asked. + selRecvCtl *nebula.Control + selCases []reflect.SelectCase + selCtls []*nebula.Control + + // Optional fan-in mode for hot-path benchmarks: one forwarder goroutine per control drains UDP TX into udpFanIn, + // so RouteForAllUntilTxTun can do a fixed 2-way native select instead of paying reflect.Select per call. + // Off by default (would otherwise interleave with tests that use GetFromUDP directly on the same control). + // Enabled by EnableFanIn. + udpFanIn chan fannedPacket + stopFanIn chan struct{} + fanInWG sync.WaitGroup + fanInMu sync.Mutex + fanInOn atomic.Bool + ignoreFlows []ignoreFlow flow []flowEntry @@ -119,7 +149,7 @@ func NewR(t testing.TB, controls ...*nebula.Control) *R { controls: make(map[netip.AddrPort]*nebula.Control), vpnControls: make(map[netip.Addr]*nebula.Control), inNat: make(map[netip.AddrPort]*nebula.Control), - outNat: make(map[string]netip.AddrPort), + outNat: make(map[outNatKey]netip.AddrPort), flow: []flowEntry{}, ignoreFlows: []ignoreFlow{}, fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())), @@ -153,8 +183,10 @@ func NewR(t testing.TB, controls ...*nebula.Control) *R { case <-ctx.Done(): return case <-clockSource.C: + r.Lock() r.renderHostmaps("clock tick") r.renderFlow() + r.Unlock() } } }() @@ -180,15 +212,21 @@ func (r *R) AddRoute(ip netip.Addr, port uint16, c *nebula.Control) { // RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening. func (r *R) RenderFlow() { r.cancelRender() + r.Lock() + defer r.Unlock() r.renderFlow() } // CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected func (r *R) CancelFlowLogs() { r.cancelRender() + r.Lock() r.flow = nil + r.Unlock() } +// renderFlow writes the flow log to disk. Caller must hold r.Lock. renderFlow reads r.flow / r.additionalGraphs and +// the *packet pointers stashed inside, all of which are mutated under the same lock by routing paths. func (r *R) renderFlow() { if r.flow == nil { return @@ -434,68 +472,157 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) [] panic("No control for udp tx " + a.String()) } fp := r.unlockedInjectFlow(sender, c, p, false) - c.InjectUDPPacket(p) + c.InjectUDPPacket(p) // copies internally; original is ours to release fp.WasReceived() r.Unlock() + p.Release() } } } -// RouteForAllUntilTxTun will route for everyone and return when a packet is seen on receivers tun -// If the router doesn't have the nebula controller for that address, we panic +// RouteForAllUntilTxTun will route for everyone and return when a packet is seen on the receiver's tun. +// If a control's UDP TX address can't be matched to a registered control, we panic. +// +// For allocation-sensitive callers (hot-path benchmarks, in particular relay +// benches with 3+ controls), call EnableFanIn() first. func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte { + if r.fanInOn.Load() { + return r.routeFanIn(receiver) + } + return r.routeReflect(receiver) +} + +// routeFanIn is the alloc-free path used when EnableFanIn is in effect. +func (r *R) routeFanIn(receiver *nebula.Control) []byte { + tunTx := receiver.GetTunTxChan() + for { + select { + case p := <-tunTx: + r.Lock() + if r.flow != nil { + np := udp.Packet{Data: make([]byte, len(p))} + copy(np.Data, p) + r.unlockedInjectFlow(receiver, receiver, &np, true) + } + r.Unlock() + return p + case fp := <-r.udpFanIn: + r.routeUDP(fp.from, fp.pkt) + } + } +} + +// routeReflect is the default reflect.Select-based path. Pays the boxing allocation per call but doesn't interfere +// with tests that pull packets directly from controls' UDP TX channels via GetFromUDP. +func (r *R) routeReflect(receiver *nebula.Control) []byte { + sc, cm := r.selectCasesFor(receiver) + for { + x, rx, _ := reflect.Select(sc) + if x == 0 { + p := rx.Interface().([]byte) + r.Lock() + if r.flow != nil { + np := udp.Packet{Data: make([]byte, len(p))} + copy(np.Data, p) + r.unlockedInjectFlow(cm[x], cm[x], &np, true) + } + r.Unlock() + return p + } + r.routeUDP(cm[x], rx.Interface().(*udp.Packet)) + } +} + +// EnableFanIn switches RouteForAllUntilTxTun to the alloc-free fan-in path. +// One forwarder goroutine per registered control drains UDP TX into a shared channel that RouteForAllUntilTxTun selects +// on alongside the receiver's TUN TX channel. +func (r *R) EnableFanIn() { + r.fanInMu.Lock() + defer r.fanInMu.Unlock() + if r.fanInOn.Load() { + return + } + r.udpFanIn = make(chan fannedPacket, 32) + r.stopFanIn = make(chan struct{}) + for _, c := range r.controls { + r.startFanInWorker(c) + } + r.fanInOn.Store(true) + r.t.Cleanup(r.stopFanInWorkers) +} + +// startFanInWorker spawns a goroutine that drains c's UDP TX into r.udpFanIn. +func (r *R) startFanInWorker(c *nebula.Control) { + r.fanInWG.Add(1) + udpTx := c.GetUDPTxChan() + go func() { + defer r.fanInWG.Done() + for { + select { + case <-r.stopFanIn: + return + case p := <-udpTx: + select { + case <-r.stopFanIn: + p.Release() + return + case r.udpFanIn <- fannedPacket{from: c, pkt: p}: + } + } + } + }() +} + +// stopFanInWorkers signals the fan-in goroutines to exit and waits for them. +func (r *R) stopFanInWorkers() { + r.fanInMu.Lock() + wasOn := r.fanInOn.Swap(false) + r.fanInMu.Unlock() + if !wasOn { + return + } + close(r.stopFanIn) + r.fanInWG.Wait() +} + +// routeUDP forwards a UDP TX packet from the named source control to the destination control derived from p.To, +// releasing the source packet after InjectUDPPacket has copied its bytes into a fresh pool slot. +func (r *R) routeUDP(from *nebula.Control, p *udp.Packet) { + r.Lock() + defer r.Unlock() + a := from.GetUDPAddr() + c := r.getControl(a, p.To, p) + if c == nil { + panic(fmt.Sprintf("No control for udp tx %s", p.To)) + } + fp := r.unlockedInjectFlow(from, c, p, false) + c.InjectUDPPacket(p) // copies internally; original is ours to release + fp.WasReceived() + p.Release() +} + +// selectCasesFor returns the SelectCase array used by routeReflect: one slot for the receiver's TUN TX channel followed +// by one per control's UDP TX channel. Cached for the test lifetime, only rebuilt if the receiver changes. +func (r *R) selectCasesFor(receiver *nebula.Control) ([]reflect.SelectCase, []*nebula.Control) { + r.Lock() + defer r.Unlock() + if r.selRecvCtl == receiver && r.selCases != nil { + return r.selCases, r.selCtls + } sc := make([]reflect.SelectCase, len(r.controls)+1) cm := make([]*nebula.Control, len(r.controls)+1) - - i := 0 - sc[i] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(receiver.GetTunTxChan()), - Send: reflect.Value{}, - } - cm[i] = receiver - - i++ + sc[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(receiver.GetTunTxChan())} + cm[0] = receiver + i := 1 for _, c := range r.controls { - sc[i] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(c.GetUDPTxChan()), - Send: reflect.Value{}, - } - + sc[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.GetUDPTxChan())} cm[i] = c i++ } - - for { - x, rx, _ := reflect.Select(sc) - r.Lock() - - if x == 0 { - // we are the tun tx, we can exit - p := rx.Interface().([]byte) - np := udp.Packet{Data: make([]byte, len(p))} - copy(np.Data, p) - - r.unlockedInjectFlow(cm[x], cm[x], &np, true) - r.Unlock() - return p - - } else { - // we are a udp tx, route and continue - p := rx.Interface().(*udp.Packet) - a := cm[x].GetUDPAddr() - c := r.getControl(a, p.To, p) - if c == nil { - r.Unlock() - panic(fmt.Sprintf("No control for udp tx %s", p.To)) - } - fp := r.unlockedInjectFlow(cm[x], c, p, false) - c.InjectUDPPacket(p) - fp.WasReceived() - } - r.Unlock() - } + r.selRecvCtl = receiver + r.selCases = sc + r.selCtls = cm + return sc, cm } // RouteExitFunc will call the whatDo func with each udp packet from sender. @@ -522,6 +649,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) { switch e { case ExitNow: r.Unlock() + p.Release() return case RouteAndExit: @@ -529,6 +657,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) { receiver.InjectUDPPacket(p) fp.WasReceived() r.Unlock() + p.Release() return case KeepRouting: @@ -541,6 +670,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) { } r.Unlock() + p.Release() } } @@ -641,6 +771,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) { switch e { case ExitNow: r.Unlock() + p.Release() return case RouteAndExit: @@ -648,6 +779,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) { receiver.InjectUDPPacket(p) fp.WasReceived() r.Unlock() + p.Release() return case KeepRouting: @@ -659,6 +791,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) { panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) } r.Unlock() + p.Release() } } @@ -702,19 +835,20 @@ func (r *R) FlushAll() { } receiver.InjectUDPPacket(p) r.Unlock() + p.Release() } } // getControl performs or seeds NAT translation and returns the control for toAddr, p from fields may change // This is an internal router function, the caller must hold the lock func (r *R) getControl(fromAddr, toAddr netip.AddrPort, p *udp.Packet) *nebula.Control { - if newAddr, ok := r.outNat[fromAddr.String()+":"+toAddr.String()]; ok { + if newAddr, ok := r.outNat[outNatKey{from: fromAddr, to: toAddr}]; ok { p.From = newAddr } c, ok := r.inNat[toAddr] if ok { - r.outNat[c.GetUDPAddr().String()+":"+fromAddr.String()] = toAddr + r.outNat[outNatKey{from: c.GetUDPAddr(), to: fromAddr}] = toAddr return c } diff --git a/e2e/tunnels_test.go b/e2e/tunnels_test.go index 63c655f3..697f25af 100644 --- a/e2e/tunnels_test.go +++ b/e2e/tunnels_test.go @@ -355,14 +355,14 @@ func TestCrossStackRelaysWork(t *testing.T) { theirControl.Start() t.Log("Trigger a handshake from me to them via the relay") - myControl.InjectTunUDPPacket(theirVpnV6.Addr(), 80, myVpnV6.Addr(), 80, []byte("Hi from me")) + myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnV6.Addr(), 80, myVpnV6.Addr(), 80, []byte("Hi from me"))) p := r.RouteForAllUntilTxTun(theirControl) r.Log("Assert the tunnel works") assertUdpPacket(t, []byte("Hi from me"), p, myVpnV6.Addr(), theirVpnV6.Addr(), 80, 80) t.Log("reply?") - theirControl.InjectTunUDPPacket(myVpnV6.Addr(), 80, theirVpnV6.Addr(), 80, []byte("Hi from them")) + theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnV6.Addr(), 80, theirVpnV6.Addr(), 80, []byte("Hi from them"))) p = r.RouteForAllUntilTxTun(myControl) assertUdpPacket(t, []byte("Hi from them"), p, theirVpnV6.Addr(), myVpnV6.Addr(), 80, 80) diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index b2c2a0ea..8acd83f0 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -15,6 +15,7 @@ import ( "github.com/gaissmai/bart" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/udp" ) type TestTun struct { @@ -54,9 +55,12 @@ func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (*TestTu return nil, fmt.Errorf("newTunFromFd not supported") } -// Send will place a byte array onto the receive queue for nebula to consume +// Send will place a byte array onto the receive queue for nebula to consume. // These are unencrypted ip layer frames destined for another nebula node. -// packets should exit the udp side, capture them with udpConn.Get +// packets should exit the udp side, capture them with udpConn.Get. +// +// Send copies the input via the freelist, so the caller is free to mutate +// or reuse it after the call returns. func (t *TestTun) Send(packet []byte) { if t.closed.Load() { return @@ -65,7 +69,9 @@ func (t *TestTun) Send(packet []byte) { if t.l.Enabled(context.Background(), slog.LevelDebug) { t.l.Debug("Tun receiving injected packet", "dataLen", len(packet)) } - t.rxPackets <- packet + buf := acquireTunBuf(len(packet)) + copy(buf, packet) + t.rxPackets <- buf } // Get will pull an unencrypted ip layer frame from the transmit queue @@ -110,12 +116,44 @@ func (t *TestTun) Write(b []byte) (n int, err error) { return 0, io.ErrClosedPipe } - packet := make([]byte, len(b), len(b)) + packet := acquireTunBuf(len(b)) copy(packet, b) t.TxPackets <- packet return len(b), nil } +// ReleaseTunBuf returns a slice from TxPackets to the harness freelist, don't use the bytes after the call. +// Channel-backed instead of sync.Pool because putting a []byte in a sync.Pool escapes the slice header to heap. +func ReleaseTunBuf(b []byte) { + if b == nil { + return + } + select { + case tunBufFreelist <- b: + default: + // Freelist full; drop the buffer for the GC. + } +} + +// tunBufFreelist retains the backing arrays for TestTun.Write so steady-state allocation drops to zero once the +// freelist has saturated for the current MTU. +var tunBufFreelist = make(chan []byte, 64) + +func acquireTunBuf(n int) []byte { + var b []byte + select { + case b = <-tunBufFreelist: + default: + b = make([]byte, 0, udp.MTU) + } + if cap(b) < n { + b = make([]byte, n) + } else { + b = b[:n] + } + return b +} + func (t *TestTun) Close() error { if t.closed.CompareAndSwap(false, true) { close(t.rxPackets) @@ -129,8 +167,14 @@ func (t *TestTun) Read(b []byte) (int, error) { if !ok { return 0, os.ErrClosed } + n := len(p) copy(b, p) - return len(p), nil + // Send always pushes a freelist-acquired slice, return it once we've copied the bytes into the caller's buffer. + select { + case tunBufFreelist <- p: + default: + } + return n, nil } func (t *TestTun) SupportsMultiqueue() bool { diff --git a/udp/udp_tester.go b/udp/udp_tester.go index fcd0967c..f872e32a 100644 --- a/udp/udp_tester.go +++ b/udp/udp_tester.go @@ -21,17 +21,48 @@ type Packet struct { Data []byte } +// Copy returns a fresh *Packet (from the freelist) with a duplicate Data buffer. func (u *Packet) Copy() *Packet { - n := &Packet{ - To: u.To, - From: u.From, - Data: make([]byte, len(u.Data)), + n := acquirePacket() + n.To = u.To + n.From = u.From + if cap(n.Data) < len(u.Data) { + n.Data = make([]byte, len(u.Data)) + } else { + n.Data = n.Data[:len(u.Data)] } - copy(n.Data, u.Data) return n } +// Release returns p to the harness packet freelist. +// Callers that pull a *Packet from Get / TxPackets must Release when done. +// Channel-backed instead of sync.Pool because sync.Pool's per-P caches drain badly under cross-goroutine Get/Put, +// and putting a []byte in a Pool escapes the slice header to heap. +func (p *Packet) Release() { + if p == nil { + return + } + p.Data = p.Data[:0] + select { + case packetFreelist <- p: + default: + // Freelist full; drop the *Packet for the GC. + } +} + +// packetFreelist retains *Packet structs (and their backing Data arrays) so steady-state allocation drops to zero. +var packetFreelist = make(chan *Packet, 64) + +func acquirePacket() *Packet { + select { + case p := <-packetFreelist: + return p + default: + return &Packet{} + } +} + type TesterConn struct { Addr netip.AddrPort @@ -64,13 +95,15 @@ func NewListener(l *slog.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn, // this is an encrypted packet or a handshake message in most cases // packets were transmitted from another nebula node, you can send them with Tun.Send func (u *TesterConn) Send(packet *Packet) { - h := &header.H{} - if err := h.Parse(packet.Data); err != nil { - panic(err) - } if u.l.Enabled(context.Background(), slog.LevelDebug) { + // Parse the header only under debug logging, otherwise the + // allocation would show up in every Send call. + var h header.H + if err := h.Parse(packet.Data); err != nil { + panic(err) + } u.l.Debug("UDP receiving injected packet", - "header", h, + "header", &h, "udpAddr", packet.From, "dataLen", len(packet.Data), ) @@ -107,15 +140,18 @@ func (u *TesterConn) Get(block bool) *Packet { //********************************************************************************************************************// func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error { - p := &Packet{ - Data: make([]byte, len(b), len(b)), - From: u.Addr, - To: addr, + p := acquirePacket() + if cap(p.Data) < len(b) { + p.Data = make([]byte, len(b)) + } else { + p.Data = p.Data[:len(b)] } - copy(p.Data, b) + p.From = u.Addr + p.To = addr select { case <-u.done: + p.Release() return io.ErrClosedPipe case u.TxPackets <- p: return nil @@ -129,6 +165,7 @@ func (u *TesterConn) ListenOut(r EncReader) error { return os.ErrClosed case p := <-u.RxPackets: r(p.From, p.Data) + p.Release() } } }