Mirror of https://github.com/slackhq/nebula.git (synced 2025-11-22 08:24:25 +01:00)
Compare commits: stinky...mutex-debu (30 commits)
| SHA1 |
|---|
| d4aea03dd1 |
| 1c9fdba403 |
| 77eced39dd |
| 1704d7f75a |
| 2030cbf018 |
| dffaaf38d4 |
| f2251645bb |
| 2ff26b261d |
| c7f1bed882 |
| 0ccfad1a1e |
| 1be8dc43a7 |
| 94dd14c1a3 |
| 91ec6bb1ff |
| 26f7a9fd45 |
| 6f27f46965 |
| bcaefce4ac |
| 540a171ef8 |
| 4d88c0711a |
| 5ce8279875 |
| fdb78044ba |
| 4c89b3c6a3 |
| 5cc43ea9cd |
| 92c4245329 |
| e5789770b1 |
| a83f0ca470 |
| 90e9a8e42c |
| 9105eba939 |
| 3e5e48f937 |
| afde2080d6 |
| e6eeef785e |
.github/workflows/smoke.yml (vendored, 2 changes)

```diff
@@ -26,7 +26,7 @@ jobs:
           check-latest: true

       - name: build
-        run: make bin-docker CGO_ENABLED=1 BUILD_ARGS=-race
+        run: make bin-docker CGO_ENABLED=1 BUILD_ARGS="-race -tags=mutex_debug"

       - name: setup docker image
         working-directory: ./.github/workflows/smoke
```
.github/workflows/test.yml (vendored, 2 changes)

```diff
@@ -40,7 +40,7 @@ jobs:
         run: make test

       - name: End 2 end
-        run: make e2evv
+        run: make e2e-mutex-debug TEST_LOGS=1 TEST_FLAGS=-v

       - name: Build test mobile
         run: make build-test-mobile
```
Makefile (4 changes)

```diff
@@ -63,6 +63,9 @@ ALL = $(ALL_LINUX) \
 e2e:
 	$(TEST_ENV) go test -tags=e2e_testing -count=1 $(TEST_FLAGS) ./e2e

+e2e-mutex-debug:
+	$(TEST_ENV) go test -tags=mutex_debug,e2e_testing -count=1 $(TEST_FLAGS) ./e2e
+
 e2ev: TEST_FLAGS += -v
 e2ev: e2e

@@ -215,6 +218,7 @@ ifeq ($(words $(MAKECMDGOALS)),1)
 	@$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory
 endif

+bin-docker: BUILD_ARGS = -tags=mutex_debug
 bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert

 smoke-docker: bin-docker
```
connection_manager.go

```diff
@@ -5,7 +5,6 @@ import (
 	"context"
 	"encoding/binary"
 	"net/netip"
-	"sync"
 	"time"

 	"github.com/rcrowley/go-metrics"
@@ -28,14 +27,14 @@ const (

 type connectionManager struct {
 	in     map[uint32]struct{}
-	inLock *sync.RWMutex
+	inLock syncRWMutex

 	out     map[uint32]struct{}
-	outLock *sync.RWMutex
+	outLock syncRWMutex

 	// relayUsed holds which relay localIndexs are in use
 	relayUsed     map[uint32]struct{}
-	relayUsedLock *sync.RWMutex
+	relayUsedLock syncRWMutex

 	hostMap      *HostMap
 	trafficTimer *LockingTimerWheel[uint32]
@@ -60,12 +59,12 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
 	nc := &connectionManager{
 		hostMap:         intf.hostMap,
 		in:              make(map[uint32]struct{}),
-		inLock:          &sync.RWMutex{},
+		inLock:          newSyncRWMutex("connection-manager-in"),
 		out:             make(map[uint32]struct{}),
-		outLock:         &sync.RWMutex{},
+		outLock:         newSyncRWMutex("connection-manager-out"),
 		relayUsed:       make(map[uint32]struct{}),
-		relayUsedLock:   &sync.RWMutex{},
-		trafficTimer:    NewLockingTimerWheel[uint32](time.Millisecond*500, max),
+		relayUsedLock:   newSyncRWMutex("connection-manager-relay-used"),
+		trafficTimer:    NewLockingTimerWheel[uint32]("connection-manager-timer", time.Millisecond*500, max),
 		intf:            intf,
 		pendingDeletion: make(map[uint32]struct{}),
 		checkInterval:   checkInterval,
```
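The same mechanical substitution repeats through the rest of the diff: every bare `sync.Mutex`/`sync.RWMutex` field or embedded mutex becomes a `syncMutex`/`syncRWMutex` initialized with a lock name. A minimal sketch of the pattern with a hypothetical type (`example` and its key are not part of this change; `syncRWMutex` and `newSyncRWMutex` come from the new `mutex_debug.go`/`mutex_nodebug.go` files below):

```go
// Hypothetical illustration of the pattern applied throughout this diff.
type example struct {
	mu syncRWMutex // a plain sync.RWMutex unless built with -tags=mutex_debug
}

func newExample() *example {
	// The key should appear in allowedConcurrentLocks (mutex_debug.go)
	// so the checker knows its place in the lock-ordering DAG.
	return &example{mu: newSyncRWMutex("example")}
}

func (e *example) touch() {
	e.mu.Lock() // under mutex_debug: records caller file:line and checks held locks
	defer e.mu.Unlock()
}
```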
connection_state.go

```diff
@@ -4,7 +4,6 @@ import (
 	"crypto/rand"
 	"encoding/json"
 	"fmt"
-	"sync"
 	"sync/atomic"

 	"github.com/flynn/noise"
@@ -24,7 +23,7 @@ type ConnectionState struct {
 	initiator      bool
 	messageCounter atomic.Uint64
 	window         *Bits
-	writeLock      sync.Mutex
+	writeLock      syncMutex
 }

 func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, initiator bool, pattern noise.HandshakePattern) (*ConnectionState, error) {
@@ -76,6 +75,8 @@ func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, i
 		initiator: initiator,
 		window:    b,
 		myCert:    crt,
+
+		writeLock: newSyncMutex("connection-state-write"),
 	}
 	// always start the counter from 2, as packet 1 and packet 2 are handshake packets.
 	ci.messageCounter.Add(2)
```
dns_server.go

```diff
@@ -6,7 +6,6 @@ import (
 	"net/netip"
 	"strconv"
 	"strings"
-	"sync"

 	"github.com/gaissmai/bart"
 	"github.com/miekg/dns"
@@ -21,7 +20,7 @@ var dnsServer *dns.Server
 var dnsAddr string

 type dnsRecords struct {
-	sync.RWMutex
+	syncRWMutex
 	l       *logrus.Logger
 	dnsMap4 map[string]netip.Addr
 	dnsMap6 map[string]netip.Addr
@@ -31,6 +30,7 @@ type dnsRecords struct {

 func newDnsRecords(l *logrus.Logger, cs *CertState, hostMap *HostMap) *dnsRecords {
 	return &dnsRecords{
+		syncRWMutex: newSyncRWMutex("dns-records"),
 		l:           l,
 		dnsMap4:     make(map[string]netip.Addr),
 		dnsMap6:     make(map[string]netip.Addr),
```
firewall.go

```diff
@@ -10,7 +10,6 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
-	"sync"
 	"time"

 	"github.com/gaissmai/bart"
@@ -76,7 +75,7 @@ type firewallMetrics struct {
 }

 type FirewallConntrack struct {
-	sync.Mutex
+	syncMutex

 	Conns      map[firewall.Packet]*conn
 	TimerWheel *TimerWheel[firewall.Packet]
@@ -164,6 +163,7 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D

 	return &Firewall{
 		Conntrack: &FirewallConntrack{
+			syncMutex:  newSyncMutex("firewall-conntrack"),
 			Conns:      make(map[firewall.Packet]*conn),
 			TimerWheel: NewTimerWheel[firewall.Packet](tmin, tmax),
 		},
```
go.mod (2 changes)

```diff
@@ -8,6 +8,7 @@ require (
 	dario.cat/mergo v1.0.1
 	github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be
 	github.com/armon/go-radix v1.0.0
+	github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc
 	github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432
 	github.com/flynn/noise v1.1.0
 	github.com/gaissmai/bart v0.20.1
@@ -23,6 +24,7 @@ require (
 	github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
 	github.com/stefanberger/go-pkcs11uri v0.0.0-20230803200340-78284954bff6
 	github.com/stretchr/testify v1.10.0
+	github.com/timandy/routine v1.1.5
 	github.com/vishvananda/netlink v1.3.0
 	golang.org/x/crypto v0.36.0
 	golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
```
go.sum (6 changes)

```diff
@@ -17,6 +17,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc h1:6e91sWiDE69Jl0WUsY/LvTCBPRBe6b2j8H7W96JGJ4s=
+github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc/go.mod h1:RGIcF96ORCYAsdz60Ou9mPBNa4+DjoQFS8nelPniFoY=
 github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432 h1:M5QgkYacWj0Xs8MhpIK/5uwU02icXpEoSo9sM2aRCps=
 github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432/go.mod h1:xwIwAxMvYnVrGJPe2FKx5prTrnAjGOD8zvDOnxnrrkM=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -145,6 +147,10 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/timandy/routine v1.1.1 h1:6/Z7qLFZj3GrzuRksBFzIG8YGUh8CLhjnnMePBQTrEI=
+github.com/timandy/routine v1.1.1/go.mod h1:OZHPOKSvqL/ZvqXFkNZyit0xIVelERptYXdAHH00adQ=
+github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
+github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
 github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
 github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
 github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
```
handshake_ix.go

```diff
@@ -243,6 +243,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
 	}

 	hostinfo := &HostInfo{
+		syncRWMutex:       newSyncRWMutex("hostinfo"),
 		ConnectionState:   ci,
 		localIndexId:      myIndex,
 		remoteIndexId:     hs.Details.InitiatorIndex,
@@ -250,6 +251,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
 		HandshakePacket:   make(map[uint8][]byte, 0),
 		lastHandshakeTime: hs.Details.Time,
 		relayState: RelayState{
+			syncRWMutex:    newSyncRWMutex("relay-state"),
 			relays:         map[netip.Addr]struct{}{},
 			relayForByAddr: map[netip.Addr]*Relay{},
 			relayForByIdx:  map[uint32]*Relay{},
```
handshake_manager.go

```diff
@@ -8,7 +8,6 @@ import (
 	"errors"
 	"net/netip"
 	"slices"
-	"sync"
 	"time"

 	"github.com/rcrowley/go-metrics"
@@ -45,7 +44,7 @@ type HandshakeConfig struct {

 type HandshakeManager struct {
 	// Mutex for interacting with the vpnIps and indexes maps
-	sync.RWMutex
+	syncRWMutex

 	vpnIps  map[netip.Addr]*HandshakeHostInfo
 	indexes map[uint32]*HandshakeHostInfo
@@ -66,7 +65,7 @@ type HandshakeManager struct {
 }

 type HandshakeHostInfo struct {
-	sync.Mutex
+	syncMutex

 	startTime time.Time // Time that we first started trying with this handshake
 	ready     bool      // Is the handshake ready
@@ -104,6 +103,7 @@ func (hh *HandshakeHostInfo) cachePacket(l *logrus.Logger, t header.MessageType,

 func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
 	return &HandshakeManager{
+		syncRWMutex: newSyncRWMutex("handshake-manager"),
 		vpnIps:      map[netip.Addr]*HandshakeHostInfo{},
 		indexes:     map[uint32]*HandshakeHostInfo{},
 		mainHostMap: mainHostMap,
@@ -111,7 +111,7 @@ func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *Lig
 		outside:                outside,
 		config:                 config,
 		trigger:                make(chan netip.Addr, config.triggerBuffer),
-		OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr](config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
+		OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr]("handshake-manager-timer", config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
 		messageMetrics:         config.messageMetrics,
 		metricInitiated:        metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
 		metricTimedOut:         metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
@@ -448,9 +448,11 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
 	}

 	hostinfo := &HostInfo{
+		syncRWMutex:     newSyncRWMutex("hostinfo"),
 		vpnAddrs:        []netip.Addr{vpnAddr},
 		HandshakePacket: make(map[uint8][]byte, 0),
 		relayState: RelayState{
+			syncRWMutex:    newSyncRWMutex("relay-state"),
 			relays:         map[netip.Addr]struct{}{},
 			relayForByAddr: map[netip.Addr]*Relay{},
 			relayForByIdx:  map[uint32]*Relay{},
@@ -458,6 +460,7 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
 	}

 	hh := &HandshakeHostInfo{
+		syncMutex: newSyncMutex("handshake-hostinfo"),
 		hostinfo:  hostinfo,
 		startTime: time.Now(),
 	}
```
hostmap.go

```diff
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"net"
 	"net/netip"
-	"sync"
 	"sync/atomic"
 	"time"

@@ -53,7 +52,7 @@ type Relay struct {
 }

 type HostMap struct {
-	sync.RWMutex  //Because we concurrently read and write to our maps
+	syncRWMutex   //Because we concurrently read and write to our maps
 	Indexes       map[uint32]*HostInfo
 	Relays        map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object
 	RemoteIndexes map[uint32]*HostInfo
@@ -66,7 +65,7 @@ type HostMap struct {
 // struct, make a copy of an existing value, edit the fileds in the copy, and
 // then store a pointer to the new copy in both realyForBy* maps.
 type RelayState struct {
-	sync.RWMutex
+	syncRWMutex

 	relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer
 	// For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data,
@@ -209,6 +208,7 @@ func (rs *RelayState) InsertRelay(ip netip.Addr, idx uint32, r *Relay) {
 }

 type HostInfo struct {
+	syncRWMutex
 	remote         netip.AddrPort
 	remotes        *RemoteList
 	promoteCounter atomic.Uint32
@@ -288,6 +288,7 @@ func NewHostMapFromConfig(l *logrus.Logger, c *config.C) *HostMap {

 func newHostMap(l *logrus.Logger) *HostMap {
 	return &HostMap{
+		syncRWMutex:   newSyncRWMutex("hostmap"),
 		Indexes:       map[uint32]*HostInfo{},
 		Relays:        map[uint32]*HostInfo{},
 		RemoteIndexes: map[uint32]*HostInfo{},
```
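The `RelayState` comments above describe a copy-on-write convention for `*Relay` values; a hedged sketch of an update under that convention (the helper is hypothetical and not part of this change):

```go
// Hypothetical helper illustrating the copy-on-write convention described
// in the RelayState comments: never mutate a published *Relay in place.
func (rs *RelayState) updateRelay(addr netip.Addr, idx uint32, mutate func(*Relay)) {
	rs.Lock()
	defer rs.Unlock()
	cur, ok := rs.relayForByIdx[idx]
	if !ok {
		return
	}
	cp := *cur  // make a copy of the existing value
	mutate(&cp) // edit the fields in the copy
	// store a pointer to the new copy in both relayForBy* maps
	rs.relayForByIdx[idx] = &cp
	rs.relayForByAddr[addr] = &cp
}
```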
lighthouse.go

```diff
@@ -9,7 +9,6 @@ import (
 	"net/netip"
 	"slices"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"

@@ -27,7 +26,7 @@ var ErrHostNotKnown = errors.New("host not known")

 type LightHouse struct {
 	//TODO: We need a timer wheel to kick out vpnAddrs that haven't reported in a long time
-	sync.RWMutex //Because we concurrently read and write to our maps
+	syncRWMutex  //Because we concurrently read and write to our maps
 	ctx          context.Context
 	amLighthouse bool
@@ -96,6 +95,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
 	}

 	h := LightHouse{
+		syncRWMutex:   newSyncRWMutex("lighthouse"),
 		ctx:           ctx,
 		amLighthouse:  amLighthouse,
 		myVpnNetworks: cs.myVpnNetworks,
@@ -475,6 +475,7 @@ func (lh *LightHouse) QueryServer(vpnAddr netip.Addr) {
 		return
 	}

+	chanDebugSend("lighthouse-query-chan")
 	lh.queryChan <- vpnAddr
 }

@@ -731,6 +732,8 @@ func (lh *LightHouse) startQueryWorker() {
 		nb := make([]byte, 12, 12)
 		out := make([]byte, mtu)

+		chanDebugRecv("lighthouse-query-chan")
+
 		for {
 			select {
 			case <-lh.ctx.Done():
```
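`chanDebugSend`/`chanDebugRecv` extend the ordering checks to channel operations: the query channel is registered in the DAG as "lighthouse-query-chan", which only allows "handshake-hostinfo" to already be held. A hypothetical misuse (not in the change set) that the checker would catch under `-tags=mutex_debug`:

```go
// Hypothetical: sending on the query channel while holding the hostmap
// lock panics, since "hostmap" is not in lighthouse-query-chan's allowed set.
func badQuery(lh *LightHouse, hm *HostMap, vpnAddr netip.Addr) {
	hm.Lock()
	defer hm.Unlock()
	chanDebugSend("lighthouse-query-chan") // alertMutex fires here
	lh.queryChan <- vpnAddr
}
```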
mutex_debug.go (new file, 195 lines)

```go
//go:build mutex_debug
// +build mutex_debug

package nebula

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/clarkmcc/go-dag"
	"github.com/timandy/routine"
)

type mutexKey = string

// For each Key in this map, the Value is a list of lock types you can already have
// when you want to grab that Key. This ensures that locks are always fetched
// in the same order, to prevent deadlocks.
var allowedConcurrentLocks = map[mutexKey][]mutexKey{
	"connection-manager-in":         {"hostmap"},
	"connection-manager-out":        {"connection-manager-in", "handshake-hostinfo", "handshake-manager"},
	"connection-manager-relay-used": {"handshake-hostinfo"},
	"connection-manager-timer":      {"connection-manager-out"},
	// "connection-state-write":     {"hostmap"},
	"firewall-conntrack":      {"handshake-hostinfo"},
	"handshake-manager":       {"handshake-hostinfo", "hostmap"},
	"handshake-manager-timer": {"handshake-manager"},
	"hostmap":                 {"lighthouse-query-chan", "handshake-hostinfo"},
	"lighthouse":              {"handshake-hostinfo"},
	"relay-state":             {"connection-manager-relay-used", "hostmap"},
	"remote-list":             {"lighthouse", "handshake-manager"},
	"lighthouse-query-chan":   {"handshake-hostinfo"},
}

type mutexValue struct {
	file string
	line int
}

func (m mutexValue) String() string {
	return fmt.Sprintf("%s:%d", m.file, m.line)
}

var threadLocal = routine.NewThreadLocalWithInitial[map[mutexKey]mutexValue](func() map[mutexKey]mutexValue { return map[mutexKey]mutexValue{} })

var allowedDAG dag.AcyclicGraph

// We build a directed acyclic graph to assert that the locks can only be
// acquired in a determined order. If there are cycles in the DAG, then we
// know that the locking order is not guaranteed.
func init() {
	for k, v := range allowedConcurrentLocks {
		allowedDAG.Add(k)
		for _, t := range v {
			allowedDAG.Add(t)
		}
	}
	for k, v := range allowedConcurrentLocks {
		for _, t := range v {
			allowedDAG.Connect(dag.BasicEdge(k, t))
		}
	}

	if cycles := allowedDAG.Cycles(); len(cycles) > 0 {
		panic(fmt.Errorf("Cycles found in allowedConcurrentLocks: %v", cycles))
	}

	// Rebuild allowedConcurrentLocks as a flattened list of all possibilities
	for k := range allowedConcurrentLocks {
		ancestors, err := allowedDAG.Ancestors(k)
		if err != nil {
			panic(err)
		}

		var allowed []mutexKey
		for t := range ancestors {
			allowed = append(allowed, t.(mutexKey))
		}
		allowedConcurrentLocks[k] = allowed
	}
}

type syncRWMutex struct {
	sync.RWMutex
	mutexKey
}

type syncMutex struct {
	sync.Mutex
	mutexKey
}

func newSyncRWMutex(key mutexKey) syncRWMutex {
	return syncRWMutex{
		mutexKey: key,
	}
}

func newSyncMutex(key mutexKey) syncMutex {
	return syncMutex{
		mutexKey: key,
	}
}

func alertMutex(err error) {
	panic(err)
	// NOTE: you could switch to this log line and remove the panic if you want
	// to log all failures instead of panicking on the first one
	//log.Print(err, string(debug.Stack()))
}

func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
	if add == "" {
		alertMutex(fmt.Errorf("mutex not initialized with mutexKey"))
	}

	allowedConcurrent := allowedConcurrentLocks[add]

	for k, v := range state {
		if add == k {
			alertMutex(fmt.Errorf("re-entrant lock: %s. previous allocation: %s", add, v))
		}

		// TODO use slices.Contains, but requires go1.21
		var found bool
		for _, a := range allowedConcurrent {
			if a == k {
				found = true
				break
			}
		}
		if !found {
			alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
		}
	}
}

func chanDebugRecv(key mutexKey) {
	m := threadLocal.Get()
	checkMutex(m, key)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[key] = v
}

func chanDebugSend(key mutexKey) {
	m := threadLocal.Get()
	checkMutex(m, key)
}

func (s *syncRWMutex) Lock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.RWMutex.Lock()
}

func (s *syncRWMutex) Unlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.RWMutex.Unlock()
}

func (s *syncRWMutex) RLock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.RWMutex.RLock()
}

func (s *syncRWMutex) RUnlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.RWMutex.RUnlock()
}

func (s *syncMutex) Lock() {
	m := threadLocal.Get()
	checkMutex(m, s.mutexKey)
	v := mutexValue{}
	_, v.file, v.line, _ = runtime.Caller(1)
	m[s.mutexKey] = v
	s.Mutex.Lock()
}

func (s *syncMutex) Unlock() {
	m := threadLocal.Get()
	delete(m, s.mutexKey)
	s.Mutex.Unlock()
}
```
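Because `alertMutex` panics, any ordering violation fails loudly in the `-tags=mutex_debug` CI jobs wired up above. A hypothetical test (not part of this change) showing the re-entrant check firing:

```go
//go:build mutex_debug

package nebula

import "testing"

// Hypothetical test: the second Lock finds "hostmap" already in this
// goroutine's state map, so checkMutex reports a re-entrant lock and panics.
func TestReentrantLockPanics(t *testing.T) {
	defer func() {
		if recover() == nil {
			t.Fatal("expected panic from re-entrant Lock")
		}
	}()
	mu := newSyncRWMutex("hostmap")
	mu.Lock()
	mu.Lock() // would deadlock silently in a normal build
}
```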
mutex_nodebug.go (new file, 23 lines)

```go
//go:build !mutex_debug
// +build !mutex_debug

package nebula

import (
	"sync"
)

type mutexKey = string
type syncRWMutex = sync.RWMutex
type syncMutex = sync.Mutex

func newSyncRWMutex(mutexKey) syncRWMutex {
	return sync.RWMutex{}
}

func newSyncMutex(mutexKey) syncMutex {
	return sync.Mutex{}
}

func chanDebugRecv(key mutexKey) {}
func chanDebugSend(key mutexKey) {}
```
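In the default build the wrappers are type aliases, so `syncRWMutex` is `sync.RWMutex`: struct embedding and every `Lock`/`RLock` call site compile unchanged with no wrapper cost, and the constructors simply discard the key. A small sketch of the consequence:

```go
// Sketch: without -tags=mutex_debug the key is ignored and calls dispatch
// straight to sync.RWMutex, so the instrumentation costs nothing in release builds.
var exampleMu = newSyncRWMutex("ignored-in-this-build")

func exampleRead(m map[uint32]struct{}) int {
	exampleMu.RLock()
	defer exampleMu.RUnlock()
	return len(m)
}
```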
remote_list.go

```diff
@@ -7,7 +7,6 @@ import (
 	"slices"
 	"sort"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"

@@ -185,7 +184,7 @@ func (hr *hostnamesResults) GetAddrs() []netip.AddrPort {
 // It serves as a local cache of query replies, host update notifications, and locally learned addresses
 type RemoteList struct {
 	// Every interaction with internals requires a lock!
-	sync.RWMutex
+	syncRWMutex

 	// The full list of vpn addresses assigned to this host
 	vpnAddrs []netip.Addr
@@ -215,11 +214,12 @@ type RemoteList struct {
 // NewRemoteList creates a new empty RemoteList
 func NewRemoteList(vpnAddrs []netip.Addr, shouldAdd func(netip.Addr) bool) *RemoteList {
 	r := &RemoteList{
-		vpnAddrs:  make([]netip.Addr, len(vpnAddrs)),
-		addrs:     make([]netip.AddrPort, 0),
-		relays:    make([]netip.Addr, 0),
-		cache:     make(map[netip.Addr]*cache),
-		shouldAdd: shouldAdd,
+		syncRWMutex: newSyncRWMutex("remote-list"),
+		vpnAddrs:    make([]netip.Addr, len(vpnAddrs)),
+		addrs:       make([]netip.AddrPort, 0),
+		relays:      make([]netip.Addr, 0),
+		cache:       make(map[netip.Addr]*cache),
+		shouldAdd:   shouldAdd,
 	}
 	copy(r.vpnAddrs, vpnAddrs)
 	return r
```
timeout.go

```diff
@@ -1,7 +1,6 @@
 package nebula

 import (
-	"sync"
 	"time"
 )

@@ -34,7 +33,7 @@ type TimerWheel[T any] struct {
 }

 type LockingTimerWheel[T any] struct {
-	m sync.Mutex
+	m syncMutex
 	t *TimerWheel[T]
 }

@@ -81,8 +80,9 @@ func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
 }

 // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
-func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
+func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
 	return &LockingTimerWheel[T]{
+		m: newSyncMutex(name),
 		t: NewTimerWheel[T](min, max),
 	}
 }
```
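Callers of `NewLockingTimerWheel` now name the wheel's internal mutex, as the connection-manager and handshake-manager hunks above show. A usage sketch with illustrative values (the real call sites use their configured intervals):

```go
// Sketch: the name keys the wheel's internal syncMutex into the
// lock-ordering DAG, e.g. "connection-manager-timer" above.
func newExampleWheel() *LockingTimerWheel[uint32] {
	return NewLockingTimerWheel[uint32]("connection-manager-timer", 500*time.Millisecond, time.Minute)
}
```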