Reestablish relays when last hop fails (#1270)

This commit is contained in:
brad-defined 2024-11-15 11:41:07 -05:00 committed by GitHub
parent 9d310e72c2
commit 21a117a156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 324 additions and 168 deletions

View File

@ -9,6 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula" "github.com/slackhq/nebula"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
@ -469,6 +471,137 @@ func TestRelays(t *testing.T) {
//TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it //TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it
} }
func TestReestablishRelays(t *testing.T) {
ca, _, caKey, _ := cert_test.NewTestCaCert(cert.Version1, cert.Curve_CURVE25519, time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{})
myControl, myVpnIpNet, _, _ := newSimpleServer(cert.Version1, ca, caKey, "me ", "10.128.0.1/24", m{"relay": m{"use_relays": true}})
relayControl, relayVpnIpNet, relayUdpAddr, _ := newSimpleServer(cert.Version1, ca, caKey, "relay ", "10.128.0.128/24", m{"relay": m{"am_relay": true}})
theirControl, theirVpnIpNet, theirUdpAddr, _ := newSimpleServer(cert.Version1, ca, caKey, "them ", "10.128.0.2/24", m{"relay": m{"use_relays": true}})
// Teach my how to get to the relay and that their can be reached via the relay
myControl.InjectLightHouseAddr(relayVpnIpNet[0].Addr(), relayUdpAddr)
myControl.InjectRelays(theirVpnIpNet[0].Addr(), []netip.Addr{relayVpnIpNet[0].Addr()})
relayControl.InjectLightHouseAddr(theirVpnIpNet[0].Addr(), theirUdpAddr)
// Build a router so we don't have to reason who gets which packet
r := router.NewR(t, myControl, relayControl, theirControl)
defer r.RenderFlow()
// Start the servers
myControl.Start()
relayControl.Start()
theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))
p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works")
assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet[0].Addr(), theirVpnIpNet[0].Addr(), 80, 80)
t.Log("Ensure packet traversal from them to me via the relay")
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))
p = r.RouteForAllUntilTxTun(myControl)
r.Log("Assert the tunnel works")
assertUdpPacket(t, []byte("Hi from them"), p, theirVpnIpNet[0].Addr(), myVpnIpNet[0].Addr(), 80, 80)
// If we break the relay's connection to 'them', 'me' needs to detect and recover the connection
r.Log("Close the tunnel")
relayControl.CloseTunnel(theirVpnIpNet[0].Addr(), true)
start := len(myControl.GetHostmap().Indexes)
curIndexes := len(myControl.GetHostmap().Indexes)
for curIndexes >= start {
curIndexes = len(myControl.GetHostmap().Indexes)
r.Logf("Wait for the dead index to go away:start=%v indexes, currnet=%v indexes", start, curIndexes)
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me should fail"))
r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType {
return router.RouteAndExit
})
time.Sleep(2 * time.Second)
}
r.Log("Dead index went away. Woot!")
r.RenderHostmaps("Me removed hostinfo", myControl, relayControl, theirControl)
// Next packet should re-establish a relayed connection and work just great.
t.Logf("Assert the tunnel...")
for {
t.Log("RouteForAllUntilTxTun")
myControl.InjectLightHouseAddr(relayVpnIpNet[0].Addr(), relayUdpAddr)
myControl.InjectRelays(theirVpnIpNet[0].Addr(), []netip.Addr{relayVpnIpNet[0].Addr()})
relayControl.InjectLightHouseAddr(theirVpnIpNet[0].Addr(), theirUdpAddr)
myControl.InjectTunUDPPacket(theirVpnIpNet[0].Addr(), 80, myVpnIpNet[0].Addr(), 80, []byte("Hi from me"))
p = r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works")
packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy)
v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
if slices.Compare(v4.SrcIP, myVpnIpNet[0].Addr().AsSlice()) != 0 {
t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking")
continue
}
if slices.Compare(v4.DstIP, theirVpnIpNet[0].Addr().AsSlice()) != 0 {
t.Logf("DstIP is unexpected...this is not the packet I'm looking for. Keep looking")
continue
}
udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP)
if udp == nil {
t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking")
continue
}
data := packet.ApplicationLayer()
if data == nil {
t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.")
continue
}
if string(data.Payload()) != "Hi from me" {
t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload()))
continue
}
t.Log("I found my lost packet. I am so happy.")
break
}
t.Log("Assert the tunnel works the other way, too")
for {
t.Log("RouteForAllUntilTxTun")
theirControl.InjectTunUDPPacket(myVpnIpNet[0].Addr(), 80, theirVpnIpNet[0].Addr(), 80, []byte("Hi from them"))
p = r.RouteForAllUntilTxTun(myControl)
r.Log("Assert the tunnel works")
packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy)
v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
if slices.Compare(v4.DstIP, myVpnIpNet[0].Addr().AsSlice()) != 0 {
t.Logf("Dst is unexpected...this is not the packet I'm looking for. Keep looking")
continue
}
if slices.Compare(v4.SrcIP, theirVpnIpNet[0].Addr().AsSlice()) != 0 {
t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking")
continue
}
udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP)
if udp == nil {
t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking")
continue
}
data := packet.ApplicationLayer()
if data == nil {
t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.")
continue
}
if string(data.Payload()) != "Hi from them" {
t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload()))
continue
}
t.Log("I found my lost packet. I am so happy.")
break
}
r.RenderHostmaps("Final hostmaps", myControl, relayControl, theirControl)
}
func TestStage1RaceRelays(t *testing.T) { func TestStage1RaceRelays(t *testing.T) {
//NOTE: this is a race between me and relay resulting in a full tunnel from me to them via relay //NOTE: this is a race between me and relay resulting in a full tunnel from me to them via relay
ca, _, caKey, _ := cert_test.NewTestCaCert(cert.Version1, cert.Curve_CURVE25519, time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{}) ca, _, caKey, _ := cert_test.NewTestCaCert(cert.Version1, cert.Curve_CURVE25519, time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{})

View File

@ -400,6 +400,9 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
return return
} }
hostinfo.relayState.InsertRelayTo(via.relayHI.vpnAddrs[0]) hostinfo.relayState.InsertRelayTo(via.relayHI.vpnAddrs[0])
// I successfully received a handshake. Just in case I marked this tunnel as 'Disestablished', ensure
// it's correctly marked as working.
via.relayHI.relayState.UpdateRelayForByIdxState(via.remoteIdx, Established)
f.SendVia(via.relayHI, via.relay, msg, make([]byte, 12), make([]byte, mtu), false) f.SendVia(via.relayHI, via.relay, msg, make([]byte, 12), make([]byte, mtu), false)
f.l.WithField("vpnAddrs", vpnAddrs).WithField("relay", via.relayHI.vpnAddrs[0]). f.l.WithField("vpnAddrs", vpnAddrs).WithField("relay", via.relayHI.vpnAddrs[0]).
WithField("certName", certName). WithField("certName", certName).

View File

@ -285,67 +285,9 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
hm.f.Handshake(relay) hm.f.Handshake(relay)
continue continue
} }
// Check the relay HostInfo to see if we already established a relay through it // Check the relay HostInfo to see if we already established a relay through
if existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp); ok { existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp)
switch existingRelay.State { if !ok {
case Established:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay")
hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false)
case Requested:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request")
m := NebulaControl{
Type: NebulaControl_CreateRelayRequest,
InitiatorRelayIndex: existingRelay.LocalIndex,
}
switch relayHostInfo.GetCert().Certificate.Version() {
case cert.Version1:
if !hm.f.myVpnAddrs[0].Is4() {
hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
continue
}
if !vpnIp.Is4() {
hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
continue
}
b := hm.f.myVpnAddrs[0].As4()
m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
b = vpnIp.As4()
m.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
case cert.Version2:
m.RelayFromAddr = netAddrToProtoAddr(hm.f.myVpnAddrs[0])
m.RelayToAddr = netAddrToProtoAddr(vpnIp)
default:
hostinfo.logger(hm.l).Error("Unknown certificate version found while creating relay")
continue
}
msg, err := m.Marshal()
if err != nil {
hostinfo.logger(hm.l).
WithError(err).
Error("Failed to marshal Control message to create relay")
} else {
// This must send over the hostinfo, not over hm.Hosts[ip]
hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
hm.l.WithFields(logrus.Fields{
"relayFrom": hm.f.myVpnAddrs[0],
"relayTo": vpnIp,
"initiatorRelayIndex": existingRelay.LocalIndex,
"relay": relay}).
Info("send CreateRelayRequest")
}
default:
hostinfo.logger(hm.l).
WithField("vpnIp", vpnIp).
WithField("state", existingRelay.State).
WithField("relay", relayHostInfo.vpnAddrs[0]).
Errorf("Relay unexpected state")
}
} else {
// No relays exist or requested yet. // No relays exist or requested yet.
if relayHostInfo.remote.IsValid() { if relayHostInfo.remote.IsValid() {
idx, err := AddRelay(hm.l, relayHostInfo, hm.mainHostMap, vpnIp, nil, TerminalType, Requested) idx, err := AddRelay(hm.l, relayHostInfo, hm.mainHostMap, vpnIp, nil, TerminalType, Requested)
@ -397,6 +339,73 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered
Info("send CreateRelayRequest") Info("send CreateRelayRequest")
} }
} }
continue
}
switch existingRelay.State {
case Established:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay")
hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false)
case Disestablished:
// Mark this relay as 'requested'
relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested)
fallthrough
case Requested:
hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request")
// Re-send the CreateRelay request, in case the previous one was lost.
m := NebulaControl{
Type: NebulaControl_CreateRelayRequest,
InitiatorRelayIndex: existingRelay.LocalIndex,
}
switch relayHostInfo.GetCert().Certificate.Version() {
case cert.Version1:
if !hm.f.myVpnAddrs[0].Is4() {
hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 network because the relay is not running a current nebula version")
continue
}
if !vpnIp.Is4() {
hostinfo.logger(hm.l).Error("can not establish v1 relay with a v6 remote network because the relay is not running a current nebula version")
continue
}
b := hm.f.myVpnAddrs[0].As4()
m.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
b = vpnIp.As4()
m.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
case cert.Version2:
m.RelayFromAddr = netAddrToProtoAddr(hm.f.myVpnAddrs[0])
m.RelayToAddr = netAddrToProtoAddr(vpnIp)
default:
hostinfo.logger(hm.l).Error("Unknown certificate version found while creating relay")
continue
}
msg, err := m.Marshal()
if err != nil {
hostinfo.logger(hm.l).
WithError(err).
Error("Failed to marshal Control message to create relay")
} else {
// This must send over the hostinfo, not over hm.Hosts[ip]
hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu))
hm.l.WithFields(logrus.Fields{
"relayFrom": hm.f.myVpnAddrs[0],
"relayTo": vpnIp,
"initiatorRelayIndex": existingRelay.LocalIndex,
"relay": relay}).
Info("send CreateRelayRequest")
}
case PeerRequested:
// PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case.
fallthrough
default:
hostinfo.logger(hm.l).
WithField("vpnIp", vpnIp).
WithField("state", existingRelay.State).
WithField("relay", relay).
Errorf("Relay unexpected state")
} }
} }
} }

View File

@ -35,6 +35,7 @@ const (
Requested = iota Requested = iota
PeerRequested PeerRequested
Established Established
Disestablished
) )
const ( const (
@ -67,9 +68,12 @@ type HostMap struct {
type RelayState struct { type RelayState struct {
sync.RWMutex sync.RWMutex
relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer relays map[netip.Addr]struct{} // Set of vpnAddr's of Hosts to use as relays to access this peer
relayForByAddr map[netip.Addr]*Relay // Maps vpnAddr of peers for which this HostInfo is a relay to some Relay info // For data race avoidance, the contents of a *Relay are treated immutably. To update a *Relay, copy the existing data,
relayForByIdx map[uint32]*Relay // Maps a local index to some Relay info // modify what needs to be updated, and store the new modified copy in the relayForByIp and relayForByIdx maps (with
// the RelayState Lock held)
relayForByAddr map[netip.Addr]*Relay // Maps vpnAddr of peers for which this HostInfo is a relay to some Relay info
relayForByIdx map[uint32]*Relay // Maps a local index to some Relay info
} }
func (rs *RelayState) DeleteRelay(ip netip.Addr) { func (rs *RelayState) DeleteRelay(ip netip.Addr) {
@ -78,6 +82,28 @@ func (rs *RelayState) DeleteRelay(ip netip.Addr) {
delete(rs.relays, ip) delete(rs.relays, ip)
} }
func (rs *RelayState) UpdateRelayForByIpState(vpnIp netip.Addr, state int) {
rs.Lock()
defer rs.Unlock()
if r, ok := rs.relayForByAddr[vpnIp]; ok {
newRelay := *r
newRelay.State = state
rs.relayForByAddr[newRelay.PeerAddr] = &newRelay
rs.relayForByIdx[newRelay.LocalIndex] = &newRelay
}
}
func (rs *RelayState) UpdateRelayForByIdxState(idx uint32, state int) {
rs.Lock()
defer rs.Unlock()
if r, ok := rs.relayForByIdx[idx]; ok {
newRelay := *r
newRelay.State = state
rs.relayForByAddr[newRelay.PeerAddr] = &newRelay
rs.relayForByIdx[newRelay.LocalIndex] = &newRelay
}
}
func (rs *RelayState) CopyAllRelayFor() []*Relay { func (rs *RelayState) CopyAllRelayFor() []*Relay {
rs.RLock() rs.RLock()
defer rs.RUnlock() defer rs.RUnlock()
@ -363,6 +389,7 @@ func (hm *HostMap) unlockedDeleteHostInfo(hostinfo *HostInfo) {
func (hm *HostMap) unlockedInnerDeleteHostInfo(hostinfo *HostInfo, addr netip.Addr) { func (hm *HostMap) unlockedInnerDeleteHostInfo(hostinfo *HostInfo, addr netip.Addr) {
primary, ok := hm.Hosts[addr] primary, ok := hm.Hosts[addr]
isLastHostinfo := hostinfo.next == nil && hostinfo.prev == nil
if ok && primary == hostinfo { if ok && primary == hostinfo {
// The vpn addr pointer points to the same hostinfo as the local index id, we can remove it // The vpn addr pointer points to the same hostinfo as the local index id, we can remove it
delete(hm.Hosts, addr) delete(hm.Hosts, addr)
@ -412,6 +439,12 @@ func (hm *HostMap) unlockedInnerDeleteHostInfo(hostinfo *HostInfo, addr netip.Ad
Debug("Hostmap hostInfo deleted") Debug("Hostmap hostInfo deleted")
} }
if isLastHostinfo {
// I have lost connectivity to my peers. My relay tunnel is likely broken. Mark the next
// hops as 'Requested' so that new relay tunnels are created in the future.
hm.unlockedDisestablishVpnAddrRelayFor(hostinfo)
}
// Clean up any local relay indexes for which I am acting as a relay hop
for _, localRelayIdx := range hostinfo.relayState.CopyRelayForIdxs() { for _, localRelayIdx := range hostinfo.relayState.CopyRelayForIdxs() {
delete(hm.Relays, localRelayIdx) delete(hm.Relays, localRelayIdx)
} }
@ -476,6 +509,27 @@ func (hm *HostMap) QueryVpnAddrsRelayFor(targetIps []netip.Addr, relayHostIp net
return nil, nil, errors.New("unable to find host with relay") return nil, nil, errors.New("unable to find host with relay")
} }
func (hm *HostMap) unlockedDisestablishVpnAddrRelayFor(hi *HostInfo) {
for _, relayHostIp := range hi.relayState.CopyRelayIps() {
if h, ok := hm.Hosts[relayHostIp]; ok {
for h != nil {
h.relayState.UpdateRelayForByIpState(hi.vpnAddrs[0], Disestablished)
h = h.next
}
}
}
for _, rs := range hi.relayState.CopyAllRelayFor() {
if rs.Type == ForwardingType {
if h, ok := hm.Hosts[rs.PeerAddr]; ok {
for h != nil {
h.relayState.UpdateRelayForByIpState(hi.vpnAddrs[0], Disestablished)
h = h.next
}
}
}
}
}
func (hm *HostMap) queryVpnAddr(vpnIp netip.Addr, promoteIfce *Interface) *HostInfo { func (hm *HostMap) queryVpnAddr(vpnIp netip.Addr, promoteIfce *Interface) *HostInfo {
hm.RLock() hm.RLock()
if h, ok := hm.Hosts[vpnIp]; ok { if h, ok := hm.Hosts[vpnIp]; ok {

View File

@ -132,7 +132,6 @@ func (rm *relayManager) HandleControlMsg(h *HostInfo, d []byte, f *Interface) {
case NebulaControl_CreateRelayResponse: case NebulaControl_CreateRelayResponse:
rm.handleCreateRelayResponse(v, h, f, msg) rm.handleCreateRelayResponse(v, h, f, msg)
} }
} }
func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) { func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f *Interface, m *NebulaControl) {
@ -167,8 +166,12 @@ func (rm *relayManager) handleCreateRelayResponse(v cert.Version, h *HostInfo, f
rm.l.WithField("relayTo", peerHostInfo.vpnAddrs[0]).Error("peerRelay does not have Relay state for relayTo") rm.l.WithField("relayTo", peerHostInfo.vpnAddrs[0]).Error("peerRelay does not have Relay state for relayTo")
return return
} }
if peerRelay.State == PeerRequested { switch peerRelay.State {
peerRelay.State = Established case Requested:
// I initiated the request to this peer, but haven't heard back from the peer yet. I must wait for this peer
// to respond to complete the connection.
case PeerRequested, Disestablished, Established:
peerHostInfo.relayState.UpdateRelayForByIpState(targetAddr, Established)
resp := NebulaControl{ resp := NebulaControl{
Type: NebulaControl_CreateRelayResponse, Type: NebulaControl_CreateRelayResponse,
ResponderRelayIndex: peerRelay.LocalIndex, ResponderRelayIndex: peerRelay.LocalIndex,
@ -247,6 +250,21 @@ func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f
"existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
return return
} }
case Disestablished:
if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
// We got a brand new Relay request, because its index is different than what we saw before.
// This should never happen. The peer should never change an index, once created.
logMsg.WithFields(logrus.Fields{
"existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
return
}
// Mark the relay as 'Established' because it's safe to use again
h.relayState.UpdateRelayForByIpState(from, Established)
case PeerRequested:
// I should never be in this state, because I am terminal, not forwarding.
logMsg.WithFields(logrus.Fields{
"existingRemoteIndex": existingRelay.RemoteIndex,
"state": existingRelay.State}).Error("Unexpected Relay State found")
} }
} else { } else {
_, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established) _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
@ -258,7 +276,7 @@ func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f
relay, ok := h.relayState.QueryRelayForByIp(from) relay, ok := h.relayState.QueryRelayForByIp(from)
if !ok { if !ok {
logMsg.Error("Relay State not found") logMsg.WithField("from", from).Error("Relay State not found")
return return
} }
@ -310,126 +328,65 @@ func (rm *relayManager) handleCreateRelayRequest(v cert.Version, h *HostInfo, f
// Only create relays to peers for whom I have a direct connection // Only create relays to peers for whom I have a direct connection
return return
} }
sendCreateRequest := false
var index uint32 var index uint32
var err error var err error
targetRelay, ok := peer.relayState.QueryRelayForByIp(from) targetRelay, ok := peer.relayState.QueryRelayForByIp(from)
if ok { if ok {
index = targetRelay.LocalIndex index = targetRelay.LocalIndex
if targetRelay.State == Requested {
sendCreateRequest = true
}
} else { } else {
// Allocate an index in the hostMap for this relay peer // Allocate an index in the hostMap for this relay peer
index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested) index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested)
if err != nil { if err != nil {
return return
} }
sendCreateRequest = true
} }
if sendCreateRequest { peer.relayState.UpdateRelayForByIpState(from, Requested)
// Send a CreateRelayRequest to the peer. // Send a CreateRelayRequest to the peer.
req := NebulaControl{ req := NebulaControl{
Type: NebulaControl_CreateRelayRequest, Type: NebulaControl_CreateRelayRequest,
InitiatorRelayIndex: index, InitiatorRelayIndex: index,
}
if v == cert.Version1 {
if !h.vpnAddrs[0].Is4() {
//TODO: log it
return
}
b := h.vpnAddrs[0].As4()
req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
b = target.As4()
req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
} else {
req.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
req.RelayToAddr = netAddrToProtoAddr(target)
}
msg, err := req.Marshal()
if err != nil {
logMsg.
WithError(err).Error("relayManager Failed to marshal Control message to create relay")
} else {
f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
//TODO: IPV6-WORK another lazy used to use the req object
"relayFrom": h.vpnAddrs[0],
"relayTo": target,
"initiatorRelayIndex": req.InitiatorRelayIndex,
"responderRelayIndex": req.ResponderRelayIndex,
"vpnAddr": target}).
Info("send CreateRelayRequest")
}
} }
if v == cert.Version1 {
if !h.vpnAddrs[0].Is4() {
//TODO: log it
return
}
b := h.vpnAddrs[0].As4()
req.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
b = target.As4()
req.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
} else {
req.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
req.RelayToAddr = netAddrToProtoAddr(target)
}
msg, err := req.Marshal()
if err != nil {
logMsg.
WithError(err).Error("relayManager Failed to marshal Control message to create relay")
} else {
f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
//TODO: IPV6-WORK another lazy used to use the req object
"relayFrom": h.vpnAddrs[0],
"relayTo": target,
"initiatorRelayIndex": req.InitiatorRelayIndex,
"responderRelayIndex": req.ResponderRelayIndex,
"vpnAddr": target}).
Info("send CreateRelayRequest")
}
// Also track the half-created Relay state just received // Also track the half-created Relay state just received
relay, ok := h.relayState.QueryRelayForByIp(target) _, ok = h.relayState.QueryRelayForByIp(target)
if !ok { if !ok {
// Add the relay _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, PeerRequested)
state := PeerRequested
if targetRelay != nil && targetRelay.State == Established {
state = Established
}
_, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
if err != nil { if err != nil {
logMsg. logMsg.
WithError(err).Error("relayManager Failed to allocate a local index for relay") WithError(err).Error("relayManager Failed to allocate a local index for relay")
return return
} }
} else {
switch relay.State {
case Established:
if relay.RemoteIndex != m.InitiatorRelayIndex {
// We got a brand new Relay request, because its index is different than what we saw before.
// This should never happen. The peer should never change an index, once created.
logMsg.WithFields(logrus.Fields{
"existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
return
}
resp := NebulaControl{
Type: NebulaControl_CreateRelayResponse,
ResponderRelayIndex: relay.LocalIndex,
InitiatorRelayIndex: relay.RemoteIndex,
}
if v == cert.Version1 {
if !h.vpnAddrs[0].Is4() {
//TODO: log it
return
}
b := h.vpnAddrs[0].As4()
resp.OldRelayFromAddr = binary.BigEndian.Uint32(b[:])
b = target.As4()
resp.OldRelayToAddr = binary.BigEndian.Uint32(b[:])
} else {
resp.RelayFromAddr = netAddrToProtoAddr(h.vpnAddrs[0])
resp.RelayToAddr = netAddrToProtoAddr(target)
}
msg, err := resp.Marshal()
if err != nil {
rm.l.
WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
} else {
f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
//TODO: IPV6-WORK more lazy, used to use resp object
"relayFrom": h.vpnAddrs[0],
"relayTo": target,
"initiatorRelayIndex": resp.InitiatorRelayIndex,
"responderRelayIndex": resp.ResponderRelayIndex,
"vpnAddrs": h.vpnAddrs}).
Info("send CreateRelayResponse")
}
case Requested:
// Keep waiting for the other relay to complete
}
} }
} }
} }