mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-08 20:13:57 +01:00
699 lines
20 KiB
Go
699 lines
20 KiB
Go
package nebula
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"net/netip"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rcrowley/go-metrics"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/slackhq/nebula/cert"
|
|
"github.com/slackhq/nebula/config"
|
|
"github.com/slackhq/nebula/header"
|
|
)
|
|
|
|
// trafficDecision is the action picked by makeTrafficDecision and carried out by doTrafficCheck.
type trafficDecision int

// The decisions are numbered 0 through 6 in declaration order.
const (
	// doNothing leaves the tunnel untouched.
	doNothing trafficDecision = iota
	// deleteTunnel deletes the hostinfo on our side, the remote is not notified.
	deleteTunnel
	// closeTunnel deletes the hostinfo and notifies the remote.
	closeTunnel
	// swapPrimary makes the checked hostinfo the primary one for its vpn address.
	swapPrimary
	// migrateRelays moves relays that are in use over to the primary hostinfo.
	migrateRelays
	// tryRehandshake starts a new handshake if the tunnel is not using our current certificate.
	tryRehandshake
	// sendTestPacket sends a test request through the tunnel.
	sendTestPacket
)
|
|
|
|
// LastCommunication records the moment we last communicated with a host.
type LastCommunication struct {
	// timestamp is when the communication happened.
	timestamp time.Time
	// vpnIp identifies the host, it is kept to help with logging.
	vpnIp netip.Addr
}
|
|
|
|
type connectionManager struct {
|
|
in map[uint32]struct{}
|
|
inLock *sync.RWMutex
|
|
|
|
out map[uint32]struct{}
|
|
outLock *sync.RWMutex
|
|
|
|
// relayUsed holds which relay localIndexs are in use
|
|
relayUsed map[uint32]struct{}
|
|
relayUsedLock *sync.RWMutex
|
|
|
|
// Track last communication with hosts
|
|
lastCommMap map[uint32]time.Time
|
|
lastCommLock *sync.RWMutex
|
|
inactivityTimer *LockingTimerWheel[uint32]
|
|
inactivityTimeout time.Duration
|
|
|
|
hostMap *HostMap
|
|
trafficTimer *LockingTimerWheel[uint32]
|
|
intf *Interface
|
|
pendingDeletion map[uint32]struct{}
|
|
punchy *Punchy
|
|
checkInterval time.Duration
|
|
pendingDeletionInterval time.Duration
|
|
metricsTxPunchy metrics.Counter
|
|
|
|
l *logrus.Logger
|
|
}
|
|
|
|
func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval time.Duration, punchy *Punchy) *connectionManager {
|
|
var max time.Duration
|
|
if checkInterval < pendingDeletionInterval {
|
|
max = pendingDeletionInterval
|
|
} else {
|
|
max = checkInterval
|
|
}
|
|
|
|
nc := &connectionManager{
|
|
hostMap: intf.hostMap,
|
|
in: make(map[uint32]struct{}),
|
|
inLock: &sync.RWMutex{},
|
|
out: make(map[uint32]struct{}),
|
|
outLock: &sync.RWMutex{},
|
|
relayUsed: make(map[uint32]struct{}),
|
|
relayUsedLock: &sync.RWMutex{},
|
|
lastCommMap: make(map[uint32]time.Time),
|
|
lastCommLock: &sync.RWMutex{},
|
|
inactivityTimeout: 1 * time.Minute, // Default inactivity timeout: 10 minutes
|
|
trafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, max),
|
|
intf: intf,
|
|
pendingDeletion: make(map[uint32]struct{}),
|
|
checkInterval: checkInterval,
|
|
pendingDeletionInterval: pendingDeletionInterval,
|
|
punchy: punchy,
|
|
metricsTxPunchy: metrics.GetOrRegisterCounter("messages.tx.punchy", nil),
|
|
l: l,
|
|
}
|
|
|
|
// Initialize the inactivity timer wheel - make wheel duration slightly longer than the timeout
|
|
nc.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, nc.inactivityTimeout+time.Minute)
|
|
|
|
nc.Start(ctx)
|
|
return nc
|
|
}
|
|
|
|
func (n *connectionManager) updateLastCommunication(localIndex uint32) {
|
|
// Get host info to record VPN IP for better logging
|
|
hostInfo := n.hostMap.QueryIndex(localIndex)
|
|
if hostInfo == nil {
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
n.lastCommLock.Lock()
|
|
n.lastCommMap[localIndex] = now
|
|
n.lastCommLock.Unlock()
|
|
|
|
// Reset the inactivity timer for this host
|
|
n.inactivityTimer.m.Lock()
|
|
n.inactivityTimer.t.Add(localIndex, n.inactivityTimeout)
|
|
n.inactivityTimer.m.Unlock()
|
|
}
|
|
|
|
func (n *connectionManager) In(localIndex uint32) {
|
|
n.inLock.RLock()
|
|
// If this already exists, return
|
|
if _, ok := n.in[localIndex]; ok {
|
|
n.inLock.RUnlock()
|
|
return
|
|
}
|
|
n.inLock.RUnlock()
|
|
n.inLock.Lock()
|
|
n.in[localIndex] = struct{}{}
|
|
n.inLock.Unlock()
|
|
|
|
// Update last communication time
|
|
n.updateLastCommunication(localIndex)
|
|
}
|
|
|
|
func (n *connectionManager) Out(localIndex uint32) {
|
|
n.outLock.RLock()
|
|
// If this already exists, return
|
|
if _, ok := n.out[localIndex]; ok {
|
|
n.outLock.RUnlock()
|
|
return
|
|
}
|
|
n.outLock.RUnlock()
|
|
n.outLock.Lock()
|
|
n.out[localIndex] = struct{}{}
|
|
n.outLock.Unlock()
|
|
|
|
// Update last communication time
|
|
n.updateLastCommunication(localIndex)
|
|
}
|
|
|
|
func (n *connectionManager) RelayUsed(localIndex uint32) {
|
|
n.relayUsedLock.RLock()
|
|
// If this already exists, return
|
|
if _, ok := n.relayUsed[localIndex]; ok {
|
|
n.relayUsedLock.RUnlock()
|
|
return
|
|
}
|
|
n.relayUsedLock.RUnlock()
|
|
n.relayUsedLock.Lock()
|
|
n.relayUsed[localIndex] = struct{}{}
|
|
n.relayUsedLock.Unlock()
|
|
}
|
|
|
|
// getAndResetTrafficCheck returns if there was any inbound or outbound traffic within the last tick and
|
|
// resets the state for this local index
|
|
func (n *connectionManager) getAndResetTrafficCheck(localIndex uint32) (bool, bool) {
|
|
n.inLock.Lock()
|
|
n.outLock.Lock()
|
|
_, in := n.in[localIndex]
|
|
_, out := n.out[localIndex]
|
|
delete(n.in, localIndex)
|
|
delete(n.out, localIndex)
|
|
n.inLock.Unlock()
|
|
n.outLock.Unlock()
|
|
return in, out
|
|
}
|
|
|
|
func (n *connectionManager) AddTrafficWatch(localIndex uint32) {
|
|
// Use a write lock directly because it should be incredibly rare that we are ever already tracking this index
|
|
n.outLock.Lock()
|
|
if _, ok := n.out[localIndex]; ok {
|
|
n.outLock.Unlock()
|
|
return
|
|
}
|
|
n.out[localIndex] = struct{}{}
|
|
n.trafficTimer.Add(localIndex, n.checkInterval)
|
|
n.outLock.Unlock()
|
|
}
|
|
|
|
// checkInactiveTunnels checks for tunnels that have been inactive for too long and drops them
|
|
func (n *connectionManager) checkInactiveTunnels() {
|
|
now := time.Now()
|
|
|
|
// First, advance the timer wheel to the current time
|
|
n.inactivityTimer.m.Lock()
|
|
n.inactivityTimer.t.Advance(now)
|
|
n.inactivityTimer.m.Unlock()
|
|
|
|
// Check for expired timers (inactive connections)
|
|
for {
|
|
// Get the next expired tunnel
|
|
n.inactivityTimer.m.Lock()
|
|
localIndex, ok := n.inactivityTimer.t.Purge()
|
|
n.inactivityTimer.m.Unlock()
|
|
|
|
if !ok {
|
|
// No more expired timers
|
|
break
|
|
}
|
|
|
|
n.lastCommLock.RLock()
|
|
lastComm, exists := n.lastCommMap[localIndex]
|
|
n.lastCommLock.RUnlock()
|
|
|
|
if !exists {
|
|
// No last communication record, odd but skip
|
|
continue
|
|
}
|
|
|
|
// Calculate inactivity duration
|
|
inactiveDuration := now.Sub(lastComm)
|
|
|
|
// Check if we've exceeded the inactivity timeout
|
|
if inactiveDuration >= n.inactivityTimeout {
|
|
// Get the host info (if it still exists)
|
|
hostInfo := n.hostMap.QueryIndex(localIndex)
|
|
if hostInfo == nil {
|
|
// Host info is gone, remove from our tracking map
|
|
n.lastCommLock.Lock()
|
|
delete(n.lastCommMap, localIndex)
|
|
n.lastCommLock.Unlock()
|
|
continue
|
|
}
|
|
|
|
// Log the inactivity and drop the tunnel
|
|
n.l.WithField("vpnIp", hostInfo.vpnAddrs[0]).
|
|
WithField("localIndex", localIndex).
|
|
WithField("inactiveDuration", inactiveDuration).
|
|
WithField("timeout", n.inactivityTimeout).
|
|
Info("Dropping tunnel due to inactivity")
|
|
|
|
// Close the tunnel using the existing mechanism
|
|
n.intf.closeTunnel(hostInfo)
|
|
|
|
// Clean up our tracking map
|
|
n.lastCommLock.Lock()
|
|
delete(n.lastCommMap, localIndex)
|
|
n.lastCommLock.Unlock()
|
|
} else {
|
|
// Re-add to the timer wheel with the remaining time
|
|
remainingTime := n.inactivityTimeout - inactiveDuration
|
|
n.inactivityTimer.m.Lock()
|
|
n.inactivityTimer.t.Add(localIndex, remainingTime)
|
|
n.inactivityTimer.m.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// CleanupDeletedHostInfos removes entries from our lastCommMap for hosts that no longer exist
|
|
func (n *connectionManager) CleanupDeletedHostInfos() {
|
|
n.lastCommLock.Lock()
|
|
defer n.lastCommLock.Unlock()
|
|
|
|
// Find indexes to delete
|
|
var toDelete []uint32
|
|
for localIndex := range n.lastCommMap {
|
|
if n.hostMap.QueryIndex(localIndex) == nil {
|
|
toDelete = append(toDelete, localIndex)
|
|
}
|
|
}
|
|
|
|
// Delete them
|
|
for _, localIndex := range toDelete {
|
|
delete(n.lastCommMap, localIndex)
|
|
}
|
|
|
|
if len(toDelete) > 0 && n.l.Level >= logrus.DebugLevel {
|
|
n.l.WithField("count", len(toDelete)).Debug("Cleaned up deleted host entries from lastCommMap")
|
|
}
|
|
}
|
|
|
|
// ReloadConfig updates the connection manager configuration
|
|
func (n *connectionManager) ReloadConfig(c *config.C) {
|
|
// Get the inactivity timeout from config
|
|
inactivityTimeout := c.GetDuration("timers.inactivity_timeout", 10*time.Minute)
|
|
|
|
// Only update if different
|
|
if inactivityTimeout != n.inactivityTimeout {
|
|
n.l.WithField("old", n.inactivityTimeout).
|
|
WithField("new", inactivityTimeout).
|
|
Info("Updating inactivity timeout")
|
|
|
|
n.inactivityTimeout = inactivityTimeout
|
|
|
|
// Recreate the inactivity timer wheel with the new timeout
|
|
n.inactivityTimer = NewLockingTimerWheel[uint32](time.Minute, n.inactivityTimeout+time.Minute)
|
|
|
|
// Re-add all existing hosts to the new timer wheel
|
|
n.lastCommLock.RLock()
|
|
for localIndex, lastComm := range n.lastCommMap {
|
|
// Calculate remaining time based on last communication
|
|
now := time.Now()
|
|
elapsed := now.Sub(lastComm)
|
|
|
|
// If the elapsed time exceeds the new timeout, this will be caught
|
|
// in the next inactivity check. Otherwise, add with remaining time.
|
|
if elapsed < n.inactivityTimeout {
|
|
remainingTime := n.inactivityTimeout - elapsed
|
|
n.inactivityTimer.m.Lock()
|
|
n.inactivityTimer.t.Add(localIndex, remainingTime)
|
|
n.inactivityTimer.m.Unlock()
|
|
}
|
|
}
|
|
n.lastCommLock.RUnlock()
|
|
}
|
|
}
|
|
|
|
func (n *connectionManager) Start(ctx context.Context) {
|
|
go n.Run(ctx)
|
|
}
|
|
|
|
func (n *connectionManager) Run(ctx context.Context) {
|
|
//TODO: this tick should be based on the min wheel tick? Check firewall
|
|
clockSource := time.NewTicker(500 * time.Millisecond)
|
|
defer clockSource.Stop()
|
|
|
|
// Create ticker for inactivity checks (every minute)
|
|
inactivityTicker := time.NewTicker(time.Minute)
|
|
defer inactivityTicker.Stop()
|
|
|
|
// Create ticker for cleanup (every 5 minutes)
|
|
cleanupTicker := time.NewTicker(5 * time.Minute)
|
|
defer cleanupTicker.Stop()
|
|
|
|
p := []byte("")
|
|
nb := make([]byte, 12, 12)
|
|
out := make([]byte, mtu)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case now := <-clockSource.C:
|
|
n.trafficTimer.Advance(now)
|
|
for {
|
|
localIndex, has := n.trafficTimer.Purge()
|
|
if !has {
|
|
break
|
|
}
|
|
|
|
n.doTrafficCheck(localIndex, p, nb, out, now)
|
|
}
|
|
|
|
case <-inactivityTicker.C:
|
|
// Check for inactive tunnels
|
|
n.checkInactiveTunnels()
|
|
|
|
case <-cleanupTicker.C:
|
|
// Periodically clean up deleted hosts
|
|
n.CleanupDeletedHostInfos()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) {
|
|
decision, hostinfo, primary := n.makeTrafficDecision(localIndex, now)
|
|
|
|
switch decision {
|
|
case deleteTunnel:
|
|
if n.hostMap.DeleteHostInfo(hostinfo) {
|
|
// Only clearing the lighthouse cache if this is the last hostinfo for this vpn ip in the hostmap
|
|
n.intf.lightHouse.DeleteVpnAddrs(hostinfo.vpnAddrs)
|
|
}
|
|
|
|
case closeTunnel:
|
|
n.intf.sendCloseTunnel(hostinfo)
|
|
n.intf.closeTunnel(hostinfo)
|
|
|
|
case swapPrimary:
|
|
n.swapPrimary(hostinfo, primary)
|
|
|
|
case migrateRelays:
|
|
n.migrateRelayUsed(hostinfo, primary)
|
|
|
|
case tryRehandshake:
|
|
n.tryRehandshake(hostinfo)
|
|
|
|
case sendTestPacket:
|
|
n.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out)
|
|
}
|
|
|
|
n.resetRelayTrafficCheck(hostinfo)
|
|
}
|
|
|
|
func (n *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) {
|
|
if hostinfo != nil {
|
|
n.relayUsedLock.Lock()
|
|
defer n.relayUsedLock.Unlock()
|
|
// No need to migrate any relays, delete usage info now.
|
|
for _, idx := range hostinfo.relayState.CopyRelayForIdxs() {
|
|
delete(n.relayUsed, idx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) {
|
|
relayFor := oldhostinfo.relayState.CopyAllRelayFor()
|
|
|
|
for _, r := range relayFor {
|
|
existing, ok := newhostinfo.relayState.QueryRelayForByIp(r.PeerAddr)
|
|
|
|
var index uint32
|
|
var relayFrom netip.Addr
|
|
var relayTo netip.Addr
|
|
switch {
|
|
case ok && existing.State == Established:
|
|
// This relay already exists in newhostinfo, then do nothing.
|
|
continue
|
|
case ok && existing.State == Requested:
|
|
// The relay exists in a Requested state; re-send the request
|
|
index = existing.LocalIndex
|
|
switch r.Type {
|
|
case TerminalType:
|
|
relayFrom = n.intf.myVpnAddrs[0]
|
|
relayTo = existing.PeerAddr
|
|
case ForwardingType:
|
|
relayFrom = existing.PeerAddr
|
|
relayTo = newhostinfo.vpnAddrs[0]
|
|
default:
|
|
// should never happen
|
|
}
|
|
case !ok:
|
|
n.relayUsedLock.RLock()
|
|
if _, relayUsed := n.relayUsed[r.LocalIndex]; !relayUsed {
|
|
// The relay hasn't been used; don't migrate it.
|
|
n.relayUsedLock.RUnlock()
|
|
continue
|
|
}
|
|
n.relayUsedLock.RUnlock()
|
|
// The relay doesn't exist at all; create some relay state and send the request.
|
|
var err error
|
|
index, err = AddRelay(n.l, newhostinfo, n.hostMap, r.PeerAddr, nil, r.Type, Requested)
|
|
if err != nil {
|
|
n.l.WithError(err).Error("failed to migrate relay to new hostinfo")
|
|
continue
|
|
}
|
|
switch r.Type {
|
|
case TerminalType:
|
|
relayFrom = n.intf.myVpnAddrs[0]
|
|
relayTo = r.PeerAddr
|
|
case ForwardingType:
|
|
relayFrom = r.PeerAddr
|
|
relayTo = newhostinfo.vpnAddrs[0]
|
|
default:
|
|
// should never happen
|
|
}
|
|
}
|
|
|
|
// Send a CreateRelayRequest to the peer.
|
|
req := NebulaControl{
|
|
Type: NebulaControl_CreateRelayRequest,
|
|
InitiatorRelayIndex: index,
|
|
}
|
|
|
|
switch newhostinfo.GetCert().Certificate.Version() {
|
|
case cert.Version1:
|
|
if !relayFrom.Is4() {
|
|
n.l.Error("can not migrate v1 relay with a v6 network because the relay is not running a current nebula version")
|
|
continue
|
|
}
|
|
|
|
if !relayTo.Is4() {
|
|
n.l.Error("can not migrate v1 relay with a v6 remote network because the relay is not running a current nebula version")
|
|
continue
|
|
}
|
|
|
|
b := relayFrom.As4()
|
|
req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
|
|
b = relayTo.As4()
|
|
req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
|
|
case cert.Version2:
|
|
req.RelayFromAddr = netAddrToProtoAddr(relayFrom)
|
|
req.RelayToAddr = netAddrToProtoAddr(relayTo)
|
|
default:
|
|
newhostinfo.logger(n.l).Error("Unknown certificate version found while attempting to migrate relay")
|
|
continue
|
|
}
|
|
|
|
msg, err := req.Marshal()
|
|
if err != nil {
|
|
n.l.WithError(err).Error("failed to marshal Control message to migrate relay")
|
|
} else {
|
|
n.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu))
|
|
n.l.WithFields(logrus.Fields{
|
|
"relayFrom": req.RelayFromAddr,
|
|
"relayTo": req.RelayToAddr,
|
|
"initiatorRelayIndex": req.InitiatorRelayIndex,
|
|
"responderRelayIndex": req.ResponderRelayIndex,
|
|
"vpnAddrs": newhostinfo.vpnAddrs}).
|
|
Info("send CreateRelayRequest")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) {
|
|
n.hostMap.RLock()
|
|
defer n.hostMap.RUnlock()
|
|
|
|
hostinfo := n.hostMap.Indexes[localIndex]
|
|
if hostinfo == nil {
|
|
n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap")
|
|
delete(n.pendingDeletion, localIndex)
|
|
return doNothing, nil, nil
|
|
}
|
|
|
|
if n.isInvalidCertificate(now, hostinfo) {
|
|
delete(n.pendingDeletion, hostinfo.localIndexId)
|
|
return closeTunnel, hostinfo, nil
|
|
}
|
|
|
|
primary := n.hostMap.Hosts[hostinfo.vpnAddrs[0]]
|
|
mainHostInfo := true
|
|
if primary != nil && primary != hostinfo {
|
|
mainHostInfo = false
|
|
}
|
|
|
|
// Check for traffic on this hostinfo
|
|
inTraffic, outTraffic := n.getAndResetTrafficCheck(localIndex)
|
|
|
|
// A hostinfo is determined alive if there is incoming traffic
|
|
if inTraffic {
|
|
decision := doNothing
|
|
if n.l.Level >= logrus.DebugLevel {
|
|
hostinfo.logger(n.l).
|
|
WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
|
|
Debug("Tunnel status")
|
|
}
|
|
delete(n.pendingDeletion, hostinfo.localIndexId)
|
|
|
|
if mainHostInfo {
|
|
decision = tryRehandshake
|
|
|
|
} else {
|
|
if n.shouldSwapPrimary(hostinfo, primary) {
|
|
decision = swapPrimary
|
|
} else {
|
|
// migrate the relays to the primary, if in use.
|
|
decision = migrateRelays
|
|
}
|
|
}
|
|
|
|
n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval)
|
|
|
|
if !outTraffic {
|
|
// Send a punch packet to keep the NAT state alive
|
|
n.sendPunch(hostinfo)
|
|
}
|
|
|
|
return decision, hostinfo, primary
|
|
}
|
|
|
|
if _, ok := n.pendingDeletion[hostinfo.localIndexId]; ok {
|
|
// We have already sent a test packet and nothing was returned, this hostinfo is dead
|
|
hostinfo.logger(n.l).
|
|
WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
|
|
Info("Tunnel status")
|
|
|
|
delete(n.pendingDeletion, hostinfo.localIndexId)
|
|
return deleteTunnel, hostinfo, nil
|
|
}
|
|
|
|
decision := doNothing
|
|
if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo {
|
|
if !outTraffic {
|
|
// If we aren't sending or receiving traffic then its an unused tunnel and we don't to test the tunnel.
|
|
// Just maintain NAT state if configured to do so.
|
|
n.sendPunch(hostinfo)
|
|
n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval)
|
|
return doNothing, nil, nil
|
|
|
|
}
|
|
|
|
if n.punchy.GetTargetEverything() {
|
|
// This is similar to the old punchy behavior with a slight optimization.
|
|
// We aren't receiving traffic but we are sending it, punch on all known
|
|
// ips in case we need to re-prime NAT state
|
|
n.sendPunch(hostinfo)
|
|
}
|
|
|
|
if n.l.Level >= logrus.DebugLevel {
|
|
hostinfo.logger(n.l).
|
|
WithField("tunnelCheck", m{"state": "testing", "method": "active"}).
|
|
Debug("Tunnel status")
|
|
}
|
|
|
|
// Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
|
|
decision = sendTestPacket
|
|
|
|
} else {
|
|
if n.l.Level >= logrus.DebugLevel {
|
|
hostinfo.logger(n.l).Debugf("Hostinfo sadness")
|
|
}
|
|
}
|
|
|
|
n.pendingDeletion[hostinfo.localIndexId] = struct{}{}
|
|
n.trafficTimer.Add(hostinfo.localIndexId, n.pendingDeletionInterval)
|
|
return decision, hostinfo, nil
|
|
}
|
|
|
|
func (n *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool {
|
|
// The primary tunnel is the most recent handshake to complete locally and should work entirely fine.
|
|
// If we are here then we have multiple tunnels for a host pair and neither side believes the same tunnel is primary.
|
|
// Let's sort this out.
|
|
|
|
// Only one side should swap because if both swap then we may never resolve to a single tunnel.
|
|
// vpn addr is static across all tunnels for this host pair so lets
|
|
// use that to determine if we should consider swapping.
|
|
if current.vpnAddrs[0].Compare(n.intf.myVpnAddrs[0]) < 0 {
|
|
// Their primary vpn addr is less than mine. Do not swap.
|
|
return false
|
|
}
|
|
|
|
crt := n.intf.pki.getCertState().getCertificate(current.ConnectionState.myCert.Version())
|
|
// If this tunnel is using the latest certificate then we should swap it to primary for a bit and see if things
|
|
// settle down.
|
|
return bytes.Equal(current.ConnectionState.myCert.Signature(), crt.Signature())
|
|
}
|
|
|
|
func (n *connectionManager) swapPrimary(current, primary *HostInfo) {
|
|
n.hostMap.Lock()
|
|
// Make sure the primary is still the same after the write lock. This avoids a race with a rehandshake.
|
|
if n.hostMap.Hosts[current.vpnAddrs[0]] == primary {
|
|
n.hostMap.unlockedMakePrimary(current)
|
|
}
|
|
n.hostMap.Unlock()
|
|
}
|
|
|
|
// isInvalidCertificate will check if we should destroy a tunnel if pki.disconnect_invalid is true and
|
|
// the certificate is no longer valid. Block listed certificates will skip the pki.disconnect_invalid
|
|
// check and return true.
|
|
func (n *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool {
|
|
remoteCert := hostinfo.GetCert()
|
|
if remoteCert == nil {
|
|
return false
|
|
}
|
|
|
|
caPool := n.intf.pki.GetCAPool()
|
|
err := caPool.VerifyCachedCertificate(now, remoteCert)
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
if !n.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed {
|
|
// Block listed certificates should always be disconnected
|
|
return false
|
|
}
|
|
|
|
hostinfo.logger(n.l).WithError(err).
|
|
WithField("fingerprint", remoteCert.Fingerprint).
|
|
Info("Remote certificate is no longer valid, tearing down the tunnel")
|
|
|
|
return true
|
|
}
|
|
|
|
func (n *connectionManager) sendPunch(hostinfo *HostInfo) {
|
|
if !n.punchy.GetPunch() {
|
|
// Punching is disabled
|
|
return
|
|
}
|
|
|
|
if n.punchy.GetTargetEverything() {
|
|
hostinfo.remotes.ForEach(n.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) {
|
|
n.metricsTxPunchy.Inc(1)
|
|
_ = n.intf.outside.WriteTo([]byte{1}, addr)
|
|
})
|
|
|
|
} else if hostinfo.remote.IsValid() {
|
|
n.metricsTxPunchy.Inc(1)
|
|
_ = n.intf.outside.WriteTo([]byte{1}, hostinfo.remote)
|
|
}
|
|
}
|
|
|
|
func (n *connectionManager) tryRehandshake(hostinfo *HostInfo) {
|
|
cs := n.intf.pki.getCertState()
|
|
curCrt := hostinfo.ConnectionState.myCert
|
|
myCrt := cs.getCertificate(curCrt.Version())
|
|
if curCrt.Version() >= cs.initiatingVersion && bytes.Equal(curCrt.Signature(), myCrt.Signature()) == true {
|
|
// The current tunnel is using the latest certificate and version, no need to rehandshake.
|
|
return
|
|
}
|
|
|
|
n.l.WithField("vpnAddrs", hostinfo.vpnAddrs).
|
|
WithField("reason", "local certificate is not current").
|
|
Info("Re-handshaking with remote")
|
|
|
|
n.intf.handshakeManager.StartHandshake(hostinfo.vpnAddrs[0], nil)
|
|
}
|