//go:build e2e_testing // +build e2e_testing package router import ( "context" "fmt" "net/netip" "os" "path/filepath" "reflect" "regexp" "sort" "sync" "sync/atomic" "testing" "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/slackhq/nebula" "github.com/slackhq/nebula/header" "github.com/slackhq/nebula/udp" "golang.org/x/exp/maps" ) // outNatKey is the (from, to) pair used by outNat. Comparable struct, so it works as a map key without the // allocation cost of a string-concat key. type outNatKey struct { from, to netip.AddrPort } // fannedPacket pairs a UDP TX packet with its source control so the router can route it after popping from // the fan-in channel. type fannedPacket struct { from *nebula.Control pkt *udp.Packet } type R struct { // Simple map of the ip:port registered on a control to the control // Basically a router, right? controls map[netip.AddrPort]*nebula.Control // A map for inbound packets for a control that doesn't know about this address inNat map[netip.AddrPort]*nebula.Control // A last used map, if an inbound packet hit the inNat map then // all return packets should use the same last used inbound address for the outbound sender outNat map[outNatKey]netip.AddrPort // A map of vpn ip to the nebula control it belongs to vpnControls map[netip.Addr]*nebula.Control // Cached select infrastructure for RouteForAllUntilTxTun. // The controls map is immutable after NewR so the cases are good for the test lifetime. // We only rebuild if a different receiver is asked. selRecvCtl *nebula.Control selCases []reflect.SelectCase selCtls []*nebula.Control // Optional fan-in mode for hot-path benchmarks: one forwarder goroutine per control drains UDP TX into udpFanIn, // so RouteForAllUntilTxTun can do a fixed 2-way native select instead of paying reflect.Select per call. // Off by default (would otherwise interleave with tests that use GetFromUDP directly on the same control). // Enabled by EnableFanIn. udpFanIn chan fannedPacket stopFanIn chan struct{} fanInWG sync.WaitGroup fanInMu sync.Mutex fanInOn atomic.Bool ignoreFlows []ignoreFlow flow []flowEntry // A set of additional mermaid graphs to draw in the flow log markdown file // Currently consisting only of hostmap renders additionalGraphs []mermaidGraph // All interactions are locked to help serialize behavior sync.Mutex fn string cancelRender context.CancelFunc t testing.TB } type ignoreFlow struct { tun NullBool messageType header.MessageType subType header.MessageSubType //from //to } type mermaidGraph struct { title string content string } type NullBool struct { HasValue bool IsTrue bool } type flowEntry struct { note string packet *packet } type packet struct { from *nebula.Control to *nebula.Control packet *udp.Packet tun bool // a packet pulled off a tun device rx bool // the packet was received by a udp device } func (p *packet) WasReceived() { if p != nil { p.rx = true } } type ExitType int const ( // KeepRouting the function will get called again on the next packet KeepRouting ExitType = 0 // ExitNow does not route this packet and exits immediately ExitNow ExitType = 1 // RouteAndExit routes this packet and exits immediately afterwards RouteAndExit ExitType = 2 ) type ExitFunc func(packet *udp.Packet, receiver *nebula.Control) ExitType // NewR creates a new router to pass packets in a controlled fashion between the provided controllers. // The packet flow will be recorded in a file within the mermaid directory under the same name as the test. // Renders will occur automatically, roughly every 100ms, until a call to RenderFlow() is made func NewR(t testing.TB, controls ...*nebula.Control) *R { ctx, cancel := context.WithCancel(context.Background()) if err := os.MkdirAll("mermaid", 0755); err != nil { panic(err) } r := &R{ controls: make(map[netip.AddrPort]*nebula.Control), vpnControls: make(map[netip.Addr]*nebula.Control), inNat: make(map[netip.AddrPort]*nebula.Control), outNat: make(map[outNatKey]netip.AddrPort), flow: []flowEntry{}, ignoreFlows: []ignoreFlow{}, fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())), t: t, cancelRender: cancel, } // Try to remove our render file os.Remove(r.fn) for _, c := range controls { addr := c.GetUDPAddr() if _, ok := r.controls[addr]; ok { panic("Duplicate listen address: " + addr.String()) } for _, vpnAddr := range c.GetVpnAddrs() { r.vpnControls[vpnAddr] = c } r.controls[addr] = c } // Spin the renderer in case we go nuts and the test never completes go func() { clockSource := time.NewTicker(time.Millisecond * 100) defer clockSource.Stop() for { select { case <-ctx.Done(): return case <-clockSource.C: r.Lock() r.renderHostmaps("clock tick") r.renderFlow() r.Unlock() } } }() return r } // AddRoute will place the nebula controller at the ip and port specified. // It does not look at the addr attached to the instance. // If a route is used, this will behave like a NAT for the return path. // Rewriting the source ip:port to what was last sent to from the origin func (r *R) AddRoute(ip netip.Addr, port uint16, c *nebula.Control) { r.Lock() defer r.Unlock() inAddr := netip.AddrPortFrom(ip, port) if _, ok := r.inNat[inAddr]; ok { panic("Duplicate listen address inNat: " + inAddr.String()) } r.inNat[inAddr] = c } // RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening. func (r *R) RenderFlow() { r.cancelRender() r.Lock() defer r.Unlock() r.renderFlow() } // CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected func (r *R) CancelFlowLogs() { r.cancelRender() r.Lock() r.flow = nil r.Unlock() } // renderFlow writes the flow log to disk. Caller must hold r.Lock. renderFlow reads r.flow / r.additionalGraphs and // the *packet pointers stashed inside, all of which are mutated under the same lock by routing paths. func (r *R) renderFlow() { if r.flow == nil { return } f, err := os.OpenFile(r.fn, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) if err != nil { panic(err) } var participants = map[netip.AddrPort]struct{}{} var participantsVals []string fmt.Fprintln(f, "```mermaid") fmt.Fprintln(f, "sequenceDiagram") // Assemble participants for _, e := range r.flow { if e.packet == nil { continue } addr := e.packet.from.GetUDPAddr() if _, ok := participants[addr]; ok { continue } participants[addr] = struct{}{} sanAddr := normalizeName(addr.String()) participantsVals = append(participantsVals, sanAddr) fmt.Fprintf( f, " participant %s as Nebula: %s
UDP: %s\n", sanAddr, e.packet.from.GetVpnAddrs(), sanAddr, ) } if len(participantsVals) > 2 { // Get the first and last participantVals for notes participantsVals = []string{participantsVals[0], participantsVals[len(participantsVals)-1]} } // Print packets h := &header.H{} for _, e := range r.flow { if e.packet == nil { //fmt.Fprintf(f, " note over %s: %s\n", strings.Join(participantsVals, ", "), e.note) continue } p := e.packet if p.tun { fmt.Fprintln(f, r.formatUdpPacket(p)) } else { if err := h.Parse(p.packet.Data); err != nil { panic(err) } line := "--x" if p.rx { line = "->>" } fmt.Fprintf(f, " %s%s%s: %s(%s), index %v, counter: %v\n", normalizeName(p.from.GetUDPAddr().String()), line, normalizeName(p.to.GetUDPAddr().String()), h.TypeName(), h.SubTypeName(), h.RemoteIndex, h.MessageCounter, ) } } fmt.Fprintln(f, "```") for _, g := range r.additionalGraphs { fmt.Fprintf(f, "## %s\n", g.title) fmt.Fprintln(f, "```mermaid") fmt.Fprintln(f, g.content) fmt.Fprintln(f, "```") } } func normalizeName(s string) string { rx := regexp.MustCompile("[\\[\\]\\:]") return rx.ReplaceAllLiteralString(s, "_") } // IgnoreFlow tells the router to stop recording future flows that matches the provided criteria. // messageType and subType will target nebula underlay packets while tun will target nebula overlay packets // NOTE: This is a very broad system, if you set tun to true then no more tun traffic will be rendered func (r *R) IgnoreFlow(messageType header.MessageType, subType header.MessageSubType, tun NullBool) { r.Lock() defer r.Unlock() r.ignoreFlows = append(r.ignoreFlows, ignoreFlow{ tun, messageType, subType, }) } func (r *R) RenderHostmaps(title string, controls ...*nebula.Control) { r.Lock() defer r.Unlock() s := renderHostmaps(controls...) if len(r.additionalGraphs) > 0 { lastGraph := r.additionalGraphs[len(r.additionalGraphs)-1] if lastGraph.content == s && lastGraph.title == title { // Ignore this rendering if it matches the last rendering added // This is useful if you want to track rendering changes return } } r.additionalGraphs = append(r.additionalGraphs, mermaidGraph{ title: title, content: s, }) } func (r *R) renderHostmaps(title string) { c := maps.Values(r.controls) sort.SliceStable(c, func(i, j int) bool { return c[i].GetVpnAddrs()[0].Compare(c[j].GetVpnAddrs()[0]) > 0 }) s := renderHostmaps(c...) if len(r.additionalGraphs) > 0 { lastGraph := r.additionalGraphs[len(r.additionalGraphs)-1] if lastGraph.content == s { // Ignore this rendering if it matches the last rendering added // This is useful if you want to track rendering changes return } } r.additionalGraphs = append(r.additionalGraphs, mermaidGraph{ title: title, content: s, }) } // InjectFlow can be used to record packet flow if the test is handling the routing on its own. // The packet is assumed to have been received func (r *R) InjectFlow(from, to *nebula.Control, p *udp.Packet) { r.Lock() defer r.Unlock() r.unlockedInjectFlow(from, to, p, false) } func (r *R) Log(arg ...any) { if r.flow == nil { return } r.Lock() r.flow = append(r.flow, flowEntry{note: fmt.Sprint(arg...)}) r.t.Log(arg...) r.Unlock() } func (r *R) Logf(format string, arg ...any) { if r.flow == nil { return } r.Lock() r.flow = append(r.flow, flowEntry{note: fmt.Sprintf(format, arg...)}) r.t.Logf(format, arg...) r.Unlock() } // unlockedInjectFlow is used by the router to record a packet has been transmitted, the packet is returned and // should be marked as received AFTER it has been placed on the receivers channel. // If flow logs have been disabled this function will return nil func (r *R) unlockedInjectFlow(from, to *nebula.Control, p *udp.Packet, tun bool) *packet { if r.flow == nil { return nil } r.renderHostmaps(fmt.Sprintf("Packet %v", len(r.flow))) if len(r.ignoreFlows) > 0 { var h header.H err := h.Parse(p.Data) if err != nil { panic(err) } for _, i := range r.ignoreFlows { if !tun { if i.messageType == h.Type && i.subType == h.Subtype { return nil } } else if i.tun.HasValue && i.tun.IsTrue { return nil } } } fp := &packet{ from: from, to: to, packet: p.Copy(), tun: tun, } r.flow = append(r.flow, flowEntry{packet: fp}) return fp } // OnceFrom will route a single packet from sender then return // If the router doesn't have the nebula controller for that address, we panic func (r *R) OnceFrom(sender *nebula.Control) { r.RouteExitFunc(sender, func(*udp.Packet, *nebula.Control) ExitType { return RouteAndExit }) } // RouteUntilTxTun will route for sender and return when a packet is seen on receivers tun // If the router doesn't have the nebula controller for that address, we panic func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []byte { tunTx := receiver.GetTunTxChan() udpTx := sender.GetUDPTxChan() for { select { // Maybe we already have something on the tun for us case b := <-tunTx: r.Lock() np := udp.Packet{Data: make([]byte, len(b))} copy(np.Data, b) r.unlockedInjectFlow(receiver, receiver, &np, true) r.Unlock() return b // Nope, lets push the sender along case p := <-udpTx: r.Lock() a := sender.GetUDPAddr() c := r.getControl(a, p.To, p) if c == nil { r.Unlock() panic("No control for udp tx " + a.String()) } fp := r.unlockedInjectFlow(sender, c, p, false) c.InjectUDPPacket(p) // copies internally; original is ours to release fp.WasReceived() r.Unlock() p.Release() } } } // RouteForAllUntilTxTun will route for everyone and return when a packet is seen on the receiver's tun. // If a control's UDP TX address can't be matched to a registered control, we panic. // // For allocation-sensitive callers (hot-path benchmarks, in particular relay // benches with 3+ controls), call EnableFanIn() first. func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte { if r.fanInOn.Load() { return r.routeFanIn(receiver) } return r.routeReflect(receiver) } // routeFanIn is the alloc-free path used when EnableFanIn is in effect. func (r *R) routeFanIn(receiver *nebula.Control) []byte { tunTx := receiver.GetTunTxChan() for { select { case p := <-tunTx: r.Lock() if r.flow != nil { np := udp.Packet{Data: make([]byte, len(p))} copy(np.Data, p) r.unlockedInjectFlow(receiver, receiver, &np, true) } r.Unlock() return p case fp := <-r.udpFanIn: r.routeUDP(fp.from, fp.pkt) } } } // routeReflect is the default reflect.Select-based path. Pays the boxing allocation per call but doesn't interfere // with tests that pull packets directly from controls' UDP TX channels via GetFromUDP. func (r *R) routeReflect(receiver *nebula.Control) []byte { sc, cm := r.selectCasesFor(receiver) for { x, rx, _ := reflect.Select(sc) if x == 0 { p := rx.Interface().([]byte) r.Lock() if r.flow != nil { np := udp.Packet{Data: make([]byte, len(p))} copy(np.Data, p) r.unlockedInjectFlow(cm[x], cm[x], &np, true) } r.Unlock() return p } r.routeUDP(cm[x], rx.Interface().(*udp.Packet)) } } // EnableFanIn switches RouteForAllUntilTxTun to the alloc-free fan-in path. // One forwarder goroutine per registered control drains UDP TX into a shared channel that RouteForAllUntilTxTun selects // on alongside the receiver's TUN TX channel. func (r *R) EnableFanIn() { r.fanInMu.Lock() defer r.fanInMu.Unlock() if r.fanInOn.Load() { return } r.udpFanIn = make(chan fannedPacket, 32) r.stopFanIn = make(chan struct{}) for _, c := range r.controls { r.startFanInWorker(c) } r.fanInOn.Store(true) r.t.Cleanup(r.stopFanInWorkers) } // startFanInWorker spawns a goroutine that drains c's UDP TX into r.udpFanIn. func (r *R) startFanInWorker(c *nebula.Control) { r.fanInWG.Add(1) udpTx := c.GetUDPTxChan() go func() { defer r.fanInWG.Done() for { select { case <-r.stopFanIn: return case p := <-udpTx: select { case <-r.stopFanIn: p.Release() return case r.udpFanIn <- fannedPacket{from: c, pkt: p}: } } } }() } // stopFanInWorkers signals the fan-in goroutines to exit and waits for them. func (r *R) stopFanInWorkers() { r.fanInMu.Lock() wasOn := r.fanInOn.Swap(false) r.fanInMu.Unlock() if !wasOn { return } close(r.stopFanIn) r.fanInWG.Wait() } // routeUDP forwards a UDP TX packet from the named source control to the destination control derived from p.To, // releasing the source packet after InjectUDPPacket has copied its bytes into a fresh pool slot. func (r *R) routeUDP(from *nebula.Control, p *udp.Packet) { r.Lock() defer r.Unlock() a := from.GetUDPAddr() c := r.getControl(a, p.To, p) if c == nil { panic(fmt.Sprintf("No control for udp tx %s", p.To)) } fp := r.unlockedInjectFlow(from, c, p, false) c.InjectUDPPacket(p) // copies internally; original is ours to release fp.WasReceived() p.Release() } // selectCasesFor returns the SelectCase array used by routeReflect: one slot for the receiver's TUN TX channel followed // by one per control's UDP TX channel. Cached for the test lifetime, only rebuilt if the receiver changes. func (r *R) selectCasesFor(receiver *nebula.Control) ([]reflect.SelectCase, []*nebula.Control) { r.Lock() defer r.Unlock() if r.selRecvCtl == receiver && r.selCases != nil { return r.selCases, r.selCtls } sc := make([]reflect.SelectCase, len(r.controls)+1) cm := make([]*nebula.Control, len(r.controls)+1) sc[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(receiver.GetTunTxChan())} cm[0] = receiver i := 1 for _, c := range r.controls { sc[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.GetUDPTxChan())} cm[i] = c i++ } r.selRecvCtl = receiver r.selCases = sc r.selCtls = cm return sc, cm } // RouteExitFunc will call the whatDo func with each udp packet from sender. // whatDo can return: // - exitNow: the packet will not be routed and this call will return immediately // - routeAndExit: this call will return immediately after routing the last packet from sender // - keepRouting: the packet will be routed and whatDo will be called again on the next packet from sender func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) { h := &header.H{} for { p := sender.GetFromUDP(true) r.Lock() if err := h.Parse(p.Data); err != nil { panic(err) } receiver := r.getControl(sender.GetUDPAddr(), p.To, p) if receiver == nil { r.Unlock() panic("Can't RouteExitFunc for host: " + p.To.String()) } e := whatDo(p, receiver) switch e { case ExitNow: r.Unlock() p.Release() return case RouteAndExit: fp := r.unlockedInjectFlow(sender, receiver, p, false) receiver.InjectUDPPacket(p) fp.WasReceived() r.Unlock() p.Release() return case KeepRouting: fp := r.unlockedInjectFlow(sender, receiver, p, false) receiver.InjectUDPPacket(p) fp.WasReceived() default: panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) } r.Unlock() p.Release() } } // RouteUntilAfterMsgType will route for sender until a message type is seen and sent from sender // If the router doesn't have the nebula controller for that address, we panic func (r *R) RouteUntilAfterMsgType(sender *nebula.Control, msgType header.MessageType, subType header.MessageSubType) { h := &header.H{} r.RouteExitFunc(sender, func(p *udp.Packet, r *nebula.Control) ExitType { if err := h.Parse(p.Data); err != nil { panic(err) } if h.Type == msgType && h.Subtype == subType { return RouteAndExit } return KeepRouting }) } func (r *R) RouteForAllUntilAfterMsgTypeTo(receiver *nebula.Control, msgType header.MessageType, subType header.MessageSubType) { h := &header.H{} r.RouteForAllExitFunc(func(p *udp.Packet, r *nebula.Control) ExitType { if r != receiver { return KeepRouting } if err := h.Parse(p.Data); err != nil { panic(err) } if h.Type == msgType && h.Subtype == subType { return RouteAndExit } return KeepRouting }) } func (r *R) InjectUDPPacket(sender, receiver *nebula.Control, packet *udp.Packet) { r.Lock() defer r.Unlock() fp := r.unlockedInjectFlow(sender, receiver, packet, false) receiver.InjectUDPPacket(packet) fp.WasReceived() } // RouteForUntilAfterToAddr will route for sender and return only after it sees and sends a packet destined for toAddr // finish can be any of the exitType values except `keepRouting`, the default value is `routeAndExit` // If the router doesn't have the nebula controller for that address, we panic func (r *R) RouteForUntilAfterToAddr(sender *nebula.Control, toAddr netip.AddrPort, finish ExitType) { if finish == KeepRouting { finish = RouteAndExit } r.RouteExitFunc(sender, func(p *udp.Packet, r *nebula.Control) ExitType { if p.To == toAddr { return finish } return KeepRouting }) } // RouteForAllExitFunc will route for every registered controller and calls the whatDo func with each udp packet from // whatDo can return: // - exitNow: the packet will not be routed and this call will return immediately // - routeAndExit: this call will return immediately after routing the last packet from sender // - keepRouting: the packet will be routed and whatDo will be called again on the next packet from sender func (r *R) RouteForAllExitFunc(whatDo ExitFunc) { sc := make([]reflect.SelectCase, len(r.controls)) cm := make([]*nebula.Control, len(r.controls)) i := 0 for _, c := range r.controls { sc[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.GetUDPTxChan()), Send: reflect.Value{}, } cm[i] = c i++ } for { x, rx, _ := reflect.Select(sc) r.Lock() p := rx.Interface().(*udp.Packet) receiver := r.getControl(cm[x].GetUDPAddr(), p.To, p) if receiver == nil { r.Unlock() panic("Can't RouteForAllExitFunc for host: " + p.To.String()) } e := whatDo(p, receiver) switch e { case ExitNow: r.Unlock() p.Release() return case RouteAndExit: fp := r.unlockedInjectFlow(cm[x], receiver, p, false) receiver.InjectUDPPacket(p) fp.WasReceived() r.Unlock() p.Release() return case KeepRouting: fp := r.unlockedInjectFlow(cm[x], receiver, p, false) receiver.InjectUDPPacket(p) fp.WasReceived() default: panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) } r.Unlock() p.Release() } } // FlushAll will route for every registered controller, exiting once there are no packets left to route func (r *R) FlushAll() { sc := make([]reflect.SelectCase, len(r.controls)) cm := make([]*nebula.Control, len(r.controls)) i := 0 for _, c := range r.controls { sc[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.GetUDPTxChan()), Send: reflect.Value{}, } cm[i] = c i++ } // Add a default case to exit when nothing is left to send sc = append(sc, reflect.SelectCase{ Dir: reflect.SelectDefault, Chan: reflect.Value{}, Send: reflect.Value{}, }) for { x, rx, ok := reflect.Select(sc) if !ok { return } r.Lock() p := rx.Interface().(*udp.Packet) receiver := r.getControl(cm[x].GetUDPAddr(), p.To, p) if receiver == nil { r.Unlock() panic("Can't FlushAll for host: " + p.To.String()) } receiver.InjectUDPPacket(p) r.Unlock() p.Release() } } // getControl performs or seeds NAT translation and returns the control for toAddr, p from fields may change // This is an internal router function, the caller must hold the lock func (r *R) getControl(fromAddr, toAddr netip.AddrPort, p *udp.Packet) *nebula.Control { if newAddr, ok := r.outNat[outNatKey{from: fromAddr, to: toAddr}]; ok { p.From = newAddr } c, ok := r.inNat[toAddr] if ok { r.outNat[outNatKey{from: c.GetUDPAddr(), to: fromAddr}] = toAddr return c } return r.controls[toAddr] } func (r *R) formatUdpPacket(p *packet) string { var packet gopacket.Packet var srcAddr netip.Addr packet = gopacket.NewPacket(p.packet.Data, layers.LayerTypeIPv6, gopacket.Lazy) if packet.ErrorLayer() == nil { v6 := packet.Layer(layers.LayerTypeIPv6).(*layers.IPv6) if v6 == nil { panic("not an ipv6 packet") } srcAddr, _ = netip.AddrFromSlice(v6.SrcIP) } else { packet = gopacket.NewPacket(p.packet.Data, layers.LayerTypeIPv4, gopacket.Lazy) v6 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4) if v6 == nil { panic("not an ipv6 packet") } srcAddr, _ = netip.AddrFromSlice(v6.SrcIP) } from := "unknown" if c, ok := r.vpnControls[srcAddr]; ok { from = c.GetUDPAddr().String() } udpLayer := packet.Layer(layers.LayerTypeUDP).(*layers.UDP) if udpLayer == nil { panic("not a udp packet") } data := packet.ApplicationLayer() return fmt.Sprintf( " %s-->>%s: src port: %v
dest port: %v
data: \"%v\"\n", normalizeName(from), normalizeName(p.to.GetUDPAddr().String()), udpLayer.SrcPort, udpLayer.DstPort, string(data.Payload()), ) }