diff --git a/Makefile b/Makefile index d8058af..188ffea 100644 --- a/Makefile +++ b/Makefile @@ -66,6 +66,9 @@ e2evvv: e2ev e2evvvv: TEST_ENV += TEST_LOGS=3 e2evvvv: e2ev +e2e-bench: TEST_FLAGS = -bench=. -benchmem -run=^$ +e2e-bench: e2e + all: $(ALL:%=build/%/nebula) $(ALL:%=build/%/nebula-cert) release: $(ALL:%=build/nebula-%.tar.gz) diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go index 0535204..b92d7e0 100644 --- a/e2e/handshakes_test.go +++ b/e2e/handshakes_test.go @@ -16,6 +16,30 @@ import ( "github.com/stretchr/testify/assert" ) +func BenchmarkHotPath(b *testing.B) { + ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{}) + myControl, _, _ := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil) + theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil) + + // Put their info in our lighthouse + myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr) + + // Start the servers + myControl.Start() + theirControl.Start() + + r := router.NewR(b, myControl, theirControl) + r.CancelFlowLogs() + + for n := 0; n < b.N; n++ { + myControl.InjectTunUDPPacket(theirVpnIp, 80, 80, []byte("Hi from me")) + _ = r.RouteForAllUntilTxTun(theirControl) + } + + myControl.Stop() + theirControl.Stop() +} + func TestGoodHandshake(t *testing.T) { ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{}) myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil) diff --git a/e2e/helpers_test.go b/e2e/helpers_test.go index 2edcd0c..a378bea 100644 --- a/e2e/helpers_test.go +++ b/e2e/helpers_test.go @@ -7,7 +7,6 @@ import ( "crypto/rand" "fmt" "io" - "io/ioutil" "net" "os" "testing" @@ -304,7 +303,8 @@ func NewTestLogger() *logrus.Logger { v := os.Getenv("TEST_LOGS") if v == "" { - l.SetOutput(ioutil.Discard) + l.SetOutput(io.Discard) + l.SetLevel(logrus.PanicLevel) return l } diff --git a/e2e/router/router.go b/e2e/router/router.go index f55034a..7b916a0 100644 --- a/e2e/router/router.go +++ b/e2e/router/router.go @@ -47,7 +47,7 @@ type R struct { fn string cancelRender context.CancelFunc - t *testing.T + t testing.TB } type flowEntry struct { @@ -63,6 +63,12 @@ type packet struct { rx bool // the packet was received by a udp device } +func (p *packet) WasReceived() { + if p != nil { + p.rx = true + } +} + type ExitType int const ( @@ -79,7 +85,7 @@ type ExitFunc func(packet *udp.Packet, receiver *nebula.Control) ExitType // NewR creates a new router to pass packets in a controlled fashion between the provided controllers. // The packet flow will be recorded in a file within the mermaid directory under the same name as the test. // Renders will occur automatically, roughly every 100ms, until a call to RenderFlow() is made -func NewR(t *testing.T, controls ...*nebula.Control) *R { +func NewR(t testing.TB, controls ...*nebula.Control) *R { ctx, cancel := context.WithCancel(context.Background()) if err := os.MkdirAll("mermaid", 0755); err != nil { @@ -91,6 +97,7 @@ func NewR(t *testing.T, controls ...*nebula.Control) *R { vpnControls: make(map[iputil.VpnIp]*nebula.Control), inNat: make(map[string]*nebula.Control), outNat: make(map[string]net.UDPAddr), + flow: []flowEntry{}, fn: filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())), t: t, cancelRender: cancel, @@ -148,14 +155,24 @@ func (r *R) RenderFlow() { r.renderFlow() } +// CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected +func (r *R) CancelFlowLogs() { + r.cancelRender() + r.flow = nil +} + func (r *R) renderFlow() { + if r.flow == nil { + return + } + f, err := os.OpenFile(r.fn, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) if err != nil { panic(err) } var participants = map[string]struct{}{} - var participansVals []string + var participantsVals []string fmt.Fprintln(f, "```mermaid") fmt.Fprintln(f, "sequenceDiagram") @@ -172,7 +189,7 @@ func (r *R) renderFlow() { } participants[addr] = struct{}{} sanAddr := strings.Replace(addr, ":", "#58;", 1) - participansVals = append(participansVals, sanAddr) + participantsVals = append(participantsVals, sanAddr) fmt.Fprintf( f, " participant %s as Nebula: %s
UDP: %s\n", sanAddr, e.packet.from.GetVpnIp(), sanAddr, @@ -183,7 +200,7 @@ func (r *R) renderFlow() { h := &header.H{} for _, e := range r.flow { if e.packet == nil { - fmt.Fprintf(f, " note over %s: %s\n", strings.Join(participansVals, ", "), e.note) + fmt.Fprintf(f, " note over %s: %s\n", strings.Join(participantsVals, ", "), e.note) continue } @@ -222,6 +239,10 @@ func (r *R) InjectFlow(from, to *nebula.Control, p *udp.Packet) { } func (r *R) Log(arg ...any) { + if r.flow == nil { + return + } + r.Lock() r.flow = append(r.flow, flowEntry{note: fmt.Sprint(arg...)}) r.t.Log(arg...) @@ -229,6 +250,10 @@ func (r *R) Log(arg ...any) { } func (r *R) Logf(format string, arg ...any) { + if r.flow == nil { + return + } + r.Lock() r.flow = append(r.flow, flowEntry{note: fmt.Sprintf(format, arg...)}) r.t.Logf(format, arg...) @@ -236,14 +261,20 @@ func (r *R) Logf(format string, arg ...any) { } // unlockedInjectFlow is used by the router to record a packet has been transmitted, the packet is returned and -// should be marked as received AFTER it has been placed on the receivers channel +// should be marked as received AFTER it has been placed on the receivers channel. +// If flow logs have been disabled this function will return nil func (r *R) unlockedInjectFlow(from, to *nebula.Control, p *udp.Packet, tun bool) *packet { + if r.flow == nil { + return nil + } + fp := &packet{ from: from, to: to, packet: p.Copy(), tun: tun, } + r.flow = append(r.flow, flowEntry{packet: fp}) return fp } @@ -285,7 +316,7 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) [] } fp := r.unlockedInjectFlow(sender, c, p, false) c.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() r.Unlock() } } @@ -344,7 +375,7 @@ func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte { } fp := r.unlockedInjectFlow(cm[x], c, p, false) c.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() } r.Unlock() } @@ -381,14 +412,14 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) { case RouteAndExit: fp := r.unlockedInjectFlow(sender, receiver, p, false) receiver.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() r.Unlock() return case KeepRouting: fp := r.unlockedInjectFlow(sender, receiver, p, false) receiver.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() default: panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) @@ -439,7 +470,7 @@ func (r *R) InjectUDPPacket(sender, receiver *nebula.Control, packet *udp.Packet fp := r.unlockedInjectFlow(sender, receiver, packet, false) receiver.InjectUDPPacket(packet) - fp.rx = true + fp.WasReceived() } // RouteForUntilAfterToAddr will route for sender and return only after it sees and sends a packet destined for toAddr @@ -503,14 +534,14 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) { case RouteAndExit: fp := r.unlockedInjectFlow(cm[x], receiver, p, false) receiver.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() r.Unlock() return case KeepRouting: fp := r.unlockedInjectFlow(cm[x], receiver, p, false) receiver.InjectUDPPacket(p) - fp.rx = true + fp.WasReceived() default: panic(fmt.Sprintf("Unknown exitFunc return: %v", e)) diff --git a/inside.go b/inside.go index c005e2e..53856f9 100644 --- a/inside.go +++ b/inside.go @@ -14,7 +14,9 @@ import ( func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) { err := newPacket(packet, false, fwPacket) if err != nil { - f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err) + if f.l.Level >= logrus.DebugLevel { + f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err) + } return } diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index 8cfb103..a4ee20b 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net" + "os" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/cidr" @@ -49,7 +50,9 @@ func newTunFromFd(_ *logrus.Logger, _ int, _ *net.IPNet, _ int, _ []Route, _ int // These are unencrypted ip layer frames destined for another nebula node. // packets should exit the udp side, capture them with udpConn.Get func (t *TestTun) Send(packet []byte) { - t.l.WithField("dataLen", len(packet)).Info("Tun receiving injected packet") + if t.l.Level >= logrus.InfoLevel { + t.l.WithField("dataLen", len(packet)).Info("Tun receiving injected packet") + } t.rxPackets <- packet } @@ -107,7 +110,10 @@ func (t *TestTun) Close() error { } func (t *TestTun) Read(b []byte) (int, error) { - p := <-t.rxPackets + p, ok := <-t.rxPackets + if !ok { + return 0, os.ErrClosed + } copy(b, p) return len(p), nil } diff --git a/udp/udp_tester.go b/udp/udp_tester.go index 8ea88b0..55213b8 100644 --- a/udp/udp_tester.go +++ b/udp/udp_tester.go @@ -62,10 +62,12 @@ func (u *Conn) Send(packet *Packet) { if err := h.Parse(packet.Data); err != nil { panic(err) } - u.l.WithField("header", h). - WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)). - WithField("dataLen", len(packet.Data)). - Info("UDP receiving injected packet") + if u.l.Level >= logrus.InfoLevel { + u.l.WithField("header", h). + WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)). + WithField("dataLen", len(packet.Data)). + Info("UDP receiving injected packet") + } u.RxPackets <- packet } @@ -114,7 +116,10 @@ func (u *Conn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firewall nb := make([]byte, 12, 12) for { - p := <-u.RxPackets + p, ok := <-u.RxPackets + if !ok { + return + } ua.Port = p.FromPort copy(ua.IP, p.FromIp.To16()) r(ua, nil, plaintext[:0], p.Data, h, fwPacket, lhf, nb, q, cache.Get(u.l))