mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-23 17:04:25 +01:00
Compare commits
30 Commits
jay.wren-w
...
mutex-debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4aea03dd1 | ||
|
|
1c9fdba403 | ||
|
|
77eced39dd | ||
|
|
1704d7f75a | ||
|
|
2030cbf018 | ||
|
|
dffaaf38d4 | ||
|
|
f2251645bb | ||
|
|
2ff26b261d | ||
|
|
c7f1bed882 | ||
|
|
0ccfad1a1e | ||
|
|
1be8dc43a7 | ||
|
|
94dd14c1a3 | ||
|
|
91ec6bb1ff | ||
|
|
26f7a9fd45 | ||
|
|
6f27f46965 | ||
|
|
bcaefce4ac | ||
|
|
540a171ef8 | ||
|
|
4d88c0711a | ||
|
|
5ce8279875 | ||
|
|
fdb78044ba | ||
|
|
4c89b3c6a3 | ||
|
|
5cc43ea9cd | ||
|
|
92c4245329 | ||
|
|
e5789770b1 | ||
|
|
a83f0ca470 | ||
|
|
90e9a8e42c | ||
|
|
9105eba939 | ||
|
|
3e5e48f937 | ||
|
|
afde2080d6 | ||
|
|
e6eeef785e |
2
.github/workflows/smoke.yml
vendored
2
.github/workflows/smoke.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
|||||||
check-latest: true
|
check-latest: true
|
||||||
|
|
||||||
- name: build
|
- name: build
|
||||||
run: make bin-docker CGO_ENABLED=1 BUILD_ARGS=-race
|
run: make bin-docker CGO_ENABLED=1 BUILD_ARGS="-race -tags=mutex_debug"
|
||||||
|
|
||||||
- name: setup docker image
|
- name: setup docker image
|
||||||
working-directory: ./.github/workflows/smoke
|
working-directory: ./.github/workflows/smoke
|
||||||
|
|||||||
2
.github/workflows/test.yml
vendored
2
.github/workflows/test.yml
vendored
@@ -40,7 +40,7 @@ jobs:
|
|||||||
run: make test
|
run: make test
|
||||||
|
|
||||||
- name: End 2 end
|
- name: End 2 end
|
||||||
run: make e2evv
|
run: make e2e-mutex-debug TEST_LOGS=1 TEST_FLAGS=-v
|
||||||
|
|
||||||
- name: Build test mobile
|
- name: Build test mobile
|
||||||
run: make build-test-mobile
|
run: make build-test-mobile
|
||||||
|
|||||||
4
Makefile
4
Makefile
@@ -63,6 +63,9 @@ ALL = $(ALL_LINUX) \
|
|||||||
e2e:
|
e2e:
|
||||||
$(TEST_ENV) go test -tags=e2e_testing -count=1 $(TEST_FLAGS) ./e2e
|
$(TEST_ENV) go test -tags=e2e_testing -count=1 $(TEST_FLAGS) ./e2e
|
||||||
|
|
||||||
|
e2e-mutex-debug:
|
||||||
|
$(TEST_ENV) go test -tags=mutex_debug,e2e_testing -count=1 $(TEST_FLAGS) ./e2e
|
||||||
|
|
||||||
e2ev: TEST_FLAGS += -v
|
e2ev: TEST_FLAGS += -v
|
||||||
e2ev: e2e
|
e2ev: e2e
|
||||||
|
|
||||||
@@ -215,6 +218,7 @@ ifeq ($(words $(MAKECMDGOALS)),1)
|
|||||||
@$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory
|
@$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
bin-docker: BUILD_ARGS = -tags=mutex_debug
|
||||||
bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert
|
bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert
|
||||||
|
|
||||||
smoke-docker: bin-docker
|
smoke-docker: bin-docker
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rcrowley/go-metrics"
|
"github.com/rcrowley/go-metrics"
|
||||||
@@ -28,14 +27,14 @@ const (
|
|||||||
|
|
||||||
type connectionManager struct {
|
type connectionManager struct {
|
||||||
in map[uint32]struct{}
|
in map[uint32]struct{}
|
||||||
inLock *sync.RWMutex
|
inLock syncRWMutex
|
||||||
|
|
||||||
out map[uint32]struct{}
|
out map[uint32]struct{}
|
||||||
outLock *sync.RWMutex
|
outLock syncRWMutex
|
||||||
|
|
||||||
// relayUsed holds which relay localIndexs are in use
|
// relayUsed holds which relay localIndexs are in use
|
||||||
relayUsed map[uint32]struct{}
|
relayUsed map[uint32]struct{}
|
||||||
relayUsedLock *sync.RWMutex
|
relayUsedLock syncRWMutex
|
||||||
|
|
||||||
hostMap *HostMap
|
hostMap *HostMap
|
||||||
trafficTimer *LockingTimerWheel[uint32]
|
trafficTimer *LockingTimerWheel[uint32]
|
||||||
@@ -60,12 +59,12 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
|
|||||||
nc := &connectionManager{
|
nc := &connectionManager{
|
||||||
hostMap: intf.hostMap,
|
hostMap: intf.hostMap,
|
||||||
in: make(map[uint32]struct{}),
|
in: make(map[uint32]struct{}),
|
||||||
inLock: &sync.RWMutex{},
|
inLock: newSyncRWMutex("connection-manager-in"),
|
||||||
out: make(map[uint32]struct{}),
|
out: make(map[uint32]struct{}),
|
||||||
outLock: &sync.RWMutex{},
|
outLock: newSyncRWMutex("connection-manager-out"),
|
||||||
relayUsed: make(map[uint32]struct{}),
|
relayUsed: make(map[uint32]struct{}),
|
||||||
relayUsedLock: &sync.RWMutex{},
|
relayUsedLock: newSyncRWMutex("connection-manager-relay-used"),
|
||||||
trafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, max),
|
trafficTimer: NewLockingTimerWheel[uint32]("connection-manager-timer", time.Millisecond*500, max),
|
||||||
intf: intf,
|
intf: intf,
|
||||||
pendingDeletion: make(map[uint32]struct{}),
|
pendingDeletion: make(map[uint32]struct{}),
|
||||||
checkInterval: checkInterval,
|
checkInterval: checkInterval,
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/flynn/noise"
|
"github.com/flynn/noise"
|
||||||
@@ -24,7 +23,7 @@ type ConnectionState struct {
|
|||||||
initiator bool
|
initiator bool
|
||||||
messageCounter atomic.Uint64
|
messageCounter atomic.Uint64
|
||||||
window *Bits
|
window *Bits
|
||||||
writeLock sync.Mutex
|
writeLock syncMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, initiator bool, pattern noise.HandshakePattern) (*ConnectionState, error) {
|
func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, initiator bool, pattern noise.HandshakePattern) (*ConnectionState, error) {
|
||||||
@@ -76,6 +75,8 @@ func NewConnectionState(l *logrus.Logger, cs *CertState, crt cert.Certificate, i
|
|||||||
initiator: initiator,
|
initiator: initiator,
|
||||||
window: b,
|
window: b,
|
||||||
myCert: crt,
|
myCert: crt,
|
||||||
|
|
||||||
|
writeLock: newSyncMutex("connection-state-write"),
|
||||||
}
|
}
|
||||||
// always start the counter from 2, as packet 1 and packet 2 are handshake packets.
|
// always start the counter from 2, as packet 1 and packet 2 are handshake packets.
|
||||||
ci.messageCounter.Add(2)
|
ci.messageCounter.Add(2)
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"net/netip"
|
"net/netip"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/gaissmai/bart"
|
"github.com/gaissmai/bart"
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
@@ -21,7 +20,7 @@ var dnsServer *dns.Server
|
|||||||
var dnsAddr string
|
var dnsAddr string
|
||||||
|
|
||||||
type dnsRecords struct {
|
type dnsRecords struct {
|
||||||
sync.RWMutex
|
syncRWMutex
|
||||||
l *logrus.Logger
|
l *logrus.Logger
|
||||||
dnsMap4 map[string]netip.Addr
|
dnsMap4 map[string]netip.Addr
|
||||||
dnsMap6 map[string]netip.Addr
|
dnsMap6 map[string]netip.Addr
|
||||||
@@ -31,6 +30,7 @@ type dnsRecords struct {
|
|||||||
|
|
||||||
func newDnsRecords(l *logrus.Logger, cs *CertState, hostMap *HostMap) *dnsRecords {
|
func newDnsRecords(l *logrus.Logger, cs *CertState, hostMap *HostMap) *dnsRecords {
|
||||||
return &dnsRecords{
|
return &dnsRecords{
|
||||||
|
syncRWMutex: newSyncRWMutex("dns-records"),
|
||||||
l: l,
|
l: l,
|
||||||
dnsMap4: make(map[string]netip.Addr),
|
dnsMap4: make(map[string]netip.Addr),
|
||||||
dnsMap6: make(map[string]netip.Addr),
|
dnsMap6: make(map[string]netip.Addr),
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gaissmai/bart"
|
"github.com/gaissmai/bart"
|
||||||
@@ -76,7 +75,7 @@ type firewallMetrics struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FirewallConntrack struct {
|
type FirewallConntrack struct {
|
||||||
sync.Mutex
|
syncMutex
|
||||||
|
|
||||||
Conns map[firewall.Packet]*conn
|
Conns map[firewall.Packet]*conn
|
||||||
TimerWheel *TimerWheel[firewall.Packet]
|
TimerWheel *TimerWheel[firewall.Packet]
|
||||||
@@ -164,6 +163,7 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
|
|||||||
|
|
||||||
return &Firewall{
|
return &Firewall{
|
||||||
Conntrack: &FirewallConntrack{
|
Conntrack: &FirewallConntrack{
|
||||||
|
syncMutex: newSyncMutex("firewall-conntrack"),
|
||||||
Conns: make(map[firewall.Packet]*conn),
|
Conns: make(map[firewall.Packet]*conn),
|
||||||
TimerWheel: NewTimerWheel[firewall.Packet](tmin, tmax),
|
TimerWheel: NewTimerWheel[firewall.Packet](tmin, tmax),
|
||||||
},
|
},
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -8,6 +8,7 @@ require (
|
|||||||
dario.cat/mergo v1.0.1
|
dario.cat/mergo v1.0.1
|
||||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be
|
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be
|
||||||
github.com/armon/go-radix v1.0.0
|
github.com/armon/go-radix v1.0.0
|
||||||
|
github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc
|
||||||
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432
|
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432
|
||||||
github.com/flynn/noise v1.1.0
|
github.com/flynn/noise v1.1.0
|
||||||
github.com/gaissmai/bart v0.20.1
|
github.com/gaissmai/bart v0.20.1
|
||||||
@@ -23,6 +24,7 @@ require (
|
|||||||
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
|
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
|
||||||
github.com/stefanberger/go-pkcs11uri v0.0.0-20230803200340-78284954bff6
|
github.com/stefanberger/go-pkcs11uri v0.0.0-20230803200340-78284954bff6
|
||||||
github.com/stretchr/testify v1.10.0
|
github.com/stretchr/testify v1.10.0
|
||||||
|
github.com/timandy/routine v1.1.5
|
||||||
github.com/vishvananda/netlink v1.3.0
|
github.com/vishvananda/netlink v1.3.0
|
||||||
golang.org/x/crypto v0.36.0
|
golang.org/x/crypto v0.36.0
|
||||||
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
|
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -17,6 +17,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
|
|||||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc h1:6e91sWiDE69Jl0WUsY/LvTCBPRBe6b2j8H7W96JGJ4s=
|
||||||
|
github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc/go.mod h1:RGIcF96ORCYAsdz60Ou9mPBNa4+DjoQFS8nelPniFoY=
|
||||||
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432 h1:M5QgkYacWj0Xs8MhpIK/5uwU02icXpEoSo9sM2aRCps=
|
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432 h1:M5QgkYacWj0Xs8MhpIK/5uwU02icXpEoSo9sM2aRCps=
|
||||||
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432/go.mod h1:xwIwAxMvYnVrGJPe2FKx5prTrnAjGOD8zvDOnxnrrkM=
|
github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432/go.mod h1:xwIwAxMvYnVrGJPe2FKx5prTrnAjGOD8zvDOnxnrrkM=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
@@ -145,6 +147,10 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
|
|||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
github.com/timandy/routine v1.1.1 h1:6/Z7qLFZj3GrzuRksBFzIG8YGUh8CLhjnnMePBQTrEI=
|
||||||
|
github.com/timandy/routine v1.1.1/go.mod h1:OZHPOKSvqL/ZvqXFkNZyit0xIVelERptYXdAHH00adQ=
|
||||||
|
github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
|
||||||
|
github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
|
||||||
github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
|
github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
|
||||||
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
|
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
|
||||||
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
|
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
|
||||||
|
|||||||
@@ -243,6 +243,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
|
|||||||
}
|
}
|
||||||
|
|
||||||
hostinfo := &HostInfo{
|
hostinfo := &HostInfo{
|
||||||
|
syncRWMutex: newSyncRWMutex("hostinfo"),
|
||||||
ConnectionState: ci,
|
ConnectionState: ci,
|
||||||
localIndexId: myIndex,
|
localIndexId: myIndex,
|
||||||
remoteIndexId: hs.Details.InitiatorIndex,
|
remoteIndexId: hs.Details.InitiatorIndex,
|
||||||
@@ -250,6 +251,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
|
|||||||
HandshakePacket: make(map[uint8][]byte, 0),
|
HandshakePacket: make(map[uint8][]byte, 0),
|
||||||
lastHandshakeTime: hs.Details.Time,
|
lastHandshakeTime: hs.Details.Time,
|
||||||
relayState: RelayState{
|
relayState: RelayState{
|
||||||
|
syncRWMutex: newSyncRWMutex("relay-state"),
|
||||||
relays: map[netip.Addr]struct{}{},
|
relays: map[netip.Addr]struct{}{},
|
||||||
relayForByAddr: map[netip.Addr]*Relay{},
|
relayForByAddr: map[netip.Addr]*Relay{},
|
||||||
relayForByIdx: map[uint32]*Relay{},
|
relayForByIdx: map[uint32]*Relay{},
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rcrowley/go-metrics"
|
"github.com/rcrowley/go-metrics"
|
||||||
@@ -45,7 +44,7 @@ type HandshakeConfig struct {
|
|||||||
|
|
||||||
type HandshakeManager struct {
|
type HandshakeManager struct {
|
||||||
// Mutex for interacting with the vpnIps and indexes maps
|
// Mutex for interacting with the vpnIps and indexes maps
|
||||||
sync.RWMutex
|
syncRWMutex
|
||||||
|
|
||||||
vpnIps map[netip.Addr]*HandshakeHostInfo
|
vpnIps map[netip.Addr]*HandshakeHostInfo
|
||||||
indexes map[uint32]*HandshakeHostInfo
|
indexes map[uint32]*HandshakeHostInfo
|
||||||
@@ -66,7 +65,7 @@ type HandshakeManager struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HandshakeHostInfo struct {
|
type HandshakeHostInfo struct {
|
||||||
sync.Mutex
|
syncMutex
|
||||||
|
|
||||||
startTime time.Time // Time that we first started trying with this handshake
|
startTime time.Time // Time that we first started trying with this handshake
|
||||||
ready bool // Is the handshake ready
|
ready bool // Is the handshake ready
|
||||||
@@ -104,6 +103,7 @@ func (hh *HandshakeHostInfo) cachePacket(l *logrus.Logger, t header.MessageType,
|
|||||||
|
|
||||||
func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
|
func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
|
||||||
return &HandshakeManager{
|
return &HandshakeManager{
|
||||||
|
syncRWMutex: newSyncRWMutex("handshake-manager"),
|
||||||
vpnIps: map[netip.Addr]*HandshakeHostInfo{},
|
vpnIps: map[netip.Addr]*HandshakeHostInfo{},
|
||||||
indexes: map[uint32]*HandshakeHostInfo{},
|
indexes: map[uint32]*HandshakeHostInfo{},
|
||||||
mainHostMap: mainHostMap,
|
mainHostMap: mainHostMap,
|
||||||
@@ -111,7 +111,7 @@ func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *Lig
|
|||||||
outside: outside,
|
outside: outside,
|
||||||
config: config,
|
config: config,
|
||||||
trigger: make(chan netip.Addr, config.triggerBuffer),
|
trigger: make(chan netip.Addr, config.triggerBuffer),
|
||||||
OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr](config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
|
OutboundHandshakeTimer: NewLockingTimerWheel[netip.Addr]("handshake-manager-timer", config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
|
||||||
messageMetrics: config.messageMetrics,
|
messageMetrics: config.messageMetrics,
|
||||||
metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
|
metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
|
||||||
metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
|
metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
|
||||||
@@ -448,9 +448,11 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
|
|||||||
}
|
}
|
||||||
|
|
||||||
hostinfo := &HostInfo{
|
hostinfo := &HostInfo{
|
||||||
|
syncRWMutex: newSyncRWMutex("hostinfo"),
|
||||||
vpnAddrs: []netip.Addr{vpnAddr},
|
vpnAddrs: []netip.Addr{vpnAddr},
|
||||||
HandshakePacket: make(map[uint8][]byte, 0),
|
HandshakePacket: make(map[uint8][]byte, 0),
|
||||||
relayState: RelayState{
|
relayState: RelayState{
|
||||||
|
syncRWMutex: newSyncRWMutex("relay-state"),
|
||||||
relays: map[netip.Addr]struct{}{},
|
relays: map[netip.Addr]struct{}{},
|
||||||
relayForByAddr: map[netip.Addr]*Relay{},
|
relayForByAddr: map[netip.Addr]*Relay{},
|
||||||
relayForByIdx: map[uint32]*Relay{},
|
relayForByIdx: map[uint32]*Relay{},
|
||||||
@@ -458,6 +460,7 @@ func (hm *HandshakeManager) StartHandshake(vpnAddr netip.Addr, cacheCb func(*Han
|
|||||||
}
|
}
|
||||||
|
|
||||||
hh := &HandshakeHostInfo{
|
hh := &HandshakeHostInfo{
|
||||||
|
syncMutex: newSyncMutex("handshake-hostinfo"),
|
||||||
hostinfo: hostinfo,
|
hostinfo: hostinfo,
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -53,7 +52,7 @@ type Relay struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HostMap struct {
|
type HostMap struct {
|
||||||
sync.RWMutex //Because we concurrently read and write to our maps
|
syncRWMutex //Because we concurrently read and write to our maps
|
||||||
Indexes map[uint32]*HostInfo
|
Indexes map[uint32]*HostInfo
|
||||||
Relays map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object
|
Relays map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object
|
||||||
RemoteIndexes map[uint32]*HostInfo
|
RemoteIndexes map[uint32]*HostInfo
|
||||||
@@ -66,7 +65,7 @@ type HostMap struct {
|
|||||||
// struct, make a copy of an existing value, edit the fields in the copy, and
|
// struct, make a copy of an existing value, edit the fields in the copy, and
|
||||||
// then store a pointer to the new copy in both relayForBy* maps.
|
// then store a pointer to the new copy in both relayForBy* maps.
|
||||||
type RelayState struct {
|
type RelayState struct {
|
||||||
sync.RWMutex
|
syncRWMutex
|
||||||
|
|
||||||
relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer
|
relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer
|
||||||
// For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data,
|
// For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data,
|
||||||
@@ -209,6 +208,7 @@ func (rs *RelayState) InsertRelay(ip netip.Addr, idx uint32, r *Relay) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HostInfo struct {
|
type HostInfo struct {
|
||||||
|
syncRWMutex
|
||||||
remote netip.AddrPort
|
remote netip.AddrPort
|
||||||
remotes *RemoteList
|
remotes *RemoteList
|
||||||
promoteCounter atomic.Uint32
|
promoteCounter atomic.Uint32
|
||||||
@@ -288,6 +288,7 @@ func NewHostMapFromConfig(l *logrus.Logger, c *config.C) *HostMap {
|
|||||||
|
|
||||||
func newHostMap(l *logrus.Logger) *HostMap {
|
func newHostMap(l *logrus.Logger) *HostMap {
|
||||||
return &HostMap{
|
return &HostMap{
|
||||||
|
syncRWMutex: newSyncRWMutex("hostmap"),
|
||||||
Indexes: map[uint32]*HostInfo{},
|
Indexes: map[uint32]*HostInfo{},
|
||||||
Relays: map[uint32]*HostInfo{},
|
Relays: map[uint32]*HostInfo{},
|
||||||
RemoteIndexes: map[uint32]*HostInfo{},
|
RemoteIndexes: map[uint32]*HostInfo{},
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
"net/netip"
|
"net/netip"
|
||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -27,7 +26,7 @@ var ErrHostNotKnown = errors.New("host not known")
|
|||||||
|
|
||||||
type LightHouse struct {
|
type LightHouse struct {
|
||||||
//TODO: We need a timer wheel to kick out vpnAddrs that haven't reported in a long time
|
//TODO: We need a timer wheel to kick out vpnAddrs that haven't reported in a long time
|
||||||
sync.RWMutex //Because we concurrently read and write to our maps
|
syncRWMutex //Because we concurrently read and write to our maps
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
amLighthouse bool
|
amLighthouse bool
|
||||||
|
|
||||||
@@ -96,6 +95,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
|
|||||||
}
|
}
|
||||||
|
|
||||||
h := LightHouse{
|
h := LightHouse{
|
||||||
|
syncRWMutex: newSyncRWMutex("lighthouse"),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
amLighthouse: amLighthouse,
|
amLighthouse: amLighthouse,
|
||||||
myVpnNetworks: cs.myVpnNetworks,
|
myVpnNetworks: cs.myVpnNetworks,
|
||||||
@@ -475,6 +475,7 @@ func (lh *LightHouse) QueryServer(vpnAddr netip.Addr) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chanDebugSend("lighthouse-query-chan")
|
||||||
lh.queryChan <- vpnAddr
|
lh.queryChan <- vpnAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -731,6 +732,8 @@ func (lh *LightHouse) startQueryWorker() {
|
|||||||
nb := make([]byte, 12, 12)
|
nb := make([]byte, 12, 12)
|
||||||
out := make([]byte, mtu)
|
out := make([]byte, mtu)
|
||||||
|
|
||||||
|
chanDebugRecv("lighthouse-query-chan")
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-lh.ctx.Done():
|
case <-lh.ctx.Done():
|
||||||
|
|||||||
195
mutex_debug.go
Normal file
195
mutex_debug.go
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
//go:build mutex_debug
|
||||||
|
// +build mutex_debug
|
||||||
|
|
||||||
|
package nebula
|
||||||
|
|
||||||
|
import (
	"fmt"
	"runtime"
	"slices"
	"sync"

	"github.com/clarkmcc/go-dag"
	"github.com/timandy/routine"
)
|
||||||
|
|
||||||
|
// mutexKey is the name of a lock (or channel) class known to the lock-order
// checker.
type mutexKey = string

// allowedConcurrentLocks is the lock-ordering table.
//
// Each key is a lock class; its value lists the lock classes a goroutine may
// already be holding at the moment it acquires the key. Acquiring locks only
// in the order this table permits keeps the acquisition order consistent and
// so prevents deadlocks.
//
// The literal below lists direct permissions only. init replaces every entry
// with the flattened set computed from the dependency graph.
var allowedConcurrentLocks = map[mutexKey][]mutexKey{
	"connection-manager-in": []mutexKey{
		"hostmap",
	},
	"connection-manager-out": []mutexKey{
		"connection-manager-in",
		"handshake-hostinfo",
		"handshake-manager",
	},
	"connection-manager-relay-used": []mutexKey{
		"handshake-hostinfo",
	},
	"connection-manager-timer": []mutexKey{
		"connection-manager-out",
	},
	// "connection-state-write": {"hostmap"},
	"firewall-conntrack": []mutexKey{
		"handshake-hostinfo",
	},
	"handshake-manager": []mutexKey{
		"handshake-hostinfo",
		"hostmap",
	},
	"handshake-manager-timer": []mutexKey{
		"handshake-manager",
	},
	"hostmap": []mutexKey{
		"lighthouse-query-chan",
		"handshake-hostinfo",
	},
	"lighthouse": []mutexKey{
		"handshake-hostinfo",
	},
	"lighthouse-query-chan": []mutexKey{
		"handshake-hostinfo",
	},
	"relay-state": []mutexKey{
		"connection-manager-relay-used",
		"hostmap",
	},
	"remote-list": []mutexKey{
		"lighthouse",
		"handshake-manager",
	},
}
|
||||||
|
|
||||||
|
// mutexValue records the source location at which a tracked lock was taken.
type mutexValue struct {
	file string // source file of the acquiring call site
	line int    // line number within file
}

// String renders the location as "file:line".
func (m mutexValue) String() string {
	return m.file + ":" + fmt.Sprint(m.line)
}
|
||||||
|
|
||||||
|
// threadLocal stores the set of tracked locks currently recorded as held,
// keyed by lock class, with the call site that recorded each one. Every
// Get() on a fresh context starts from an empty map.
// NOTE(review): presumably scoped per goroutine by the routine package —
// confirm against its documentation.
var threadLocal = routine.NewThreadLocalWithInitial[map[mutexKey]mutexValue](func() map[mutexKey]mutexValue { return map[mutexKey]mutexValue{} })

// allowedDAG is the graph built from allowedConcurrentLocks by init; it is
// used to detect contradictory orderings and to flatten the table.
var allowedDAG dag.AcyclicGraph
|
||||||
|
|
||||||
|
// We build a directed acyclic graph to assert that the locks can only be
|
||||||
|
// acquired in a determined order, If there are cycles in the DAG, then we
|
||||||
|
// know that the locking order is not guaranteed.
|
||||||
|
func init() {
|
||||||
|
for k, v := range allowedConcurrentLocks {
|
||||||
|
allowedDAG.Add(k)
|
||||||
|
for _, t := range v {
|
||||||
|
allowedDAG.Add(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range allowedConcurrentLocks {
|
||||||
|
for _, t := range v {
|
||||||
|
allowedDAG.Connect(dag.BasicEdge(k, t))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cycles := allowedDAG.Cycles(); len(cycles) > 0 {
|
||||||
|
panic(fmt.Errorf("Cycles found in allowedConcurrentLocks: %v", cycles))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild allowedConcurrentLocks as a flattened list of all possibilities
|
||||||
|
for k := range allowedConcurrentLocks {
|
||||||
|
ancestors, err := allowedDAG.Ancestors(k)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var allowed []mutexKey
|
||||||
|
for t := range ancestors {
|
||||||
|
allowed = append(allowed, t.(mutexKey))
|
||||||
|
}
|
||||||
|
allowedConcurrentLocks[k] = allowed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncRWMutex is the mutex_debug build's replacement for sync.RWMutex. It
// embeds the real lock together with the mutexKey naming this lock's class,
// which the Lock/RLock wrappers pass to checkMutex before acquiring.
type syncRWMutex struct {
	sync.RWMutex
	mutexKey
}
|
||||||
|
|
||||||
|
// syncMutex is the mutex_debug build's replacement for sync.Mutex. It embeds
// the real lock together with the mutexKey naming this lock's class, which
// the Lock wrapper passes to checkMutex before acquiring.
type syncMutex struct {
	sync.Mutex
	mutexKey
}
|
||||||
|
|
||||||
|
func newSyncRWMutex(key mutexKey) syncRWMutex {
|
||||||
|
return syncRWMutex{
|
||||||
|
mutexKey: key,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncMutex(key mutexKey) syncMutex {
|
||||||
|
return syncMutex{
|
||||||
|
mutexKey: key,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// alertMutex reports a lock-ordering violation. As written it panics with
// err, so the process stops at the first violation detected.
func alertMutex(err error) {
	panic(err)
	// NOTE: you could switch to this log Line and remove the panic if you want
	// to log all failures instead of panicking on the first one
	//log.Print(err, string(debug.Stack()))
}
|
||||||
|
|
||||||
|
func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
|
||||||
|
if add == "" {
|
||||||
|
alertMutex(fmt.Errorf("mutex not initialized with mutexKey"))
|
||||||
|
}
|
||||||
|
|
||||||
|
allowedConcurrent := allowedConcurrentLocks[add]
|
||||||
|
|
||||||
|
for k, v := range state {
|
||||||
|
if add == k {
|
||||||
|
alertMutex(fmt.Errorf("re-entrant lock: %s. previous allocation: %s", add, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO use slices.Contains, but requires go1.21
|
||||||
|
var found bool
|
||||||
|
for _, a := range allowedConcurrent {
|
||||||
|
if a == k {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func chanDebugRecv(key mutexKey) {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
checkMutex(m, key)
|
||||||
|
v := mutexValue{}
|
||||||
|
_, v.file, v.line, _ = runtime.Caller(1)
|
||||||
|
m[key] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func chanDebugSend(key mutexKey) {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
checkMutex(m, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncRWMutex) Lock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
checkMutex(m, s.mutexKey)
|
||||||
|
v := mutexValue{}
|
||||||
|
_, v.file, v.line, _ = runtime.Caller(1)
|
||||||
|
m[s.mutexKey] = v
|
||||||
|
s.RWMutex.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncRWMutex) Unlock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
delete(m, s.mutexKey)
|
||||||
|
s.RWMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncRWMutex) RLock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
checkMutex(m, s.mutexKey)
|
||||||
|
v := mutexValue{}
|
||||||
|
_, v.file, v.line, _ = runtime.Caller(1)
|
||||||
|
m[s.mutexKey] = v
|
||||||
|
s.RWMutex.RLock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncRWMutex) RUnlock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
delete(m, s.mutexKey)
|
||||||
|
s.RWMutex.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncMutex) Lock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
checkMutex(m, s.mutexKey)
|
||||||
|
v := mutexValue{}
|
||||||
|
_, v.file, v.line, _ = runtime.Caller(1)
|
||||||
|
m[s.mutexKey] = v
|
||||||
|
s.Mutex.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncMutex) Unlock() {
|
||||||
|
m := threadLocal.Get()
|
||||||
|
delete(m, s.mutexKey)
|
||||||
|
s.Mutex.Unlock()
|
||||||
|
}
|
||||||
23
mutex_nodebug.go
Normal file
23
mutex_nodebug.go
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
//go:build !mutex_debug
|
||||||
|
// +build !mutex_debug
|
||||||
|
|
||||||
|
package nebula
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mutexKey names a lock class. In this build (no mutex_debug tag) it is
// accepted by the constructors below and otherwise unused.
type mutexKey = string

// syncRWMutex is a plain alias for sync.RWMutex when mutex_debug is off.
type syncRWMutex = sync.RWMutex

// syncMutex is a plain alias for sync.Mutex when mutex_debug is off.
type syncMutex = sync.Mutex
|
||||||
|
|
||||||
|
func newSyncRWMutex(mutexKey) syncRWMutex {
|
||||||
|
return sync.RWMutex{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncMutex(mutexKey) syncMutex {
|
||||||
|
return sync.Mutex{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// chanDebugRecv is a no-op in builds without the mutex_debug tag.
func chanDebugRecv(key mutexKey) {}

// chanDebugSend is a no-op in builds without the mutex_debug tag.
func chanDebugSend(key mutexKey) {}
|
||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -185,7 +184,7 @@ func (hr *hostnamesResults) GetAddrs() []netip.AddrPort {
|
|||||||
// It serves as a local cache of query replies, host update notifications, and locally learned addresses
|
// It serves as a local cache of query replies, host update notifications, and locally learned addresses
|
||||||
type RemoteList struct {
|
type RemoteList struct {
|
||||||
// Every interaction with internals requires a lock!
|
// Every interaction with internals requires a lock!
|
||||||
sync.RWMutex
|
syncRWMutex
|
||||||
|
|
||||||
// The full list of vpn addresses assigned to this host
|
// The full list of vpn addresses assigned to this host
|
||||||
vpnAddrs []netip.Addr
|
vpnAddrs []netip.Addr
|
||||||
@@ -215,11 +214,12 @@ type RemoteList struct {
|
|||||||
// NewRemoteList creates a new empty RemoteList
|
// NewRemoteList creates a new empty RemoteList
|
||||||
func NewRemoteList(vpnAddrs []netip.Addr, shouldAdd func(netip.Addr) bool) *RemoteList {
|
func NewRemoteList(vpnAddrs []netip.Addr, shouldAdd func(netip.Addr) bool) *RemoteList {
|
||||||
r := &RemoteList{
|
r := &RemoteList{
|
||||||
vpnAddrs: make([]netip.Addr, len(vpnAddrs)),
|
syncRWMutex: newSyncRWMutex("remote-list"),
|
||||||
addrs: make([]netip.AddrPort, 0),
|
vpnAddrs: make([]netip.Addr, len(vpnAddrs)),
|
||||||
relays: make([]netip.Addr, 0),
|
addrs: make([]netip.AddrPort, 0),
|
||||||
cache: make(map[netip.Addr]*cache),
|
relays: make([]netip.Addr, 0),
|
||||||
shouldAdd: shouldAdd,
|
cache: make(map[netip.Addr]*cache),
|
||||||
|
shouldAdd: shouldAdd,
|
||||||
}
|
}
|
||||||
copy(r.vpnAddrs, vpnAddrs)
|
copy(r.vpnAddrs, vpnAddrs)
|
||||||
return r
|
return r
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package nebula
|
package nebula
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -34,7 +33,7 @@ type TimerWheel[T any] struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type LockingTimerWheel[T any] struct {
|
type LockingTimerWheel[T any] struct {
|
||||||
m sync.Mutex
|
m syncMutex
|
||||||
t *TimerWheel[T]
|
t *TimerWheel[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,8 +80,9 @@ func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewLockingTimerWheel is a version of TimerWheel that is safe for concurrent use with a small performance penalty
|
// NewLockingTimerWheel is a version of TimerWheel that is safe for concurrent use with a small performance penalty
|
||||||
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
|
func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
|
||||||
return &LockingTimerWheel[T]{
|
return &LockingTimerWheel[T]{
|
||||||
|
m: newSyncMutex(name),
|
||||||
t: NewTimerWheel[T](min, max),
|
t: NewTimerWheel[T](min, max),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user