Compare commits

..

7 Commits

Author SHA1 Message Date
Wade Simmons
03ab9a1208 update synctrace 2025-06-12 15:22:54 -04:00
Wade Simmons
4258c1388c Merge remote-tracking branch 'origin/master' into synctrace 2025-06-12 13:50:39 -04:00
brad-defined
442a52879b Fix off by one error in IPv6 packet parser (#1419)
Some checks failed
gofmt / Run gofmt (push) Successful in 39s
smoke-extra / Run extra smoke tests (push) Failing after 21s
smoke / Run multi node smoke test (push) Failing after 1m21s
Build and test / Build all and test on ubuntu-linux (push) Failing after 18m35s
Build and test / Build and test on linux with boringcrypto (push) Failing after 2m36s
Build and test / Build and test on linux with pkcs11 (push) Failing after 2m28s
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled
2025-06-11 15:15:15 -04:00
Wade Simmons
2a2b6424ed add new locks added to master 2025-04-02 11:57:02 -04:00
Wade Simmons
f896e2a863 Merge remote-tracking branch 'origin/master' into synctrace 2025-04-02 11:00:53 -04:00
Wade Simmons
4db6049684 update 2025-04-02 10:53:51 -04:00
Wade Simmons
8f1dc12618 synctrace WIP 2024-05-29 12:54:38 -04:00
18 changed files with 135 additions and 240 deletions

View File

@@ -61,7 +61,7 @@ ALL = $(ALL_LINUX) \
windows-arm64 windows-arm64
e2e: e2e:
$(TEST_ENV) go test -tags=e2e_testing -count=1 $(TEST_FLAGS) ./e2e $(TEST_ENV) go test -tags=synctrace,e2e_testing -count=1 $(TEST_FLAGS) ./e2e
e2ev: TEST_FLAGS += -v e2ev: TEST_FLAGS += -v
e2ev: e2e e2ev: e2e
@@ -215,6 +215,7 @@ ifeq ($(words $(MAKECMDGOALS)),1)
@$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory @$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory
endif endif
bin-docker: BUILD_ARGS = -tags=synctrace
bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert
smoke-docker: bin-docker smoke-docker: bin-docker

View File

@@ -11,12 +11,12 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
"dario.cat/mergo" "dario.cat/mergo"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/wadey/synctrace"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
@@ -27,13 +27,14 @@ type C struct {
oldSettings map[string]any oldSettings map[string]any
callbacks []func(*C) callbacks []func(*C)
l *logrus.Logger l *logrus.Logger
reloadLock sync.Mutex reloadLock synctrace.Mutex
} }
func NewC(l *logrus.Logger) *C { func NewC(l *logrus.Logger) *C {
return &C{ return &C{
Settings: make(map[string]any), Settings: make(map[string]any),
l: l, l: l,
reloadLock: synctrace.NewMutex("config-reload"),
} }
} }

View File

@@ -5,14 +5,13 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"net/netip" "net/netip"
"sync"
"time" "time"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/wadey/synctrace"
) )
type trafficDecision int type trafficDecision int
@@ -27,28 +26,16 @@ const (
sendTestPacket trafficDecision = 6 sendTestPacket trafficDecision = 6
) )
// LastCommunication tracks when we last communicated with a host
type LastCommunication struct {
timestamp time.Time
vpnIp netip.Addr // To help with logging
}
type connectionManager struct { type connectionManager struct {
in map[uint32]struct{} in map[uint32]struct{}
inLock *sync.RWMutex inLock synctrace.RWMutex
out map[uint32]struct{} out map[uint32]struct{}
outLock *sync.RWMutex outLock synctrace.RWMutex
// relayUsed holds which relay localIndexs are in use // relayUsed holds which relay localIndexs are in use
relayUsed map[uint32]struct{} relayUsed map[uint32]struct{}
relayUsedLock *sync.RWMutex relayUsedLock synctrace.RWMutex
// Track last communication with hosts
lastCommMap map[uint32]time.Time
lastCommLock *sync.RWMutex
inactivityTimer *LockingTimerWheel[uint32]
inactivityTimeout time.Duration
hostMap *HostMap hostMap *HostMap
trafficTimer *LockingTimerWheel[uint32] trafficTimer *LockingTimerWheel[uint32]
@@ -73,15 +60,12 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
nc := &connectionManager{ nc := &connectionManager{
hostMap: intf.hostMap, hostMap: intf.hostMap,
in: make(map[uint32]struct{}), in: make(map[uint32]struct{}),
inLock: &sync.RWMutex{}, inLock: synctrace.NewRWMutex("connection-manager-in"),
out: make(map[uint32]struct{}), out: make(map[uint32]struct{}),
outLock: &sync.RWMutex{}, outLock: synctrace.NewRWMutex("connection-manager-out"),
relayUsed: make(map[uint32]struct{}), relayUsed: make(map[uint32]struct{}),
relayUsedLock: &sync.RWMutex{}, relayUsedLock: synctrace.NewRWMutex("connection-manager-relay-used"),
lastCommMap: make(map[uint32]time.Time), trafficTimer: NewLockingTimerWheel[uint32]("traffic-timer", time.Millisecond*500, max),
lastCommLock: &sync.RWMutex{},
inactivityTimeout: 1 * time.Minute, // Default inactivity timeout: 10 minutes
trafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, max),
intf: intf, intf: intf,
pendingDeletion: make(map[uint32]struct{}), pendingDeletion: make(map[uint32]struct{}),
checkInterval: checkInterval, checkInterval: checkInterval,
@@ -91,31 +75,10 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
l: l, l: l,
} }
// Initialize the inactivity timer wheel - make wheel duration slightly longer than the timeout
nc.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, nc.inactivityTimeout+time.Minute)
nc.Start(ctx) nc.Start(ctx)
return nc return nc
} }
func (n *connectionManager) updateLastCommunication(localIndex uint32) {
// Get host info to record VPN IP for better logging
hostInfo := n.hostMap.QueryIndex(localIndex)
if hostInfo == nil {
return
}
now := time.Now()
n.lastCommLock.Lock()
n.lastCommMap[localIndex] = now
n.lastCommLock.Unlock()
// Reset the inactivity timer for this host
n.inactivityTimer.m.Lock()
n.inactivityTimer.t.Add(localIndex, n.inactivityTimeout)
n.inactivityTimer.m.Unlock()
}
func (n *connectionManager) In(localIndex uint32) { func (n *connectionManager) In(localIndex uint32) {
n.inLock.RLock() n.inLock.RLock()
// If this already exists, return // If this already exists, return
@@ -127,9 +90,6 @@ func (n *connectionManager) In(localIndex uint32) {
n.inLock.Lock() n.inLock.Lock()
n.in[localIndex] = struct{}{} n.in[localIndex] = struct{}{}
n.inLock.Unlock() n.inLock.Unlock()
// Update last communication time
n.updateLastCommunication(localIndex)
} }
func (n *connectionManager) Out(localIndex uint32) { func (n *connectionManager) Out(localIndex uint32) {
@@ -143,9 +103,6 @@ func (n *connectionManager) Out(localIndex uint32) {
n.outLock.Lock() n.outLock.Lock()
n.out[localIndex] = struct{}{} n.out[localIndex] = struct{}{}
n.outLock.Unlock() n.outLock.Unlock()
// Update last communication time
n.updateLastCommunication(localIndex)
} }
func (n *connectionManager) RelayUsed(localIndex uint32) { func (n *connectionManager) RelayUsed(localIndex uint32) {
@@ -187,134 +144,6 @@ func (n *connectionManager) AddTrafficWatch(localIndex uint32) {
n.outLock.Unlock() n.outLock.Unlock()
} }
// checkInactiveTunnels checks for tunnels that have been inactive for too long and drops them
func (n *connectionManager) checkInactiveTunnels() {
now := time.Now()
// First, advance the timer wheel to the current time
n.inactivityTimer.m.Lock()
n.inactivityTimer.t.Advance(now)
n.inactivityTimer.m.Unlock()
// Check for expired timers (inactive connections)
for {
// Get the next expired tunnel
n.inactivityTimer.m.Lock()
localIndex, ok := n.inactivityTimer.t.Purge()
n.inactivityTimer.m.Unlock()
if !ok {
// No more expired timers
break
}
n.lastCommLock.RLock()
lastComm, exists := n.lastCommMap[localIndex]
n.lastCommLock.RUnlock()
if !exists {
// No last communication record, odd but skip
continue
}
// Calculate inactivity duration
inactiveDuration := now.Sub(lastComm)
// Check if we've exceeded the inactivity timeout
if inactiveDuration >= n.inactivityTimeout {
// Get the host info (if it still exists)
hostInfo := n.hostMap.QueryIndex(localIndex)
if hostInfo == nil {
// Host info is gone, remove from our tracking map
n.lastCommLock.Lock()
delete(n.lastCommMap, localIndex)
n.lastCommLock.Unlock()
continue
}
// Log the inactivity and drop the tunnel
n.l.WithField("vpnIp", hostInfo.vpnAddrs[0]).
WithField("localIndex", localIndex).
WithField("inactiveDuration", inactiveDuration).
WithField("timeout", n.inactivityTimeout).
Info("Dropping tunnel due to inactivity")
// Close the tunnel using the existing mechanism
n.intf.closeTunnel(hostInfo)
// Clean up our tracking map
n.lastCommLock.Lock()
delete(n.lastCommMap, localIndex)
n.lastCommLock.Unlock()
} else {
// Re-add to the timer wheel with the remaining time
remainingTime := n.inactivityTimeout - inactiveDuration
n.inactivityTimer.m.Lock()
n.inactivityTimer.t.Add(localIndex, remainingTime)
n.inactivityTimer.m.Unlock()
}
}
}
// CleanupDeletedHostInfos removes entries from our lastCommMap for hosts that no longer exist
func (n *connectionManager) CleanupDeletedHostInfos() {
n.lastCommLock.Lock()
defer n.lastCommLock.Unlock()
// Find indexes to delete
var toDelete []uint32
for localIndex := range n.lastCommMap {
if n.hostMap.QueryIndex(localIndex) == nil {
toDelete = append(toDelete, localIndex)
}
}
// Delete them
for _, localIndex := range toDelete {
delete(n.lastCommMap, localIndex)
}
if len(toDelete) > 0 && n.l.Level >= logrus.DebugLevel {
n.l.WithField("count", len(toDelete)).Debug("Cleaned up deleted host entries from lastCommMap")
}
}
// ReloadConfig updates the connection manager configuration
func (n *connectionManager) ReloadConfig(c *config.C) {
// Get the inactivity timeout from config
inactivityTimeout := c.GetDuration("timers.inactivity_timeout", 10*time.Minute)
// Only update if different
if inactivityTimeout != n.inactivityTimeout {
n.l.WithField("old", n.inactivityTimeout).
WithField("new", inactivityTimeout).
Info("Updating inactivity timeout")
n.inactivityTimeout = inactivityTimeout
// Recreate the inactivity timer wheel with the new timeout
n.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, n.inactivityTimeout+time.Minute)
// Re-add all existing hosts to the new timer wheel
n.lastCommLock.RLock()
for localIndex, lastComm := range n.lastCommMap {
// Calculate remaining time based on last communication
now := time.Now()
elapsed := now.Sub(lastComm)
// If the elapsed time exceeds the new timeout, this will be caught
// in the next inactivity check. Otherwise, add with remaining time.
if elapsed < n.inactivityTimeout {
remainingTime := n.inactivityTimeout - elapsed
n.inactivityTimer.m.Lock()
n.inactivityTimer.t.Add(localIndex, remainingTime)
n.inactivityTimer.m.Unlock()
}
}
n.lastCommLock.RUnlock()
}
}
func (n *connectionManager) Start(ctx context.Context) { func (n *connectionManager) Start(ctx context.Context) {
go n.Run(ctx) go n.Run(ctx)
} }
@@ -324,14 +153,6 @@ func (n *connectionManager) Run(ctx context.Context) {
clockSource := time.NewTicker(500 * time.Millisecond) clockSource := time.NewTicker(500 * time.Millisecond)
defer clockSource.Stop() defer clockSource.Stop()
// Create ticker for inactivity checks (every minute)
inactivityTicker := time.NewTicker(time.Minute)
defer inactivityTicker.Stop()
// Create ticker for cleanup (every 5 minutes)
cleanupTicker := time.NewTicker(5 * time.Minute)
defer cleanupTicker.Stop()
p := []byte("") p := []byte("")
nb := make([]byte, 12, 12) nb := make([]byte, 12, 12)
out := make([]byte, mtu) out := make([]byte, mtu)
@@ -351,14 +172,6 @@ func (n *connectionManager) Run(ctx context.Context) {
n.doTrafficCheck(localIndex, p, nb, out, now) n.doTrafficCheck(localIndex, p, nb, out, now)
} }
case <-inactivityTicker.C:
// Check for inactive tunnels
n.checkInactiveTunnels()
case <-cleanupTicker.C:
// Periodically clean up deleted hosts
n.CleanupDeletedHostInfos()
} }
} }
} }
@@ -672,12 +485,12 @@ func (n *connectionManager) sendPunch(hostinfo *HostInfo) {
if n.punchy.GetTargetEverything() { if n.punchy.GetTargetEverything() {
hostinfo.remotes.ForEach(n.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) { hostinfo.remotes.ForEach(n.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) {
n.metricsTxPunchy.Inc(1) n.metricsTxPunchy.Inc(1)
_ = n.intf.outside.WriteTo([]byte{1}, addr) n.intf.outside.WriteTo([]byte{1}, addr)
}) })
} else if hostinfo.remote.IsValid() { } else if hostinfo.remote.IsValid() {
n.metricsTxPunchy.Inc(1) n.metricsTxPunchy.Inc(1)
_ = n.intf.outside.WriteTo([]byte{1}, hostinfo.remote) n.intf.outside.WriteTo([]byte{1}, hostinfo.remote)
} }
} }

View File

@@ -4,13 +4,13 @@ import (
"crypto/rand" "crypto/rand"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"sync/atomic" "sync/atomic"
"github.com/flynn/noise" "github.com/flynn/noise"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/noiseutil" "github.com/slackhq/nebula/noiseutil"
"github.com/wadey/synctrace"
) )
const ReplayWindow = 1024 const ReplayWindow = 1024
@@ -24,7 +24,7 @@ type ConnectionState struct {
initiator bool initiator bool
messageCounter atomic.Uint64 messageCounter atomic.Uint64
window *Bits window *Bits
writeLock sync.Mutex writeLock synctrace.Mutex
} }
func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, initiator bool, pattern noise.HandshakePattern) (*ConnectionState, error) { func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, initiator bool, pattern noise.HandshakePattern) (*ConnectionState, error) {
@@ -76,6 +76,7 @@ func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, i
initiator: initiator, initiator: initiator,
window: b, window: b,
myCert: crt, myCert: crt,
writeLock: synctrace.NewMutex("connection-state"),
} }
// always start the counter from 2, as packet 1 and packet 2 are handshake packets. // always start the counter from 2, as packet 1 and packet 2 are handshake packets.
ci.messageCounter.Add(2) ci.messageCounter.Add(2)

View File

@@ -6,12 +6,12 @@ import (
"net/netip" "net/netip"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/gaissmai/bart" "github.com/gaissmai/bart"
"github.com/miekg/dns" "github.com/miekg/dns"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/wadey/synctrace"
) )
// This whole thing should be rewritten to use context // This whole thing should be rewritten to use context
@@ -21,7 +21,7 @@ var dnsServer *dns.Server
var dnsAddr string var dnsAddr string
type dnsRecords struct { type dnsRecords struct {
sync.RWMutex synctrace.RWMutex
l *logrus.Logger l *logrus.Logger
dnsMap4 map[string]netip.Addr dnsMap4 map[string]netip.Addr
dnsMap6 map[string]netip.Addr dnsMap6 map[string]netip.Addr
@@ -31,6 +31,7 @@ type dnsRecords struct {
func newDnsRecords(l *logrus.Logger, cs *CertState, hostMap *HostMap) *dnsRecords { func newDnsRecords(l *logrus.Logger, cs *CertState, hostMap *HostMap) *dnsRecords {
return &dnsRecords{ return &dnsRecords{
RWMutex: synctrace.NewRWMutex("dns-records"),
l: l, l: l,
dnsMap4: make(map[string]netip.Addr), dnsMap4: make(map[string]netip.Addr),
dnsMap6: make(map[string]netip.Addr), dnsMap6: make(map[string]netip.Addr),

View File

@@ -10,7 +10,6 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/gaissmai/bart" "github.com/gaissmai/bart"
@@ -19,6 +18,7 @@ import (
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/firewall"
"github.com/wadey/synctrace"
) )
type FirewallInterface interface { type FirewallInterface interface {
@@ -76,7 +76,7 @@ type firewallMetrics struct {
} }
type FirewallConntrack struct { type FirewallConntrack struct {
sync.Mutex synctrace.Mutex
Conns map[firewall.Packet]*conn Conns map[firewall.Packet]*conn
TimerWheel *TimerWheel[firewall.Packet] TimerWheel *TimerWheel[firewall.Packet]
@@ -164,6 +164,7 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
return &Firewall{ return &Firewall{
Conntrack: &FirewallConntrack{ Conntrack: &FirewallConntrack{
Mutex: synctrace.NewMutex("firewall-conntrack"),
Conns: make(map[firewall.Packet]*conn), Conns: make(map[firewall.Packet]*conn),
TimerWheel: NewTimerWheel[firewall.Packet](tmin, tmax), TimerWheel: NewTimerWheel[firewall.Packet](tmin, tmax),
}, },

5
go.mod
View File

@@ -24,6 +24,7 @@ require (
github.com/stefanberger/go-pkcs11uri v0.0.0-20230803200340-78284954bff6 github.com/stefanberger/go-pkcs11uri v0.0.0-20230803200340-78284954bff6
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/vishvananda/netlink v1.3.0 github.com/vishvananda/netlink v1.3.0
github.com/wadey/synctrace v0.0.0-20250612192159-94547ef50dfe
golang.org/x/crypto v0.37.0 golang.org/x/crypto v0.37.0
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
golang.org/x/net v0.39.0 golang.org/x/net v0.39.0
@@ -42,12 +43,16 @@ require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/google/btree v1.1.2 // indirect github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/heimdalr/dag v1.4.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect
github.com/timandy/routine v1.1.5 // indirect
github.com/vishvananda/netns v0.0.4 // indirect github.com/vishvananda/netns v0.0.4 // indirect
golang.org/x/mod v0.23.0 // indirect golang.org/x/mod v0.23.0 // indirect
golang.org/x/time v0.5.0 // indirect golang.org/x/time v0.5.0 // indirect

12
go.sum
View File

@@ -22,6 +22,8 @@ github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432/go.
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/gaissmai/bart v0.20.4 h1:Ik47r1fy3jRVU+1eYzKSW3ho2UgBVTVnUS8O993584U= github.com/gaissmai/bart v0.20.4 h1:Ik47r1fy3jRVU+1eYzKSW3ho2UgBVTVnUS8O993584U=
@@ -33,6 +35,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
@@ -58,6 +62,10 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/heimdalr/dag v1.4.0 h1:zG3JA4RDVLc55k3AXAgfwa+EgBNZ0TkfOO3C29Ucpmg=
github.com/heimdalr/dag v1.4.0/go.mod h1:OCh6ghKmU0hPjtwMqWBoNxPmtRioKd1xSu7Zs4sbIqM=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -145,10 +153,14 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk= github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/wadey/synctrace v0.0.0-20250612192159-94547ef50dfe h1:dc8Q42VsX+ABr0drJw27f3smvGfcz7eB8rJx+IkVMAo=
github.com/wadey/synctrace v0.0.0-20250612192159-94547ef50dfe/go.mod h1:F2VCml4UxGPgAAqqm9T0ZfnVRWITrQS1EMZM+KCAm/Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/wadey/synctrace"
) )
// NOISE IX Handshakes // NOISE IX Handshakes
@@ -249,6 +250,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
HandshakePacket: make(map[uint8][]byte, 0), HandshakePacket: make(map[uint8][]byte, 0),
lastHandshakeTime: hs.Details.Time, lastHandshakeTime: hs.Details.Time,
relayState: RelayState{ relayState: RelayState{
RWMutex: synctrace.NewRWMutex("relay-state"),
relays: map[netip.Addr]struct{}{}, relays: map[netip.Addr]struct{}{},
relayForByAddr: map[netip.Addr]*Relay{}, relayForByAddr: map[netip.Addr]*Relay{},
relayForByIdx: map[uint32]*Relay{}, relayForByIdx: map[uint32]*Relay{},

View File

@@ -8,7 +8,6 @@ import (
"errors" "errors"
"net/netip" "net/netip"
"slices" "slices"
"sync"
"time" "time"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
@@ -16,6 +15,7 @@ import (
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
"github.com/wadey/synctrace"
) )
const ( const (
@@ -45,7 +45,7 @@ type HandshakeConfig struct {
type HandshakeManager struct { type HandshakeManager struct {
// Mutex for interacting with the vpnIps and indexes maps // Mutex for interacting with the vpnIps and indexes maps
sync.RWMutex synctrace.RWMutex
vpnIps map[netip.Addr]*HandshakeHostInfo vpnIps map[netip.Addr]*HandshakeHostInfo
indexes map[uint32]*HandshakeHostInfo indexes map[uint32]*HandshakeHostInfo
@@ -66,7 +66,7 @@ type HandshakeManager struct {
} }
type HandshakeHostInfo struct { type HandshakeHostInfo struct {
sync.Mutex synctrace.Mutex
startTime time.Time // Time that we first started trying with this handshake startTime time.Time // Time that we first started trying with this handshake
ready bool // Is the handshake ready ready bool // Is the handshake ready
@@ -104,6 +104,7 @@ func (hh *HandshakeHostInfo) cachePacket(l *logrus.Logger, t header.MessageType,
func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager { func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
return &HandshakeManager{ return &HandshakeManager{
RWMutex: synctrace.NewRWMutex("handshake-manager"),
vpnIps: map[netip.Addr]*HandshakeHostInfo{}, vpnIps: map[netip.Addr]*HandshakeHostInfo{},
indexes: map[uint32]*HandshakeHostInfo{}, indexes: map[uint32]*HandshakeHostInfo{},
mainHostMap: mainHostMap, mainHostMap: mainHostMap,
@@ -111,7 +112,7 @@ func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *Lig
outside: outside, outside: outside,
config: config, config: config,
trigger: make(chan netip.Addr, config.triggerBuffer), trigger: make(chan netip.Addr, config.triggerBuffer),
OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr](config.tryInterval, hsTimeout(config.retries, config.tryInterval)), OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr]("outbound-handshake-timer", config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
messageMetrics: config.messageMetrics, messageMetrics: config.messageMetrics,
metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil), metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil), metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
@@ -450,6 +451,7 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
vpnAddrs: []netip.Addr{vpnAddr}, vpnAddrs: []netip.Addr{vpnAddr},
HandshakePacket: make(map[uint8][]byte, 0), HandshakePacket: make(map[uint8][]byte, 0),
relayState: RelayState{ relayState: RelayState{
RWMutex: synctrace.NewRWMutex("relay-state"),
relays: map[netip.Addr]struct{}{}, relays: map[netip.Addr]struct{}{},
relayForByAddr: map[netip.Addr]*Relay{}, relayForByAddr: map[netip.Addr]*Relay{},
relayForByIdx: map[uint32]*Relay{}, relayForByIdx: map[uint32]*Relay{},
@@ -457,6 +459,7 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
} }
hh := &HandshakeHostInfo{ hh := &HandshakeHostInfo{
Mutex: synctrace.NewMutex("handshake-hostinfo"),
hostinfo: hostinfo, hostinfo: hostinfo,
startTime: time.Now(), startTime: time.Now(),
} }

View File

@@ -4,7 +4,6 @@ import (
"errors" "errors"
"net" "net"
"net/netip" "net/netip"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -14,6 +13,7 @@ import (
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/wadey/synctrace"
) )
// const ProbeLen = 100 // const ProbeLen = 100
@@ -53,20 +53,20 @@ type Relay struct {
} }
type HostMap struct { type HostMap struct {
sync.RWMutex //Because we concurrently read and write to our maps synctrace.RWMutex //Because we concurrently read and write to our maps
Indexes map[uint32]*HostInfo Indexes map[uint32]*HostInfo
Relays map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object Relays map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object
RemoteIndexes map[uint32]*HostInfo RemoteIndexes map[uint32]*HostInfo
Hosts map[netip.Addr]*HostInfo Hosts map[netip.Addr]*HostInfo
preferredRanges atomic.Pointer[[]netip.Prefix] preferredRanges atomic.Pointer[[]netip.Prefix]
l *logrus.Logger l *logrus.Logger
} }
// For synchronization, treat the pointed-to Relay struct as immutable. To edit the Relay // For synchronization, treat the pointed-to Relay struct as immutable. To edit the Relay
// struct, make a copy of an existing value, edit the fileds in the copy, and // struct, make a copy of an existing value, edit the fileds in the copy, and
// then store a pointer to the new copy in both realyForBy* maps. // then store a pointer to the new copy in both realyForBy* maps.
type RelayState struct { type RelayState struct {
sync.RWMutex synctrace.RWMutex
relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer
// For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data, // For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data,
@@ -288,6 +288,7 @@ func NewHostMapFromConfig(l *logrus.Logger, c *config.C) *HostMap {
func newHostMap(l *logrus.Logger) *HostMap { func newHostMap(l *logrus.Logger) *HostMap {
return &HostMap{ return &HostMap{
RWMutex: synctrace.NewRWMutex("hostmap"),
Indexes: map[uint32]*HostInfo{}, Indexes: map[uint32]*HostInfo{},
Relays: map[uint32]*HostInfo{}, Relays: map[uint32]*HostInfo{},
RemoteIndexes: map[uint32]*HostInfo{}, RemoteIndexes: map[uint32]*HostInfo{},

View File

@@ -9,7 +9,6 @@ import (
"net/netip" "net/netip"
"slices" "slices"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -21,15 +20,16 @@ import (
"github.com/slackhq/nebula/header" "github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/wadey/synctrace"
) )
var ErrHostNotKnown = errors.New("host not known") var ErrHostNotKnown = errors.New("host not known")
type LightHouse struct { type LightHouse struct {
//TODO: We need a timer wheel to kick out vpnAddrs that haven't reported in a long time //TODO: We need a timer wheel to kick out vpnAddrs that haven't reported in a long time
sync.RWMutex //Because we concurrently read and write to our maps synctrace.RWMutex //Because we concurrently read and write to our maps
ctx context.Context ctx context.Context
amLighthouse bool amLighthouse bool
myVpnNetworks []netip.Prefix myVpnNetworks []netip.Prefix
myVpnNetworksTable *bart.Lite myVpnNetworksTable *bart.Lite
@@ -96,6 +96,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
} }
h := LightHouse{ h := LightHouse{
RWMutex: synctrace.NewRWMutex("lighthouse"),
ctx: ctx, ctx: ctx,
amLighthouse: amLighthouse, amLighthouse: amLighthouse,
myVpnNetworks: cs.myVpnNetworks, myVpnNetworks: cs.myVpnNetworks,
@@ -472,6 +473,7 @@ func (lh *LightHouse) QueryServer(vpnAddr netip.Addr) {
return return
} }
synctrace.ChanDebugSend("lighthouse-querychan")
lh.queryChan <- vpnAddr lh.queryChan <- vpnAddr
} }
@@ -725,9 +727,11 @@ func (lh *LightHouse) startQueryWorker() {
nb := make([]byte, 12, 12) nb := make([]byte, 12, 12)
out := make([]byte, mtu) out := make([]byte, mtu)
synctrace.ChanDebugRecvLock("lighthouse-querychan")
for { for {
select { select {
case <-lh.ctx.Done(): case <-lh.ctx.Done():
synctrace.ChanDebugRecvUnlock("lighthouse-querychan")
return return
case addr := <-lh.queryChan: case addr := <-lh.queryChan:
lh.innerQueryServer(addr, nb, out) lh.innerQueryServer(addr, nb, out)

View File

@@ -312,12 +312,11 @@ func parseV6(data []byte, incoming bool, fp *firewall.Packet) error {
offset := ipv6.HeaderLen // Start at the end of the ipv6 header offset := ipv6.HeaderLen // Start at the end of the ipv6 header
next := 0 next := 0
for { for {
if dataLen < offset { if protoAt >= dataLen {
break break
} }
proto := layers.IPProtocol(data[protoAt]) proto := layers.IPProtocol(data[protoAt])
//fmt.Println(proto, protoAt)
switch proto { switch proto {
case layers.IPProtocolICMPv6, layers.IPProtocolESP, layers.IPProtocolNoNextHeader: case layers.IPProtocolICMPv6, layers.IPProtocolESP, layers.IPProtocolNoNextHeader:
fp.Protocol = uint8(proto) fp.Protocol = uint8(proto)
@@ -365,7 +364,7 @@ func parseV6(data []byte, incoming bool, fp *firewall.Packet) error {
case layers.IPProtocolAH: case layers.IPProtocolAH:
// Auth headers, used by IPSec, have a different meaning for header length // Auth headers, used by IPSec, have a different meaning for header length
if dataLen < offset+1 { if dataLen <= offset+1 {
break break
} }
@@ -373,7 +372,7 @@ func parseV6(data []byte, incoming bool, fp *firewall.Packet) error {
default: default:
// Normal ipv6 header length processing // Normal ipv6 header length processing
if dataLen < offset+1 { if dataLen <= offset+1 {
break break
} }

View File

@@ -117,6 +117,45 @@ func Test_newPacket_v6(t *testing.T) {
err = newPacket(buffer.Bytes(), true, p) err = newPacket(buffer.Bytes(), true, p)
require.ErrorIs(t, err, ErrIPv6CouldNotFindPayload) require.ErrorIs(t, err, ErrIPv6CouldNotFindPayload)
// A v6 packet with a hop-by-hop extension
// ICMPv6 Payload (Echo Request)
icmpLayer := layers.ICMPv6{
TypeCode: layers.ICMPv6TypeEchoRequest,
}
// Hop-by-Hop Extension Header
hopOption := layers.IPv6HopByHopOption{}
hopOption.OptionData = []byte{0, 0, 0, 0}
hopByHop := layers.IPv6HopByHop{}
hopByHop.Options = append(hopByHop.Options, &hopOption)
ip = layers.IPv6{
Version: 6,
HopLimit: 128,
NextHeader: layers.IPProtocolIPv6Destination,
SrcIP: net.IPv6linklocalallrouters,
DstIP: net.IPv6linklocalallnodes,
}
buffer.Clear()
err = gopacket.SerializeLayers(buffer, gopacket.SerializeOptions{
ComputeChecksums: false,
FixLengths: true,
}, &ip, &hopByHop, &icmpLayer)
if err != nil {
panic(err)
}
// Ensure buffer length checks during parsing with the next 2 tests.
// A full IPv6 header and 1 byte in the first extension, but missing
// the length byte.
err = newPacket(buffer.Bytes()[:41], true, p)
require.ErrorIs(t, err, ErrIPv6CouldNotFindPayload)
// A full IPv6 header plus 1 full extension, but only 1 byte of the
// next layer, missing length byte
err = newPacket(buffer.Bytes()[:49], true, p)
require.ErrorIs(t, err, ErrIPv6CouldNotFindPayload)
// A good ICMP packet // A good ICMP packet
ip = layers.IPv6{ ip = layers.IPv6{
Version: 6, Version: 6,
@@ -288,6 +327,10 @@ func Test_newPacket_v6(t *testing.T) {
assert.Equal(t, uint16(22), p.LocalPort) assert.Equal(t, uint16(22), p.LocalPort)
assert.False(t, p.Fragment) assert.False(t, p.Fragment)
// Ensure buffer bounds checking during processing
err = newPacket(b[:41], true, p)
require.ErrorIs(t, err, ErrIPv6PacketTooShort)
// Invalid AH header // Invalid AH header
b = buffer.Bytes() b = buffer.Bytes()
err = newPacket(b, true, p) err = newPacket(b, true, p)

View File

@@ -7,11 +7,11 @@ import (
"slices" "slices"
"sort" "sort"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/wadey/synctrace"
) )
// forEachFunc is used to benefit folks that want to do work inside the lock // forEachFunc is used to benefit folks that want to do work inside the lock
@@ -185,7 +185,7 @@ func (hr *hostnamesResults) GetAddrs() []netip.AddrPort {
// It serves as a local cache of query replies, host update notifications, and locally learned addresses // It serves as a local cache of query replies, host update notifications, and locally learned addresses
type RemoteList struct { type RemoteList struct {
// Every interaction with internals requires a lock! // Every interaction with internals requires a lock!
sync.RWMutex synctrace.RWMutex
// The full list of vpn addresses assigned to this host // The full list of vpn addresses assigned to this host
vpnAddrs []netip.Addr vpnAddrs []netip.Addr
@@ -215,6 +215,7 @@ type RemoteList struct {
// NewRemoteList creates a new empty RemoteList // NewRemoteList creates a new empty RemoteList
func NewRemoteList(vpnAddrs []netip.Addr, shouldAdd func(netip.Addr) bool) *RemoteList { func NewRemoteList(vpnAddrs []netip.Addr, shouldAdd func(netip.Addr) bool) *RemoteList {
r := &RemoteList{ r := &RemoteList{
RWMutex: synctrace.NewRWMutex("remote-list"),
vpnAddrs: make([]netip.Addr, len(vpnAddrs)), vpnAddrs: make([]netip.Addr, len(vpnAddrs)),
addrs: make([]netip.AddrPort, 0), addrs: make([]netip.AddrPort, 0),
relays: make([]netip.Addr, 0), relays: make([]netip.Addr, 0),

View File

@@ -5,10 +5,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"github.com/armon/go-radix" "github.com/armon/go-radix"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/wadey/synctrace"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
@@ -28,7 +28,7 @@ type SSHServer struct {
listener net.Listener listener net.Listener
// Locks the conns/counter to avoid concurrent map access // Locks the conns/counter to avoid concurrent map access
connsLock sync.Mutex connsLock synctrace.Mutex
conns map[int]*session conns map[int]*session
counter int counter int
} }
@@ -41,6 +41,7 @@ func NewSSHServer(l *logrus.Entry) (*SSHServer, error) {
l: l, l: l,
commands: radix.New(), commands: radix.New(),
conns: make(map[int]*session), conns: make(map[int]*session),
connsLock: synctrace.NewMutex("ssh-server-conns"),
} }
cc := ssh.CertChecker{ cc := ssh.CertChecker{

View File

@@ -1,8 +1,9 @@
package nebula package nebula
import ( import (
"sync"
"time" "time"
"github.com/wadey/synctrace"
) )
// How many timer objects should be cached // How many timer objects should be cached
@@ -34,7 +35,7 @@ type TimerWheel[T any] struct {
} }
type LockingTimerWheel[T any] struct { type LockingTimerWheel[T any] struct {
m sync.Mutex m synctrace.Mutex
t *TimerWheel[T] t *TimerWheel[T]
} }
@@ -81,8 +82,9 @@ func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
} }
// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] { func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
return &LockingTimerWheel[T]{ return &LockingTimerWheel[T]{
m: synctrace.NewMutex(name),
t: NewTimerWheel[T](min, max), t: NewTimerWheel[T](min, max),
} }
} }

View File

@@ -11,13 +11,13 @@ import (
"io" "io"
"net" "net"
"net/netip" "net/netip"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"unsafe" "unsafe"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/wadey/synctrace"
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
"golang.zx2c4.com/wireguard/conn/winrio" "golang.zx2c4.com/wireguard/conn/winrio"
) )
@@ -46,7 +46,7 @@ type ringBuffer struct {
iocp windows.Handle iocp windows.Handle
isFull bool isFull bool
cq winrio.Cq cq winrio.Cq
mu sync.Mutex mu synctrace.Mutex
overlapped windows.Overlapped overlapped windows.Overlapped
} }
@@ -64,7 +64,11 @@ func NewRIOListener(l *logrus.Logger, addr netip.Addr, port int) (*RIOConn, erro
return nil, errors.New("could not initialize winrio") return nil, errors.New("could not initialize winrio")
} }
u := &RIOConn{l: l} u := &RIOConn{
l: l,
rx: ringBuffer{mu: synctrace.NewMutex("rio-rx")},
tx: ringBuffer{mu: synctrace.NewMutex("rio-tx")},
}
err := u.bind(&windows.SockaddrInet6{Addr: addr.As16(), Port: port}) err := u.bind(&windows.SockaddrInet6{Addr: addr.As16(), Port: port})
if err != nil { if err != nil {