From 33c2d7277c3a6f43b3ef63dc87f7e4754722ca29 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Fri, 1 May 2026 13:21:38 -0500 Subject: [PATCH] Reduce HandshakeManager complexity a little bit (#1701) --- handshake_manager.go | 144 +------------------------------------ main.go | 10 +-- relay_manager.go | 166 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 163 insertions(+), 157 deletions(-) diff --git a/handshake_manager.go b/handshake_manager.go index 9fc69ff4..87257028 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -23,7 +23,6 @@ const ( DefaultHandshakeTryInterval = time.Millisecond * 100 DefaultHandshakeRetries = 10 DefaultHandshakeTriggerBuffer = 64 - DefaultUseRelays = true // maxCachedPackets is how many unsent packets we'll buffer per pending // handshake before dropping further ones. @@ -43,7 +42,6 @@ var ( tryInterval: DefaultHandshakeTryInterval, retries: DefaultHandshakeRetries, triggerBuffer: DefaultHandshakeTriggerBuffer, - useRelays: DefaultUseRelays, } ) @@ -51,7 +49,6 @@ type HandshakeConfig struct { tryInterval time.Duration retries int64 triggerBuffer int - useRelays bool messageMetrics *MessageMetrics } @@ -326,146 +323,7 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered ) } - if hm.config.useRelays && len(hostinfo.remotes.relays) > 0 { - hostinfo.logger(hm.l).Info("Attempt to relay through hosts", "relays", hostinfo.remotes.relays) - // Send a RelayRequest to all known Relay IP's - for _, relay := range hostinfo.remotes.relays { - // Don't relay through the host I'm trying to connect to - if relay == vpnIp { - continue - } - - // Don't relay to myself - if hm.f.myVpnAddrsTable.Contains(relay) { - continue - } - - relayHostInfo := hm.mainHostMap.QueryVpnAddr(relay) - if relayHostInfo == nil || !relayHostInfo.remote.IsValid() { - hostinfo.logger(hm.l).Info("Establish tunnel to relay target", "relay", relay.String()) - hm.f.Handshake(relay) - continue - } - // Check the relay HostInfo to see if we already established a relay through - existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp) - if !ok { - // No relays exist or requested yet. - if relayHostInfo.remote.IsValid() { - idx, err := AddRelay(hm.l, relayHostInfo, hm.mainHostMap, vpnIp, nil, TerminalType, Requested) - if err != nil { - hostinfo.logger(hm.l).Info("Failed to add relay to hostmap", "relay", relay.String(), "error", err) - } - - m := NebulaControl{ - Type: NebulaControl_CreateRelayRequest, - InitiatorRelayIndex: idx, - } - - switch relayHostInfo.GetCert().Certificate.Version() { - case cert.Version1: - if !hm.f.myVpnAddrs[0].Is4() { - hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version") - continue - } - - if !vpnIp.Is4() { - hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version") - continue - } - - b := hm.f.myVpnAddrs[0].As4() - m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:]) - b = vpnIp.As4() - m.OldRelayToAddr = binary.BigEndian.Uint32(b[:]) - case cert.Version2: - m.RelayFromAddr = netAddrToProtoAddr(hm.f.myVpnAddrs[0]) - m.RelayToAddr = netAddrToProtoAddr(vpnIp) - default: - hostinfo.logger(hm.l).Error("Unknown certificate version found while creating relay") - continue - } - - msg, err := m.Marshal() - if err != nil { - hostinfo.logger(hm.l).Error("Failed to marshal Control message to create relay", "error", err) - } else { - hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) - hm.l.Info("send CreateRelayRequest", - "relayFrom", hm.f.myVpnAddrs[0], - "relayTo", vpnIp, - "initiatorRelayIndex", idx, - "relay", relay, - ) - } - } - continue - } - - switch existingRelay.State { - case Established: - hostinfo.logger(hm.l).Info("Send handshake via relay", "relay", relay.String()) - hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[handshakePacketStage0], make([]byte, 12), make([]byte, mtu), false) - case Disestablished: - // Mark this relay as 'requested' - relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested) - fallthrough - case Requested: - hostinfo.logger(hm.l).Info("Re-send CreateRelay request", "relay", relay.String()) - // Re-send the CreateRelay request, in case the previous one was lost. - m := NebulaControl{ - Type: NebulaControl_CreateRelayRequest, - InitiatorRelayIndex: existingRelay.LocalIndex, - } - - switch relayHostInfo.GetCert().Certificate.Version() { - case cert.Version1: - if !hm.f.myVpnAddrs[0].Is4() { - hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version") - continue - } - - if !vpnIp.Is4() { - hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version") - continue - } - - b := hm.f.myVpnAddrs[0].As4() - m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:]) - b = vpnIp.As4() - m.OldRelayToAddr = binary.BigEndian.Uint32(b[:]) - case cert.Version2: - m.RelayFromAddr = netAddrToProtoAddr(hm.f.myVpnAddrs[0]) - m.RelayToAddr = netAddrToProtoAddr(vpnIp) - default: - hostinfo.logger(hm.l).Error("Unknown certificate version found while creating relay") - continue - } - msg, err := m.Marshal() - if err != nil { - hostinfo.logger(hm.l).Error("Failed to marshal Control message to create relay", "error", err) - } else { - // This must send over the hostinfo, not over hm.Hosts[ip] - hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) - hm.l.Info("send CreateRelayRequest", - "relayFrom", hm.f.myVpnAddrs[0], - "relayTo", vpnIp, - "initiatorRelayIndex", existingRelay.LocalIndex, - "relay", relay, - ) - } - case PeerRequested: - // PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case. - fallthrough - default: - hostinfo.logger(hm.l).Error("Relay unexpected state", - "vpnIp", vpnIp, - "state", existingRelay.State, - "relay", relay, - ) - - } - } - } + hm.f.relayManager.StartRelays(hm.f, vpnIp, hostinfo, stage0) // If a lighthouse triggered this attempt then we are still in the timer wheel and do not need to re-add if !lighthouseTriggered { diff --git a/main.go b/main.go index eef13c97..d5e5dcc8 100644 --- a/main.go +++ b/main.go @@ -184,14 +184,10 @@ func Main(c *config.C, configTest bool, buildVersion string, l *slog.Logger, dev messageMetrics = newMessageMetricsOnlyRecvError() } - useRelays := c.GetBool("relay.use_relays", DefaultUseRelays) && !c.GetBool("relay.am_relay", false) - handshakeConfig := HandshakeConfig{ - tryInterval: c.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval), - retries: int64(c.GetInt("handshakes.retries", DefaultHandshakeRetries)), - triggerBuffer: c.GetInt("handshakes.trigger_buffer", DefaultHandshakeTriggerBuffer), - useRelays: useRelays, - + tryInterval: c.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval), + retries: int64(c.GetInt("handshakes.retries", DefaultHandshakeRetries)), + triggerBuffer: c.GetInt("handshakes.trigger_buffer", DefaultHandshakeTriggerBuffer), messageMetrics: messageMetrics, } diff --git a/relay_manager.go b/relay_manager.go index 919bb2b6..25e65871 100644 --- a/relay_manager.go +++ b/relay_manager.go @@ -15,9 +15,10 @@ import ( ) type relayManager struct { - l *slog.Logger - hostmap *HostMap - amRelay atomic.Bool + l *slog.Logger + hostmap *HostMap + amRelay atomic.Bool + useRelays atomic.Bool } func NewRelayManager(ctx context.Context, l *slog.Logger, hostmap *HostMap, c *config.C) *relayManager { @@ -36,8 +37,10 @@ func NewRelayManager(ctx context.Context, l *slog.Logger, hostmap *HostMap, c *c } func (rm *relayManager) reload(c *config.C, initial bool) error { - if initial || c.HasChanged("relay.am_relay") { - rm.setAmRelay(c.GetBool("relay.am_relay", false)) + if initial || c.HasChanged("relay.am_relay") || c.HasChanged("relay.use_relays") { + amRelay := c.GetBool("relay.am_relay", false) + rm.amRelay.Store(amRelay) + rm.useRelays.Store(c.GetBool("relay.use_relays", true) && !amRelay) } return nil } @@ -46,8 +49,157 @@ func (rm *relayManager) GetAmRelay() bool { return rm.amRelay.Load() } -func (rm *relayManager) setAmRelay(v bool) { - rm.amRelay.Store(v) +func (rm *relayManager) GetUseRelays() bool { + return rm.useRelays.Load() +} + +// StartRelays drives the relay-establishment side of an outbound handshake attempt. +// For each candidate relay it either kicks off a handshake to the relay, sends a CreateRelayRequest, retransmits +// one that may have been lost, or, once the relay is Established, forwards the in-progress +// stage 0 handshake packet for vpnIp through it. +func (rm *relayManager) StartRelays(f *Interface, vpnIp netip.Addr, hostinfo *HostInfo, stage0 []byte) { + if !rm.GetUseRelays() || len(hostinfo.remotes.relays) == 0 { + return + } + + hostinfo.logger(rm.l).Info("Attempt to relay through hosts", "relays", hostinfo.remotes.relays) + // Send a RelayRequest to all known Relay IP's + for _, relay := range hostinfo.remotes.relays { + // Don't relay through the host I'm trying to connect to + if relay == vpnIp { + continue + } + + // Don't relay to myself + if f.myVpnAddrsTable.Contains(relay) { + continue + } + + relayHostInfo := rm.hostmap.QueryVpnAddr(relay) + if relayHostInfo == nil || !relayHostInfo.remote.IsValid() { + hostinfo.logger(rm.l).Info("Establish tunnel to relay target", "relay", relay.String()) + f.Handshake(relay) + continue + } + // Check the relay HostInfo to see if we already established a relay through + existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp) + if !ok { + // No relays exist or requested yet. + if relayHostInfo.remote.IsValid() { + idx, err := AddRelay(rm.l, relayHostInfo, rm.hostmap, vpnIp, nil, TerminalType, Requested) + if err != nil { + hostinfo.logger(rm.l).Info("Failed to add relay to hostmap", "relay", relay.String(), "error", err) + } + + m := NebulaControl{ + Type: NebulaControl_CreateRelayRequest, + InitiatorRelayIndex: idx, + } + + switch relayHostInfo.GetCert().Certificate.Version() { + case cert.Version1: + if !f.myVpnAddrs[0].Is4() { + hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version") + continue + } + + if !vpnIp.Is4() { + hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version") + continue + } + + b := f.myVpnAddrs[0].As4() + m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:]) + b = vpnIp.As4() + m.OldRelayToAddr = binary.BigEndian.Uint32(b[:]) + case cert.Version2: + m.RelayFromAddr = netAddrToProtoAddr(f.myVpnAddrs[0]) + m.RelayToAddr = netAddrToProtoAddr(vpnIp) + default: + hostinfo.logger(rm.l).Error("Unknown certificate version found while creating relay") + continue + } + + msg, err := m.Marshal() + if err != nil { + hostinfo.logger(rm.l).Error("Failed to marshal Control message to create relay", "error", err) + } else { + f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.Info("send CreateRelayRequest", + "relayFrom", f.myVpnAddrs[0], + "relayTo", vpnIp, + "initiatorRelayIndex", idx, + "relay", relay, + ) + } + } + continue + } + + switch existingRelay.State { + case Established: + hostinfo.logger(rm.l).Info("Send handshake via relay", "relay", relay.String()) + f.SendVia(relayHostInfo, existingRelay, stage0, make([]byte, 12), make([]byte, mtu), false) + case Disestablished: + // Mark this relay as 'requested' + relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested) + fallthrough + case Requested: + hostinfo.logger(rm.l).Info("Re-send CreateRelay request", "relay", relay.String()) + // Re-send the CreateRelay request, in case the previous one was lost. + m := NebulaControl{ + Type: NebulaControl_CreateRelayRequest, + InitiatorRelayIndex: existingRelay.LocalIndex, + } + + switch relayHostInfo.GetCert().Certificate.Version() { + case cert.Version1: + if !f.myVpnAddrs[0].Is4() { + hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version") + continue + } + + if !vpnIp.Is4() { + hostinfo.logger(rm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version") + continue + } + + b := f.myVpnAddrs[0].As4() + m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:]) + b = vpnIp.As4() + m.OldRelayToAddr = binary.BigEndian.Uint32(b[:]) + case cert.Version2: + m.RelayFromAddr = netAddrToProtoAddr(f.myVpnAddrs[0]) + m.RelayToAddr = netAddrToProtoAddr(vpnIp) + default: + hostinfo.logger(rm.l).Error("Unknown certificate version found while creating relay") + continue + } + msg, err := m.Marshal() + if err != nil { + hostinfo.logger(rm.l).Error("Failed to marshal Control message to create relay", "error", err) + } else { + // This must send over the hostinfo, not over hm.Hosts[ip] + f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.Info("send CreateRelayRequest", + "relayFrom", f.myVpnAddrs[0], + "relayTo", vpnIp, + "initiatorRelayIndex", existingRelay.LocalIndex, + "relay", relay, + ) + } + case PeerRequested: + // PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case. + fallthrough + default: + hostinfo.logger(rm.l).Error("Relay unexpected state", + "vpnIp", vpnIp, + "state", existingRelay.State, + "relay", relay, + ) + + } + } } // AddRelay finds an available relay index on the hostmap, and associates the relay info with it.