More stable e2e test harness, better for benchmarking (#1702)
Some checks failed
gofmt / Run gofmt (push) Failing after 2s
smoke-extra / Run extra smoke tests (push) Failing after 2s
smoke / Run multi node smoke test (push) Failing after 3s
Build and test / Build all and test on ubuntu-linux (push) Failing after 2s
Build and test / Build and test on linux with boringcrypto (push) Failing after 2s
Build and test / Build and test on linux with pkcs11 (push) Failing after 3s
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled

This commit is contained in:
Nate Brown
2026-05-04 10:12:58 -05:00
committed by GitHub
parent 33c2d7277c
commit b7e9939e92
8 changed files with 418 additions and 180 deletions

View File

@@ -5,8 +5,6 @@ package nebula
import ( import (
"net/netip" "net/netip"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/overlay" "github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
@@ -22,7 +20,9 @@ func (c *Control) WaitForType(msgType header.MessageType, subType header.Message
panic(err) panic(err)
} }
pipeTo.InjectUDPPacket(p) pipeTo.InjectUDPPacket(p)
if h.Type == msgType && h.Subtype == subType { match := h.Type == msgType && h.Subtype == subType
p.Release()
if match {
return return
} }
} }
@@ -38,7 +38,9 @@ func (c *Control) WaitForTypeByIndex(toIndex uint32, msgType header.MessageType,
panic(err) panic(err)
} }
pipeTo.InjectUDPPacket(p) pipeTo.InjectUDPPacket(p)
if h.RemoteIndex == toIndex && h.Type == msgType && h.Subtype == subType { match := h.RemoteIndex == toIndex && h.Type == msgType && h.Subtype == subType
p.Release()
if match {
return return
} }
} }
@@ -90,65 +92,15 @@ func (c *Control) GetTunTxChan() <-chan []byte {
return c.f.inside.(*overlay.TestTun).TxPackets return c.f.inside.(*overlay.TestTun).TxPackets
} }
// InjectUDPPacket will inject a packet into the udp side of nebula // InjectUDPPacket injects a packet into the udp side. We copy internally so the caller keeps ownership of p.
// The copy comes from the freelist so steady-state alloc is zero.
func (c *Control) InjectUDPPacket(p *udp.Packet) { func (c *Control) InjectUDPPacket(p *udp.Packet) {
c.f.outside.(*udp.TesterConn).Send(p) c.f.outside.(*udp.TesterConn).Send(p.Copy())
} }
// InjectTunUDPPacket puts a udp packet on the tun interface. Using UDP here because it's a simpler protocol // InjectTunPacket pushes an IP packet onto the tun interface.
func (c *Control) InjectTunUDPPacket(toAddr netip.Addr, toPort uint16, fromAddr netip.Addr, fromPort uint16, data []byte) { func (c *Control) InjectTunPacket(packet []byte) {
serialize := make([]gopacket.SerializableLayer, 0) c.f.inside.(*overlay.TestTun).Send(packet)
var netLayer gopacket.NetworkLayer
if toAddr.Is6() {
if !fromAddr.Is6() {
panic("Cant send ipv6 to ipv4")
}
ip := &layers.IPv6{
Version: 6,
NextHeader: layers.IPProtocolUDP,
SrcIP: fromAddr.Unmap().AsSlice(),
DstIP: toAddr.Unmap().AsSlice(),
}
serialize = append(serialize, ip)
netLayer = ip
} else {
if !fromAddr.Is4() {
panic("Cant send ipv4 to ipv6")
}
ip := &layers.IPv4{
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
SrcIP: fromAddr.Unmap().AsSlice(),
DstIP: toAddr.Unmap().AsSlice(),
}
serialize = append(serialize, ip)
netLayer = ip
}
udp := layers.UDP{
SrcPort: layers.UDPPort(fromPort),
DstPort: layers.UDPPort(toPort),
}
err := udp.SetNetworkLayerForChecksum(netLayer)
if err != nil {
panic(err)
}
buffer := gopacket.NewSerializeBuffer()
opt := gopacket.SerializeOptions{
ComputeChecksums: true,
FixLengths: true,
}
serialize = append(serialize, &udp, gopacket.Payload(data))
err = gopacket.SerializeLayers(buffer, opt, serialize...)
if err != nil {
panic(err)
}
c.f.inside.(*overlay.TestTun).Send(buffer.Bytes())
} }
func (c *Control) GetVpnAddrs() []netip.Addr { func (c *Control) GetVpnAddrs() []netip.Addr {

View File

@@ -47,7 +47,7 @@ func TestHandshakeRetransmitDuplicate(t *testing.T) {
defer r.RenderFlow() defer r.RenderFlow()
t.Log("Trigger handshake from me to them") t.Log("Trigger handshake from me to them")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
t.Log("Grab my msg1") t.Log("Grab my msg1")
msg1 := myControl.GetFromUDP(true) msg1 := myControl.GetFromUDP(true)
@@ -97,7 +97,7 @@ func TestHandshakeTruncatedPacketRecovery(t *testing.T) {
defer r.RenderFlow() defer r.RenderFlow()
t.Log("Trigger handshake") t.Log("Trigger handshake")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
t.Log("Get msg1 and deliver to responder") t.Log("Get msg1 and deliver to responder")
msg1 := myControl.GetFromUDP(true) msg1 := myControl.GetFromUDP(true)
@@ -146,7 +146,7 @@ func TestHandshakeOrphanedMsg2Dropped(t *testing.T) {
defer r.RenderFlow() defer r.RenderFlow()
t.Log("Complete a normal handshake") t.Log("Complete a normal handshake")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
r.RouteForAllUntilTxTun(theirControl) r.RouteForAllUntilTxTun(theirControl)
assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r)
@@ -248,7 +248,7 @@ func TestHandshakeLateResponse(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger handshake from me") t.Log("Trigger handshake from me")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
t.Log("Grab msg1 but don't deliver") t.Log("Grab msg1 but don't deliver")
msg1 := myControl.GetFromUDP(true) msg1 := myControl.GetFromUDP(true)
@@ -292,7 +292,7 @@ func TestHandshakeSelfConnectionRejected(t *testing.T) {
myControl.Start() myControl.Start()
t.Log("Trigger handshake from me") t.Log("Trigger handshake from me")
myControl.InjectTunUDPPacket(netip.MustParseAddr("10.128.0.2"), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(netip.MustParseAddr("10.128.0.2"), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
msg1 := myControl.GetFromUDP(true) msg1 := myControl.GetFromUDP(true)
t.Log("Drain any handshake retransmits before injecting") t.Log("Drain any handshake retransmits before injecting")
@@ -375,7 +375,7 @@ func TestHandshakeRemoteAllowList(t *testing.T) {
defer r.RenderFlow() defer r.RenderFlow()
t.Log("Trigger handshake from them") t.Log("Trigger handshake from them")
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi")))
msg1 := theirControl.GetFromUDP(true) msg1 := theirControl.GetFromUDP(true)
t.Log("Rewrite the source to a blocked IP and inject") t.Log("Rewrite the source to a blocked IP and inject")
@@ -426,7 +426,7 @@ func TestHandshakeAlreadySeenPreferredRemote(t *testing.T) {
defer r.RenderFlow() defer r.RenderFlow()
t.Log("Complete a normal handshake via the router") t.Log("Complete a normal handshake via the router")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi")))
r.RouteForAllUntilTxTun(theirControl) r.RouteForAllUntilTxTun(theirControl)
assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) assertTunnel(t, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r)
@@ -437,7 +437,7 @@ func TestHandshakeAlreadySeenPreferredRemote(t *testing.T) {
originalRemote := hi.CurrentRemote originalRemote := hi.CurrentRemote
t.Log("Re-trigger traffic to cause a new handshake attempt (ErrAlreadySeen)") t.Log("Re-trigger traffic to cause a new handshake attempt (ErrAlreadySeen)")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("roam")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("roam")))
r.RouteForAllUntilTxTun(theirControl) r.RouteForAllUntilTxTun(theirControl)
t.Log("Verify tunnel still works") t.Log("Verify tunnel still works")
@@ -475,8 +475,8 @@ func TestHandshakeWrongResponderPacketStore(t *testing.T) {
evilControl.Start() evilControl.Start()
t.Log("Send multiple packets to them (cached during handshake)") t.Log("Send multiple packets to them (cached during handshake)")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet1")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet1")))
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet2")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("packet2")))
t.Log("Route until evil tunnel is closed") t.Log("Route until evil tunnel is closed")
h := &header.H{} h := &header.H{}
@@ -540,7 +540,7 @@ func TestHandshakeRelayComplete(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger handshake via relay") t.Log("Trigger handshake via relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi via relay")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi via relay")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
assertUdpPacket(t, []byte("Hi via relay"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi via relay"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
@@ -568,7 +568,7 @@ func TestHandshakeRelayComplete(t *testing.T) {
} }
// NOTE: Relay V1 cert + IPv6 rejection is not tested here because // NOTE: Relay V1 cert + IPv6 rejection is not tested here because
// InjectTunUDPPacket from a V4 node to a V6 address panics in the test // BuildTunUDPPacket from a V4 node to a V6 address panics in the test
// framework. The check is in handshake_manager.go handleOutbound relay // framework. The check is in handshake_manager.go handleOutbound relay
// logic (lines ~304-313): if the relay host has a V1 cert and either // logic (lines ~304-313): if the relay host has a V1 cert and either
// address is IPv6, the relay is skipped. // address is IPv6, the relay is skipped.

View File

@@ -16,6 +16,7 @@ import (
"github.com/slackhq/nebula/cert_test" "github.com/slackhq/nebula/cert_test"
"github.com/slackhq/nebula/e2e/router" "github.com/slackhq/nebula/e2e/router"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -39,11 +40,22 @@ func BenchmarkHotPath(b *testing.B) {
r.CancelFlowLogs() r.CancelFlowLogs()
assertTunnel(b, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r) assertTunnel(b, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), myControl, theirControl, r)
// Pre-build the IP packet bytes once so the bench measures the data plane,
// not gopacket SerializeLayers overhead.
prebuilt := BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))
// EnableFanIn switches the router to a 0-alloc routing path. Required
// for hot-path benchmarks; would conflict with GetFromUDP-using tests.
r.EnableFanIn()
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(prebuilt)
_ = r.RouteForAllUntilTxTun(theirControl) // Release the TUN-side bytes back to the harness freelist; the bench
// just confirms a packet arrived, the contents aren't inspected.
overlay.ReleaseTunBuf(r.RouteForAllUntilTxTun(theirControl))
} }
myControl.Stop() myControl.Stop()
@@ -71,11 +83,15 @@ func BenchmarkHotPathRelay(b *testing.B) {
theirControl.Start() theirControl.Start()
assertTunnel(b, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), theirControl, myControl, r) assertTunnel(b, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), theirControl, myControl, r)
prebuilt := BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))
r.EnableFanIn()
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(prebuilt)
_ = r.RouteForAllUntilTxTun(theirControl) overlay.ReleaseTunBuf(r.RouteForAllUntilTxTun(theirControl))
} }
myControl.Stop() myControl.Stop()
@@ -97,7 +113,7 @@ func TestGoodHandshake(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side") t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
t.Log("Have them consume my stage 0 packet. They have a tunnel now") t.Log("Have them consume my stage 0 packet. They have a tunnel now")
theirControl.InjectUDPPacket(myControl.GetFromUDP(true)) theirControl.InjectUDPPacket(myControl.GetFromUDP(true))
@@ -191,7 +207,7 @@ func TestWrongResponderHandshake(t *testing.T) {
evilControl.Start() evilControl.Start()
t.Log("Start the handshake process, we will route until we see the evil tunnel closed") t.Log("Start the handshake process, we will route until we see the evil tunnel closed")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
h := &header.H{} h := &header.H{}
r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType {
@@ -273,7 +289,7 @@ func TestWrongResponderHandshakeStaticHostMap(t *testing.T) {
evilControl.Start() evilControl.Start()
t.Log("Start the handshake process, we will route until we see the evil tunnel closed") t.Log("Start the handshake process, we will route until we see the evil tunnel closed")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
h := &header.H{} h := &header.H{}
r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType {
@@ -352,8 +368,8 @@ func TestStage1Race(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake to start on both me and them") t.Log("Trigger a handshake to start on both me and them")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
t.Log("Get both stage 1 handshake packets") t.Log("Get both stage 1 handshake packets")
myHsForThem := myControl.GetFromUDP(true) myHsForThem := myControl.GetFromUDP(true)
@@ -430,7 +446,7 @@ func TestUncleanShutdownRaceLoser(t *testing.T) {
theirControl.Start() theirControl.Start()
r.Log("Trigger a handshake from me to them") r.Log("Trigger a handshake from me to them")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
@@ -441,7 +457,7 @@ func TestUncleanShutdownRaceLoser(t *testing.T) {
myHostmap.Indexes = map[uint32]*nebula.HostInfo{} myHostmap.Indexes = map[uint32]*nebula.HostInfo{}
myHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{} myHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{}
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me again")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me again")))
p = r.RouteForAllUntilTxTun(theirControl) p = r.RouteForAllUntilTxTun(theirControl)
assertUdpPacket(t, []byte("Hi from me again"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from me again"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
@@ -480,7 +496,7 @@ func TestUncleanShutdownRaceWinner(t *testing.T) {
theirControl.Start() theirControl.Start()
r.Log("Trigger a handshake from me to them") r.Log("Trigger a handshake from me to them")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
@@ -492,7 +508,7 @@ func TestUncleanShutdownRaceWinner(t *testing.T) {
theirHostmap.Indexes = map[uint32]*nebula.HostInfo{} theirHostmap.Indexes = map[uint32]*nebula.HostInfo{}
theirHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{} theirHostmap.RemoteIndexes = map[uint32]*nebula.HostInfo{}
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them again")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them again")))
p = r.RouteForAllUntilTxTun(myControl) p = r.RouteForAllUntilTxTun(myControl)
assertUdpPacket(t, []byte("Hi from them again"), p, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from them again"), p, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), 80, 80)
r.RenderHostmaps("Derp hostmaps", myControl, theirControl) r.RenderHostmaps("Derp hostmaps", myControl, theirControl)
@@ -535,7 +551,7 @@ func TestRelays(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -565,7 +581,7 @@ func TestRelaysDontCareAboutIps(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -595,14 +611,14 @@ func TestReestablishRelays(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
t.Log("Ensure packet traversal from them to me via the relay") t.Log("Ensure packet traversal from them to me via the relay")
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
p = r.RouteForAllUntilTxTun(myControl) p = r.RouteForAllUntilTxTun(myControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -617,7 +633,7 @@ func TestReestablishRelays(t *testing.T) {
for curIndexes >= start { for curIndexes >= start {
curIndexes = len(myControl.GetHostmap().Indexes) curIndexes = len(myControl.GetHostmap().Indexes)
r.Logf("Wait for the dead index to go away:start=%v indexes, current=%v indexes", start, curIndexes) r.Logf("Wait for the dead index to go away:start=%v indexes, current=%v indexes", start, curIndexes)
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me should fail")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me should fail")))
r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType {
return router.RouteAndExit return router.RouteAndExit
@@ -634,7 +650,7 @@ func TestReestablishRelays(t *testing.T) {
myControl.InjectLightHouseAddr(relayVpnIpNet[0].Addr(), relayUdpAddr) myControl.InjectLightHouseAddr(relayVpnIpNet[0].Addr(), relayUdpAddr)
myControl.InjectRelays(theirVpnIpNet[0].Addr(), []netip.Addr{relayVpnIpNet[0].Addr()}) myControl.InjectRelays(theirVpnIpNet[0].Addr(), []netip.Addr{relayVpnIpNet[0].Addr()})
relayControl.InjectLightHouseAddr(theirVpnIpNet[0].Addr(), theirUdpAddr) relayControl.InjectLightHouseAddr(theirVpnIpNet[0].Addr(), theirUdpAddr)
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p = r.RouteForAllUntilTxTun(theirControl) p = r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -669,7 +685,7 @@ func TestReestablishRelays(t *testing.T) {
t.Log("Assert the tunnel works the other way, too") t.Log("Assert the tunnel works the other way, too")
for { for {
t.Log("RouteForAllUntilTxTun") t.Log("RouteForAllUntilTxTun")
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
p = r.RouteForAllUntilTxTun(myControl) p = r.RouteForAllUntilTxTun(myControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -739,8 +755,8 @@ func TestStage1RaceRelays(t *testing.T) {
assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r) assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r)
r.Log("Trigger a handshake from both them and me via relay to them and me") r.Log("Trigger a handshake from both them and me via relay to them and me")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
r.Log("Wait for a packet from them to me") r.Log("Wait for a packet from them to me")
p := r.RouteForAllUntilTxTun(myControl) p := r.RouteForAllUntilTxTun(myControl)
@@ -787,8 +803,8 @@ func TestStage1RaceRelays2(t *testing.T) {
assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r) assertTunnel(t, theirVpnIpNet[0].Addr(), relayVpnIpNet[0].Addr(), theirControl, relayControl, r)
r.Log("Trigger a handshake from both them and me via relay to them and me") r.Log("Trigger a handshake from both them and me via relay to them and me")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
//r.RouteUntilAfterMsgType(myControl, header.Control, header.MessageNone) //r.RouteUntilAfterMsgType(myControl, header.Control, header.MessageNone)
//r.RouteUntilAfterMsgType(theirControl, header.Control, header.MessageNone) //r.RouteUntilAfterMsgType(theirControl, header.Control, header.MessageNone)
@@ -852,7 +868,7 @@ func TestRehandshakingRelays(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -957,7 +973,7 @@ func TestRehandshakingRelaysPrimary(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
@@ -1259,8 +1275,8 @@ func TestRaceRegression(t *testing.T) {
//them rx stage:2 initiatorIndex=120607833 responderIndex=4209862089 //them rx stage:2 initiatorIndex=120607833 responderIndex=4209862089
t.Log("Start both handshakes") t.Log("Start both handshakes")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them")))
t.Log("Get both stage 1") t.Log("Get both stage 1")
myStage1ForThem := myControl.GetFromUDP(true) myStage1ForThem := myControl.GetFromUDP(true)
@@ -1476,7 +1492,7 @@ func TestGoodHandshakeUnsafeDest(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side") t.Log("Send a udp packet through to begin standing up the tunnel, this should come out the other side")
myControl.InjectTunUDPPacket(spookyDest, 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(spookyDest, 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me")))
t.Log("Have them consume my stage 0 packet. They have a tunnel now") t.Log("Have them consume my stage 0 packet. They have a tunnel now")
theirControl.InjectUDPPacket(myControl.GetFromUDP(true)) theirControl.InjectUDPPacket(myControl.GetFromUDP(true))
@@ -1504,7 +1520,7 @@ func TestGoodHandshakeUnsafeDest(t *testing.T) {
assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIpNet[0].Addr(), spookyDest, 80, 80) assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIpNet[0].Addr(), spookyDest, 80, 80)
//reply //reply
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, spookyDest, 80, []byte("Hi from the spookyman")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnIpNet[0].Addr(), 80, spookyDest, 80, []byte("Hi from the spookyman")))
//wait for reply //wait for reply
theirControl.WaitForType(1, 0, myControl) theirControl.WaitForType(1, 0, myControl)
theirCachedPacket := myControl.GetFromTun(true) theirCachedPacket := myControl.GetFromTun(true)

View File

@@ -294,12 +294,12 @@ func deadline(t *testing.T, seconds time.Duration) doneCb {
func assertTunnel(t testing.TB, vpnIpA, vpnIpB netip.Addr, controlA, controlB *nebula.Control, r *router.R) { func assertTunnel(t testing.TB, vpnIpA, vpnIpB netip.Addr, controlA, controlB *nebula.Control, r *router.R) {
// Send a packet from them to me // Send a packet from them to me
controlB.InjectTunUDPPacket(vpnIpA, 80, vpnIpB, 90, []byte("Hi from B")) controlB.InjectTunPacket(BuildTunUDPPacket(vpnIpA, 80, vpnIpB, 90, []byte("Hi from B")))
bPacket := r.RouteForAllUntilTxTun(controlA) bPacket := r.RouteForAllUntilTxTun(controlA)
assertUdpPacket(t, []byte("Hi from B"), bPacket, vpnIpB, vpnIpA, 90, 80) assertUdpPacket(t, []byte("Hi from B"), bPacket, vpnIpB, vpnIpA, 90, 80)
// And once more from me to them // And once more from me to them
controlA.InjectTunUDPPacket(vpnIpB, 80, vpnIpA, 90, []byte("Hello from A")) controlA.InjectTunPacket(BuildTunUDPPacket(vpnIpB, 80, vpnIpA, 90, []byte("Hello from A")))
aPacket := r.RouteForAllUntilTxTun(controlB) aPacket := r.RouteForAllUntilTxTun(controlB)
assertUdpPacket(t, []byte("Hello from A"), aPacket, vpnIpA, vpnIpB, 90, 80) assertUdpPacket(t, []byte("Hello from A"), aPacket, vpnIpA, vpnIpB, 90, 80)
} }
@@ -408,3 +408,58 @@ func testLogLevelName() string {
} }
return "info" return "info"
} }
// BuildTunUDPPacket assembles an IP+UDP packet suitable for Control.InjectTunPacket.
// Using UDP here because it's a simpler protocol.
func BuildTunUDPPacket(toAddr netip.Addr, toPort uint16, fromAddr netip.Addr, fromPort uint16, data []byte) []byte {
serialize := make([]gopacket.SerializableLayer, 0)
var netLayer gopacket.NetworkLayer
if toAddr.Is6() {
if !fromAddr.Is6() {
panic("Cant send ipv6 to ipv4")
}
ip := &layers.IPv6{
Version: 6,
NextHeader: layers.IPProtocolUDP,
SrcIP: fromAddr.Unmap().AsSlice(),
DstIP: toAddr.Unmap().AsSlice(),
}
serialize = append(serialize, ip)
netLayer = ip
} else {
if !fromAddr.Is4() {
panic("Cant send ipv4 to ipv6")
}
ip := &layers.IPv4{
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
SrcIP: fromAddr.Unmap().AsSlice(),
DstIP: toAddr.Unmap().AsSlice(),
}
serialize = append(serialize, ip)
netLayer = ip
}
udp := layers.UDP{
SrcPort: layers.UDPPort(fromPort),
DstPort: layers.UDPPort(toPort),
}
if err := udp.SetNetworkLayerForChecksum(netLayer); err != nil {
panic(err)
}
buffer := gopacket.NewSerializeBuffer()
opt := gopacket.SerializeOptions{
ComputeChecksums: true,
FixLengths: true,
}
serialize = append(serialize, &udp, gopacket.Payload(data))
if err := gopacket.SerializeLayers(buffer, opt, serialize...); err != nil {
panic(err)
}
return buffer.Bytes()
}

View File

@@ -13,6 +13,7 @@ import (
"regexp" "regexp"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -24,6 +25,19 @@ import (
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
// outNatKey is the (from, to) pair used by outNat. Comparable struct, so it works as a map key without the
// allocation cost of a string-concat key.
type outNatKey struct {
from, to netip.AddrPort
}
// fannedPacket pairs a UDP TX packet with its source control so the router can route it after popping from
// the fan-in channel.
type fannedPacket struct {
from *nebula.Control
pkt *udp.Packet
}
type R struct { type R struct {
// Simple map of the ip:port registered on a control to the control // Simple map of the ip:port registered on a control to the control
// Basically a router, right? // Basically a router, right?
@@ -34,12 +48,28 @@ type R struct {
// A last used map, if an inbound packet hit the inNat map then // A last used map, if an inbound packet hit the inNat map then
// all return packets should use the same last used inbound address for the outbound sender // all return packets should use the same last used inbound address for the outbound sender
// map[from address + ":" + to address] => ip:port to rewrite in the udp packet to receiver outNat map[outNatKey]netip.AddrPort
outNat map[string]netip.AddrPort
// A map of vpn ip to the nebula control it belongs to // A map of vpn ip to the nebula control it belongs to
vpnControls map[netip.Addr]*nebula.Control vpnControls map[netip.Addr]*nebula.Control
// Cached select infrastructure for RouteForAllUntilTxTun.
// The controls map is immutable after NewR so the cases are good for the test lifetime.
// We only rebuild if a different receiver is asked.
selRecvCtl *nebula.Control
selCases []reflect.SelectCase
selCtls []*nebula.Control
// Optional fan-in mode for hot-path benchmarks: one forwarder goroutine per control drains UDP TX into udpFanIn,
// so RouteForAllUntilTxTun can do a fixed 2-way native select instead of paying reflect.Select per call.
// Off by default (would otherwise interleave with tests that use GetFromUDP directly on the same control).
// Enabled by EnableFanIn.
udpFanIn chan fannedPacket
stopFanIn chan struct{}
fanInWG sync.WaitGroup
fanInMu sync.Mutex
fanInOn atomic.Bool
ignoreFlows []ignoreFlow ignoreFlows []ignoreFlow
flow []flowEntry flow []flowEntry
@@ -119,7 +149,7 @@ func NewR(t testing.TB, controls ...*nebula.Control) *R {
controls: make(map[netip.AddrPort]*nebula.Control), controls: make(map[netip.AddrPort]*nebula.Control),
vpnControls: make(map[netip.Addr]*nebula.Control), vpnControls: make(map[netip.Addr]*nebula.Control),
inNat: make(map[netip.AddrPort]*nebula.Control), inNat: make(map[netip.AddrPort]*nebula.Control),
outNat: make(map[string]netip.AddrPort), outNat: make(map[outNatKey]netip.AddrPort),
flow: []flowEntry{}, flow: []flowEntry{},
ignoreFlows: []ignoreFlow{}, ignoreFlows: []ignoreFlow{},
fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())), fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())),
@@ -153,8 +183,10 @@ func NewR(t testing.TB, controls ...*nebula.Control) *R {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-clockSource.C: case <-clockSource.C:
r.Lock()
r.renderHostmaps("clock tick") r.renderHostmaps("clock tick")
r.renderFlow() r.renderFlow()
r.Unlock()
} }
} }
}() }()
@@ -180,15 +212,21 @@ func (r *R) AddRoute(ip netip.Addr, port uint16, c *nebula.Control) {
// RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening. // RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening.
func (r *R) RenderFlow() { func (r *R) RenderFlow() {
r.cancelRender() r.cancelRender()
r.Lock()
defer r.Unlock()
r.renderFlow() r.renderFlow()
} }
// CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected // CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected
func (r *R) CancelFlowLogs() { func (r *R) CancelFlowLogs() {
r.cancelRender() r.cancelRender()
r.Lock()
r.flow = nil r.flow = nil
r.Unlock()
} }
// renderFlow writes the flow log to disk. Caller must hold r.Lock. renderFlow reads r.flow / r.additionalGraphs and
// the *packet pointers stashed inside, all of which are mutated under the same lock by routing paths.
func (r *R) renderFlow() { func (r *R) renderFlow() {
if r.flow == nil { if r.flow == nil {
return return
@@ -434,68 +472,157 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
panic("No control for udp tx " + a.String()) panic("No control for udp tx " + a.String())
} }
fp := r.unlockedInjectFlow(sender, c, p, false) fp := r.unlockedInjectFlow(sender, c, p, false)
c.InjectUDPPacket(p) c.InjectUDPPacket(p) // copies internally; original is ours to release
fp.WasReceived() fp.WasReceived()
r.Unlock() r.Unlock()
p.Release()
} }
} }
} }
// RouteForAllUntilTxTun will route for everyone and return when a packet is seen on receivers tun // RouteForAllUntilTxTun will route for everyone and return when a packet is seen on the receiver's tun.
// If the router doesn't have the nebula controller for that address, we panic // If a control's UDP TX address can't be matched to a registered control, we panic.
//
// For allocation-sensitive callers (hot-path benchmarks, in particular relay
// benches with 3+ controls), call EnableFanIn() first.
func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte { func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte {
if r.fanInOn.Load() {
return r.routeFanIn(receiver)
}
return r.routeReflect(receiver)
}
// routeFanIn is the alloc-free path used when EnableFanIn is in effect.
func (r *R) routeFanIn(receiver *nebula.Control) []byte {
tunTx := receiver.GetTunTxChan()
for {
select {
case p := <-tunTx:
r.Lock()
if r.flow != nil {
np := udp.Packet{Data: make([]byte, len(p))}
copy(np.Data, p)
r.unlockedInjectFlow(receiver, receiver, &np, true)
}
r.Unlock()
return p
case fp := <-r.udpFanIn:
r.routeUDP(fp.from, fp.pkt)
}
}
}
// routeReflect is the default reflect.Select-based path. Pays the boxing allocation per call but doesn't interfere
// with tests that pull packets directly from controls' UDP TX channels via GetFromUDP.
func (r *R) routeReflect(receiver *nebula.Control) []byte {
sc, cm := r.selectCasesFor(receiver)
for {
x, rx, _ := reflect.Select(sc)
if x == 0 {
p := rx.Interface().([]byte)
r.Lock()
if r.flow != nil {
np := udp.Packet{Data: make([]byte, len(p))}
copy(np.Data, p)
r.unlockedInjectFlow(cm[x], cm[x], &np, true)
}
r.Unlock()
return p
}
r.routeUDP(cm[x], rx.Interface().(*udp.Packet))
}
}
// EnableFanIn switches RouteForAllUntilTxTun to the alloc-free fan-in path.
// One forwarder goroutine per registered control drains UDP TX into a shared channel that RouteForAllUntilTxTun selects
// on alongside the receiver's TUN TX channel.
func (r *R) EnableFanIn() {
r.fanInMu.Lock()
defer r.fanInMu.Unlock()
if r.fanInOn.Load() {
return
}
r.udpFanIn = make(chan fannedPacket, 32)
r.stopFanIn = make(chan struct{})
for _, c := range r.controls {
r.startFanInWorker(c)
}
r.fanInOn.Store(true)
r.t.Cleanup(r.stopFanInWorkers)
}
// startFanInWorker spawns a goroutine that drains c's UDP TX into r.udpFanIn.
func (r *R) startFanInWorker(c *nebula.Control) {
r.fanInWG.Add(1)
udpTx := c.GetUDPTxChan()
go func() {
defer r.fanInWG.Done()
for {
select {
case <-r.stopFanIn:
return
case p := <-udpTx:
select {
case <-r.stopFanIn:
p.Release()
return
case r.udpFanIn <- fannedPacket{from: c, pkt: p}:
}
}
}
}()
}
// stopFanInWorkers signals the fan-in goroutines to exit and waits for them.
func (r *R) stopFanInWorkers() {
r.fanInMu.Lock()
wasOn := r.fanInOn.Swap(false)
r.fanInMu.Unlock()
if !wasOn {
return
}
close(r.stopFanIn)
r.fanInWG.Wait()
}
// routeUDP forwards a UDP TX packet from the named source control to the destination control derived from p.To,
// releasing the source packet after InjectUDPPacket has copied its bytes into a fresh pool slot.
func (r *R) routeUDP(from *nebula.Control, p *udp.Packet) {
r.Lock()
defer r.Unlock()
a := from.GetUDPAddr()
c := r.getControl(a, p.To, p)
if c == nil {
panic(fmt.Sprintf("No control for udp tx %s", p.To))
}
fp := r.unlockedInjectFlow(from, c, p, false)
c.InjectUDPPacket(p) // copies internally; original is ours to release
fp.WasReceived()
p.Release()
}
// selectCasesFor returns the SelectCase array used by routeReflect: one slot for the receiver's TUN TX channel followed
// by one per control's UDP TX channel. Cached for the test lifetime, only rebuilt if the receiver changes.
func (r *R) selectCasesFor(receiver *nebula.Control) ([]reflect.SelectCase, []*nebula.Control) {
r.Lock()
defer r.Unlock()
if r.selRecvCtl == receiver && r.selCases != nil {
return r.selCases, r.selCtls
}
sc := make([]reflect.SelectCase, len(r.controls)+1) sc := make([]reflect.SelectCase, len(r.controls)+1)
cm := make([]*nebula.Control, len(r.controls)+1) cm := make([]*nebula.Control, len(r.controls)+1)
sc[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(receiver.GetTunTxChan())}
i := 0 cm[0] = receiver
sc[i] = reflect.SelectCase{ i := 1
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(receiver.GetTunTxChan()),
Send: reflect.Value{},
}
cm[i] = receiver
i++
for _, c := range r.controls { for _, c := range r.controls {
sc[i] = reflect.SelectCase{ sc[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.GetUDPTxChan())}
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c.GetUDPTxChan()),
Send: reflect.Value{},
}
cm[i] = c cm[i] = c
i++ i++
} }
r.selRecvCtl = receiver
for { r.selCases = sc
x, rx, _ := reflect.Select(sc) r.selCtls = cm
r.Lock() return sc, cm
if x == 0 {
// we are the tun tx, we can exit
p := rx.Interface().([]byte)
np := udp.Packet{Data: make([]byte, len(p))}
copy(np.Data, p)
r.unlockedInjectFlow(cm[x], cm[x], &np, true)
r.Unlock()
return p
} else {
// we are a udp tx, route and continue
p := rx.Interface().(*udp.Packet)
a := cm[x].GetUDPAddr()
c := r.getControl(a, p.To, p)
if c == nil {
r.Unlock()
panic(fmt.Sprintf("No control for udp tx %s", p.To))
}
fp := r.unlockedInjectFlow(cm[x], c, p, false)
c.InjectUDPPacket(p)
fp.WasReceived()
}
r.Unlock()
}
} }
// RouteExitFunc will call the whatDo func with each udp packet from sender. // RouteExitFunc will call the whatDo func with each udp packet from sender.
@@ -522,6 +649,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
switch e { switch e {
case ExitNow: case ExitNow:
r.Unlock() r.Unlock()
p.Release()
return return
case RouteAndExit: case RouteAndExit:
@@ -529,6 +657,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
receiver.InjectUDPPacket(p) receiver.InjectUDPPacket(p)
fp.WasReceived() fp.WasReceived()
r.Unlock() r.Unlock()
p.Release()
return return
case KeepRouting: case KeepRouting:
@@ -541,6 +670,7 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
} }
r.Unlock() r.Unlock()
p.Release()
} }
} }
@@ -641,6 +771,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
switch e { switch e {
case ExitNow: case ExitNow:
r.Unlock() r.Unlock()
p.Release()
return return
case RouteAndExit: case RouteAndExit:
@@ -648,6 +779,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
receiver.InjectUDPPacket(p) receiver.InjectUDPPacket(p)
fp.WasReceived() fp.WasReceived()
r.Unlock() r.Unlock()
p.Release()
return return
case KeepRouting: case KeepRouting:
@@ -659,6 +791,7 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
} }
r.Unlock() r.Unlock()
p.Release()
} }
} }
@@ -702,19 +835,20 @@ func (r *R) FlushAll() {
} }
receiver.InjectUDPPacket(p) receiver.InjectUDPPacket(p)
r.Unlock() r.Unlock()
p.Release()
} }
} }
// getControl performs or seeds NAT translation and returns the control for toAddr, p from fields may change // getControl performs or seeds NAT translation and returns the control for toAddr, p from fields may change
// This is an internal router function, the caller must hold the lock // This is an internal router function, the caller must hold the lock
func (r *R) getControl(fromAddr, toAddr netip.AddrPort, p *udp.Packet) *nebula.Control { func (r *R) getControl(fromAddr, toAddr netip.AddrPort, p *udp.Packet) *nebula.Control {
if newAddr, ok := r.outNat[fromAddr.String()+":"+toAddr.String()]; ok { if newAddr, ok := r.outNat[outNatKey{from: fromAddr, to: toAddr}]; ok {
p.From = newAddr p.From = newAddr
} }
c, ok := r.inNat[toAddr] c, ok := r.inNat[toAddr]
if ok { if ok {
r.outNat[c.GetUDPAddr().String()+":"+fromAddr.String()] = toAddr r.outNat[outNatKey{from: c.GetUDPAddr(), to: fromAddr}] = toAddr
return c return c
} }

View File

@@ -355,14 +355,14 @@ func TestCrossStackRelaysWork(t *testing.T) {
theirControl.Start() theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay") t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnV6.Addr(), 80, myVpnV6.Addr(), 80, []byte("Hi from me")) myControl.InjectTunPacket(BuildTunUDPPacket(theirVpnV6.Addr(), 80, myVpnV6.Addr(), 80, []byte("Hi from me")))
p := r.RouteForAllUntilTxTun(theirControl) p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works") r.Log("Assert the tunnel works")
assertUdpPacket(t, []byte("Hi from me"), p, myVpnV6.Addr(), theirVpnV6.Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from me"), p, myVpnV6.Addr(), theirVpnV6.Addr(), 80, 80)
t.Log("reply?") t.Log("reply?")
theirControl.InjectTunUDPPacket(myVpnV6.Addr(), 80, theirVpnV6.Addr(), 80, []byte("Hi from them")) theirControl.InjectTunPacket(BuildTunUDPPacket(myVpnV6.Addr(), 80, theirVpnV6.Addr(), 80, []byte("Hi from them")))
p = r.RouteForAllUntilTxTun(myControl) p = r.RouteForAllUntilTxTun(myControl)
assertUdpPacket(t, []byte("Hi from them"), p, theirVpnV6.Addr(), myVpnV6.Addr(), 80, 80) assertUdpPacket(t, []byte("Hi from them"), p, theirVpnV6.Addr(), myVpnV6.Addr(), 80, 80)

View File

@@ -15,6 +15,7 @@ import (
"github.com/gaissmai/bart" "github.com/gaissmai/bart"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/udp"
) )
type TestTun struct { type TestTun struct {
@@ -54,9 +55,12 @@ func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (*TestTu
return nil, fmt.Errorf("newTunFromFd not supported") return nil, fmt.Errorf("newTunFromFd not supported")
} }
// Send will place a byte array onto the receive queue for nebula to consume // Send will place a byte array onto the receive queue for nebula to consume.
// These are unencrypted ip layer frames destined for another nebula node. // These are unencrypted ip layer frames destined for another nebula node.
// packets should exit the udp side, capture them with udpConn.Get // packets should exit the udp side, capture them with udpConn.Get.
//
// Send copies the input via the freelist, so the caller is free to mutate
// or reuse it after the call returns.
func (t *TestTun) Send(packet []byte) { func (t *TestTun) Send(packet []byte) {
if t.closed.Load() { if t.closed.Load() {
return return
@@ -65,7 +69,9 @@ func (t *TestTun) Send(packet []byte) {
if t.l.Enabled(context.Background(), slog.LevelDebug) { if t.l.Enabled(context.Background(), slog.LevelDebug) {
t.l.Debug("Tun receiving injected packet", "dataLen", len(packet)) t.l.Debug("Tun receiving injected packet", "dataLen", len(packet))
} }
t.rxPackets <- packet buf := acquireTunBuf(len(packet))
copy(buf, packet)
t.rxPackets <- buf
} }
// Get will pull an unencrypted ip layer frame from the transmit queue // Get will pull an unencrypted ip layer frame from the transmit queue
@@ -110,12 +116,44 @@ func (t *TestTun) Write(b []byte) (n int, err error) {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
} }
packet := make([]byte, len(b), len(b)) packet := acquireTunBuf(len(b))
copy(packet, b) copy(packet, b)
t.TxPackets <- packet t.TxPackets <- packet
return len(b), nil return len(b), nil
} }
// ReleaseTunBuf returns a slice from TxPackets to the harness freelist, don't use the bytes after the call.
// Channel-backed instead of sync.Pool because putting a []byte in a sync.Pool escapes the slice header to heap.
func ReleaseTunBuf(b []byte) {
if b == nil {
return
}
select {
case tunBufFreelist <- b:
default:
// Freelist full; drop the buffer for the GC.
}
}
// tunBufFreelist retains the backing arrays for TestTun.Write so steady-state allocation drops to zero once the
// freelist has saturated for the current MTU.
var tunBufFreelist = make(chan []byte, 64)
func acquireTunBuf(n int) []byte {
var b []byte
select {
case b = <-tunBufFreelist:
default:
b = make([]byte, 0, udp.MTU)
}
if cap(b) < n {
b = make([]byte, n)
} else {
b = b[:n]
}
return b
}
func (t *TestTun) Close() error { func (t *TestTun) Close() error {
if t.closed.CompareAndSwap(false, true) { if t.closed.CompareAndSwap(false, true) {
close(t.rxPackets) close(t.rxPackets)
@@ -129,8 +167,14 @@ func (t *TestTun) Read(b []byte) (int, error) {
if !ok { if !ok {
return 0, os.ErrClosed return 0, os.ErrClosed
} }
n := len(p)
copy(b, p) copy(b, p)
return len(p), nil // Send always pushes a freelist-acquired slice, return it once we've copied the bytes into the caller's buffer.
select {
case tunBufFreelist <- p:
default:
}
return n, nil
} }
func (t *TestTun) SupportsMultiqueue() bool { func (t *TestTun) SupportsMultiqueue() bool {

View File

@@ -21,17 +21,48 @@ type Packet struct {
Data []byte Data []byte
} }
// Copy returns a fresh *Packet (from the freelist) with a duplicate Data buffer.
func (u *Packet) Copy() *Packet { func (u *Packet) Copy() *Packet {
n := &Packet{ n := acquirePacket()
To: u.To, n.To = u.To
From: u.From, n.From = u.From
Data: make([]byte, len(u.Data)), if cap(n.Data) < len(u.Data) {
n.Data = make([]byte, len(u.Data))
} else {
n.Data = n.Data[:len(u.Data)]
} }
copy(n.Data, u.Data) copy(n.Data, u.Data)
return n return n
} }
// Release returns p to the harness packet freelist.
// Callers that pull a *Packet from Get / TxPackets must Release when done.
// Channel-backed instead of sync.Pool because sync.Pool's per-P caches drain badly under cross-goroutine Get/Put,
// and putting a []byte in a Pool escapes the slice header to heap.
func (p *Packet) Release() {
if p == nil {
return
}
p.Data = p.Data[:0]
select {
case packetFreelist <- p:
default:
// Freelist full; drop the *Packet for the GC.
}
}
// packetFreelist retains *Packet structs (and their backing Data arrays) so steady-state allocation drops to zero.
var packetFreelist = make(chan *Packet, 64)
func acquirePacket() *Packet {
select {
case p := <-packetFreelist:
return p
default:
return &Packet{}
}
}
type TesterConn struct { type TesterConn struct {
Addr netip.AddrPort Addr netip.AddrPort
@@ -64,13 +95,15 @@ func NewListener(l *slog.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn,
// this is an encrypted packet or a handshake message in most cases // this is an encrypted packet or a handshake message in most cases
// packets were transmitted from another nebula node, you can send them with Tun.Send // packets were transmitted from another nebula node, you can send them with Tun.Send
func (u *TesterConn) Send(packet *Packet) { func (u *TesterConn) Send(packet *Packet) {
h := &header.H{} if u.l.Enabled(context.Background(), slog.LevelDebug) {
// Parse the header only under debug logging, otherwise the
// allocation would show up in every Send call.
var h header.H
if err := h.Parse(packet.Data); err != nil { if err := h.Parse(packet.Data); err != nil {
panic(err) panic(err)
} }
if u.l.Enabled(context.Background(), slog.LevelDebug) {
u.l.Debug("UDP receiving injected packet", u.l.Debug("UDP receiving injected packet",
"header", h, "header", &h,
"udpAddr", packet.From, "udpAddr", packet.From,
"dataLen", len(packet.Data), "dataLen", len(packet.Data),
) )
@@ -107,15 +140,18 @@ func (u *TesterConn) Get(block bool) *Packet {
//********************************************************************************************************************// //********************************************************************************************************************//
func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error { func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error {
p := &Packet{ p := acquirePacket()
Data: make([]byte, len(b), len(b)), if cap(p.Data) < len(b) {
From: u.Addr, p.Data = make([]byte, len(b))
To: addr, } else {
p.Data = p.Data[:len(b)]
} }
copy(p.Data, b) copy(p.Data, b)
p.From = u.Addr
p.To = addr
select { select {
case <-u.done: case <-u.done:
p.Release()
return io.ErrClosedPipe return io.ErrClosedPipe
case u.TxPackets <- p: case u.TxPackets <- p:
return nil return nil
@@ -129,6 +165,7 @@ func (u *TesterConn) ListenOut(r EncReader) error {
return os.ErrClosed return os.ErrClosed
case p := <-u.RxPackets: case p := <-u.RxPackets:
r(p.From, p.Data) r(p.From, p.Data)
p.Release()
} }
} }
} }