mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-22 08:24:25 +01:00
Remove handshake race avoidance (#820)
Co-authored-by: Wade Simmons <wadey@slack-corp.com>
This commit is contained in:
@@ -53,6 +53,10 @@ type HandshakeManager struct {
|
||||
metricTimedOut metrics.Counter
|
||||
l *logrus.Logger
|
||||
|
||||
// vpnIps is another map similar to the pending hostmap but tracks entries in the wheel instead
|
||||
// this is to avoid situations where the same vpn ip enters the wheel and causes rapid fire handshaking
|
||||
vpnIps map[iputil.VpnIp]struct{}
|
||||
|
||||
// can be used to trigger outbound handshake for the given vpnIp
|
||||
trigger chan iputil.VpnIp
|
||||
}
|
||||
@@ -66,6 +70,7 @@ func NewHandshakeManager(l *logrus.Logger, tunCidr *net.IPNet, preferredRanges [
|
||||
config: config,
|
||||
trigger: make(chan iputil.VpnIp, config.triggerBuffer),
|
||||
OutboundHandshakeTimer: NewLockingTimerWheel[iputil.VpnIp](config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
|
||||
vpnIps: map[iputil.VpnIp]struct{}{},
|
||||
messageMetrics: config.messageMetrics,
|
||||
metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
|
||||
metricTimedOut: metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
|
||||
@@ -103,6 +108,7 @@ func (c *HandshakeManager) NextOutboundHandshakeTimerTick(now time.Time, f udp.E
|
||||
func (c *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, f udp.EncWriter, lighthouseTriggered bool) {
|
||||
hostinfo, err := c.pendingHostMap.QueryVpnIp(vpnIp)
|
||||
if err != nil {
|
||||
delete(c.vpnIps, vpnIp)
|
||||
return
|
||||
}
|
||||
hostinfo.Lock()
|
||||
@@ -160,7 +166,7 @@ func (c *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, f udp.EncWriter, l
|
||||
c.lightHouse.QueryServer(vpnIp, f)
|
||||
}
|
||||
|
||||
// Send a the handshake to all known ips, stage 2 takes care of assigning the hostinfo.remote based on the first to reply
|
||||
// Send the handshake to all known ips, stage 2 takes care of assigning the hostinfo.remote based on the first to reply
|
||||
var sentTo []*udp.Addr
|
||||
hostinfo.remotes.ForEach(c.pendingHostMap.preferredRanges, func(addr *udp.Addr, _ bool) {
|
||||
c.messageMetrics.Tx(header.Handshake, header.MessageSubType(hostinfo.HandshakePacket[0][1]), 1)
|
||||
@@ -260,7 +266,6 @@ func (c *HandshakeManager) handleOutbound(vpnIp iputil.VpnIp, f udp.EncWriter, l
|
||||
|
||||
// If a lighthouse triggered this attempt then we are still in the timer wheel and do not need to re-add
|
||||
if !lighthouseTriggered {
|
||||
//TODO: feel like we dupe handshake real fast in a tight loop, why?
|
||||
c.OutboundHandshakeTimer.Add(vpnIp, c.config.tryInterval*time.Duration(hostinfo.HandshakeCounter))
|
||||
}
|
||||
}
|
||||
@@ -269,7 +274,10 @@ func (c *HandshakeManager) AddVpnIp(vpnIp iputil.VpnIp, init func(*HostInfo)) *H
|
||||
hostinfo, created := c.pendingHostMap.AddVpnIp(vpnIp, init)
|
||||
|
||||
if created {
|
||||
c.OutboundHandshakeTimer.Add(vpnIp, c.config.tryInterval)
|
||||
if _, ok := c.vpnIps[vpnIp]; !ok {
|
||||
c.OutboundHandshakeTimer.Add(vpnIp, c.config.tryInterval)
|
||||
}
|
||||
c.vpnIps[vpnIp] = struct{}{}
|
||||
c.metricInitiated.Inc(1)
|
||||
}
|
||||
|
||||
@@ -280,7 +288,6 @@ var (
|
||||
ErrExistingHostInfo = errors.New("existing hostinfo")
|
||||
ErrAlreadySeen = errors.New("already seen")
|
||||
ErrLocalIndexCollision = errors.New("local index collision")
|
||||
ErrExistingHandshake = errors.New("existing handshake")
|
||||
)
|
||||
|
||||
// CheckAndComplete checks for any conflicts in the main and pending hostmap
|
||||
@@ -294,7 +301,7 @@ var (
|
||||
//
|
||||
// ErrLocalIndexCollision if we already have an entry in the main or pending
|
||||
// hostmap for the hostinfo.localIndexId.
|
||||
func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket uint8, overwrite bool, f *Interface) (*HostInfo, error) {
|
||||
func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket uint8, f *Interface) (*HostInfo, error) {
|
||||
c.pendingHostMap.Lock()
|
||||
defer c.pendingHostMap.Unlock()
|
||||
c.mainHostMap.Lock()
|
||||
@@ -303,9 +310,14 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
|
||||
// Check if we already have a tunnel with this vpn ip
|
||||
existingHostInfo, found := c.mainHostMap.Hosts[hostinfo.vpnIp]
|
||||
if found && existingHostInfo != nil {
|
||||
// Is it just a delayed handshake packet?
|
||||
if bytes.Equal(hostinfo.HandshakePacket[handshakePacket], existingHostInfo.HandshakePacket[handshakePacket]) {
|
||||
return existingHostInfo, ErrAlreadySeen
|
||||
testHostInfo := existingHostInfo
|
||||
for testHostInfo != nil {
|
||||
// Is it just a delayed handshake packet?
|
||||
if bytes.Equal(hostinfo.HandshakePacket[handshakePacket], existingHostInfo.HandshakePacket[handshakePacket]) {
|
||||
return existingHostInfo, ErrAlreadySeen
|
||||
}
|
||||
|
||||
testHostInfo = testHostInfo.next
|
||||
}
|
||||
|
||||
// Is this a newer handshake?
|
||||
@@ -337,56 +349,19 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
|
||||
Info("New host shadows existing host remoteIndex")
|
||||
}
|
||||
|
||||
// Check if we are also handshaking with this vpn ip
|
||||
pendingHostInfo, found := c.pendingHostMap.Hosts[hostinfo.vpnIp]
|
||||
if found && pendingHostInfo != nil {
|
||||
if !overwrite {
|
||||
// We won, let our pending handshake win
|
||||
return pendingHostInfo, ErrExistingHandshake
|
||||
}
|
||||
|
||||
// We lost, take this handshake and move any cached packets over so they get sent
|
||||
pendingHostInfo.ConnectionState.queueLock.Lock()
|
||||
hostinfo.packetStore = append(hostinfo.packetStore, pendingHostInfo.packetStore...)
|
||||
c.pendingHostMap.unlockedDeleteHostInfo(pendingHostInfo)
|
||||
pendingHostInfo.ConnectionState.queueLock.Unlock()
|
||||
pendingHostInfo.logger(c.l).Info("Handshake race lost, replacing pending handshake with completed tunnel")
|
||||
}
|
||||
|
||||
if existingHostInfo != nil {
|
||||
// We are going to overwrite this entry, so remove the old references
|
||||
delete(c.mainHostMap.Hosts, existingHostInfo.vpnIp)
|
||||
delete(c.mainHostMap.Indexes, existingHostInfo.localIndexId)
|
||||
delete(c.mainHostMap.RemoteIndexes, existingHostInfo.remoteIndexId)
|
||||
for _, relayIdx := range existingHostInfo.relayState.CopyRelayForIdxs() {
|
||||
delete(c.mainHostMap.Relays, relayIdx)
|
||||
}
|
||||
}
|
||||
|
||||
c.mainHostMap.addHostInfo(hostinfo, f)
|
||||
c.mainHostMap.unlockedAddHostInfo(hostinfo, f)
|
||||
return existingHostInfo, nil
|
||||
}
|
||||
|
||||
// Complete is a simpler version of CheckAndComplete when we already know we
|
||||
// won't have a localIndexId collision because we already have an entry in the
|
||||
// pendingHostMap
|
||||
func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
|
||||
// pendingHostMap. An existing hostinfo is returned if there was one.
|
||||
func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) *HostInfo {
|
||||
c.pendingHostMap.Lock()
|
||||
defer c.pendingHostMap.Unlock()
|
||||
c.mainHostMap.Lock()
|
||||
defer c.mainHostMap.Unlock()
|
||||
|
||||
existingHostInfo, found := c.mainHostMap.Hosts[hostinfo.vpnIp]
|
||||
if found && existingHostInfo != nil {
|
||||
// We are going to overwrite this entry, so remove the old references
|
||||
delete(c.mainHostMap.Hosts, existingHostInfo.vpnIp)
|
||||
delete(c.mainHostMap.Indexes, existingHostInfo.localIndexId)
|
||||
delete(c.mainHostMap.RemoteIndexes, existingHostInfo.remoteIndexId)
|
||||
for _, relayIdx := range existingHostInfo.relayState.CopyRelayForIdxs() {
|
||||
delete(c.mainHostMap.Relays, relayIdx)
|
||||
}
|
||||
}
|
||||
|
||||
existingRemoteIndex, found := c.mainHostMap.RemoteIndexes[hostinfo.remoteIndexId]
|
||||
if found && existingRemoteIndex != nil {
|
||||
// We have a collision, but this can happen since we can't control
|
||||
@@ -396,8 +371,10 @@ func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
|
||||
Info("New host shadows existing host remoteIndex")
|
||||
}
|
||||
|
||||
c.mainHostMap.addHostInfo(hostinfo, f)
|
||||
existingHostInfo := c.mainHostMap.Hosts[hostinfo.vpnIp]
|
||||
c.mainHostMap.unlockedAddHostInfo(hostinfo, f)
|
||||
c.pendingHostMap.unlockedDeleteHostInfo(hostinfo)
|
||||
return existingHostInfo
|
||||
}
|
||||
|
||||
// AddIndexHostInfo generates a unique localIndexId for this HostInfo
|
||||
|
||||
Reference in New Issue
Block a user