mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-10 01:23:58 +01:00
The goal of this work is to send packets between two hosts using more than one
5-tuple. When running on networks like AWS where the underlying network driver
and overlay fabric make routing, load balancing, and failover decisions based
on the flow hash, this enables more than one flow between pairs of hosts.
Multiport spreads outgoing UDP packets across multiple UDP send ports,
which allows nebula to work around any issues on the underlay network.
Some example issues this could work around:
- UDP rate limits on a per flow basis.
- Partial underlay network failure in which some flows work and some don't
Agreement is done during the handshake to decide if multiport mode will
be used for a given tunnel (one side must have tx_enabled set, the other
side must have rx_enabled set)
NOTE: you cannot use multiport on a host if you are relying on UDP hole
punching to get through a NAT or firewall.
NOTE: Linux only (uses raw sockets to send). Also currently only works
with IPv4 underlay network remotes.
This is implemented by opening a raw socket and sending packets with
a source port that is based on a hash of the overlay source/destination
port. For ICMP and Nebula metadata packets, we use a random source port.
Example configuration:
multiport:
# This host supports sending via multiple UDP ports.
tx_enabled: false
# This host supports receiving packets sent from multiple UDP ports.
rx_enabled: false
# How many UDP ports to use when sending. The lowest source port will be
# listen.port and go up to (but not including) listen.port + tx_ports.
tx_ports: 100
# NOTE: All of your hosts must be running a version of Nebula that supports
# multiport if you want to enable this feature. Older versions of Nebula
# will be confused by these multiport handshakes.
#
# If handshakes are not getting a response, attempt to transmit handshakes
# using random UDP source ports (to get around partial underlay network
# failures).
tx_handshake: false
# How many unresponded handshakes we should send before we attempt to
# send multiport handshakes.
tx_handshake_delay: 2
421 lines
11 KiB
Go
421 lines
11 KiB
Go
package nebula
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rcrowley/go-metrics"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/slackhq/nebula/cert"
|
|
"github.com/slackhq/nebula/config"
|
|
"github.com/slackhq/nebula/firewall"
|
|
"github.com/slackhq/nebula/iputil"
|
|
"github.com/slackhq/nebula/overlay"
|
|
"github.com/slackhq/nebula/udp"
|
|
)
|
|
|
|
// mtu is the size, in bytes, of the packet and output buffers that listenIn
// allocates for each routine reading from the inside device.
const mtu = 9001
|
|
|
|
type InterfaceConfig struct {
|
|
HostMap *HostMap
|
|
Outside *udp.Conn
|
|
Inside overlay.Device
|
|
certState *CertState
|
|
Cipher string
|
|
Firewall *Firewall
|
|
ServeDns bool
|
|
HandshakeManager *HandshakeManager
|
|
lightHouse *LightHouse
|
|
checkInterval int
|
|
pendingDeletionInterval int
|
|
DropLocalBroadcast bool
|
|
DropMulticast bool
|
|
routines int
|
|
MessageMetrics *MessageMetrics
|
|
version string
|
|
caPool *cert.NebulaCAPool
|
|
disconnectInvalid bool
|
|
relayManager *relayManager
|
|
|
|
ConntrackCacheTimeout time.Duration
|
|
l *logrus.Logger
|
|
}
|
|
|
|
type Interface struct {
|
|
hostMap *HostMap
|
|
outside *udp.Conn
|
|
inside overlay.Device
|
|
certState *CertState
|
|
cipher string
|
|
firewall *Firewall
|
|
connectionManager *connectionManager
|
|
handshakeManager *HandshakeManager
|
|
serveDns bool
|
|
createTime time.Time
|
|
lightHouse *LightHouse
|
|
localBroadcast iputil.VpnIp
|
|
myVpnIp iputil.VpnIp
|
|
dropLocalBroadcast bool
|
|
dropMulticast bool
|
|
routines int
|
|
caPool *cert.NebulaCAPool
|
|
disconnectInvalid bool
|
|
closed int32
|
|
relayManager *relayManager
|
|
|
|
sendRecvErrorConfig sendRecvErrorConfig
|
|
|
|
// rebindCount is used to decide if an active tunnel should trigger a punch notification through a lighthouse
|
|
rebindCount int8
|
|
version string
|
|
|
|
conntrackCacheTimeout time.Duration
|
|
|
|
writers []*udp.Conn
|
|
readers []io.ReadWriteCloser
|
|
udpRaw *udp.RawConn
|
|
|
|
multiPort MultiPortConfig
|
|
|
|
metricHandshakes metrics.Histogram
|
|
messageMetrics *MessageMetrics
|
|
cachedPacketMetrics *cachedPacketMetrics
|
|
|
|
l *logrus.Logger
|
|
}
|
|
|
|
// MultiPortConfig holds the multiport settings for an Interface.
// NOTE(review): the fields presumably mirror the `multiport` config section
// (tx_enabled, rx_enabled, tx_ports, tx_handshake, tx_handshake_delay) -
// confirm against the code that populates this struct.
type MultiPortConfig struct {
	// Tx and Rx flag whether sending from / receiving on multiple UDP ports
	// is enabled.
	Tx bool
	Rx bool

	// TxBasePort and TxPorts describe the range of source ports used when
	// sending.
	TxBasePort uint16
	TxPorts    int

	// TxHandshake and TxHandshakeDelay control multiport handshakes.
	TxHandshake      bool
	TxHandshakeDelay int
}
|
|
|
|
// sendRecvErrorConfig selects when ShouldSendRecvError reports true for an
// address: always, never, or only for private addresses.
type sendRecvErrorConfig uint8

const (
	sendRecvErrorAlways sendRecvErrorConfig = iota
	sendRecvErrorNever
	sendRecvErrorPrivate
)

// ShouldSendRecvError reports whether the policy s allows a recv_error for ip.
// With sendRecvErrorPrivate the answer is ip.IsPrivate(). It panics if s is
// not one of the declared constants.
func (s sendRecvErrorConfig) ShouldSendRecvError(ip net.IP) bool {
	switch {
	case s == sendRecvErrorAlways:
		return true
	case s == sendRecvErrorNever:
		return false
	case s == sendRecvErrorPrivate:
		return ip.IsPrivate()
	}
	panic(fmt.Errorf("invalid sendRecvErrorConfig value: %d", s))
}

// String returns the config-file spelling of s ("always", "never" or
// "private"), or "invalid(N)" for any other value.
func (s sendRecvErrorConfig) String() string {
	names := [...]string{
		sendRecvErrorAlways:  "always",
		sendRecvErrorNever:   "never",
		sendRecvErrorPrivate: "private",
	}
	if int(s) < len(names) {
		return names[s]
	}
	return fmt.Sprintf("invalid(%d)", s)
}
|
|
|
|
func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
|
|
if c.Outside == nil {
|
|
return nil, errors.New("no outside connection")
|
|
}
|
|
if c.Inside == nil {
|
|
return nil, errors.New("no inside interface (tun)")
|
|
}
|
|
if c.certState == nil {
|
|
return nil, errors.New("no certificate state")
|
|
}
|
|
if c.Firewall == nil {
|
|
return nil, errors.New("no firewall rules")
|
|
}
|
|
|
|
myVpnIp := iputil.Ip2VpnIp(c.certState.certificate.Details.Ips[0].IP)
|
|
ifce := &Interface{
|
|
hostMap: c.HostMap,
|
|
outside: c.Outside,
|
|
inside: c.Inside,
|
|
certState: c.certState,
|
|
cipher: c.Cipher,
|
|
firewall: c.Firewall,
|
|
serveDns: c.ServeDns,
|
|
handshakeManager: c.HandshakeManager,
|
|
createTime: time.Now(),
|
|
lightHouse: c.lightHouse,
|
|
localBroadcast: myVpnIp | ^iputil.Ip2VpnIp(c.certState.certificate.Details.Ips[0].Mask),
|
|
dropLocalBroadcast: c.DropLocalBroadcast,
|
|
dropMulticast: c.DropMulticast,
|
|
routines: c.routines,
|
|
version: c.version,
|
|
writers: make([]*udp.Conn, c.routines),
|
|
readers: make([]io.ReadWriteCloser, c.routines),
|
|
caPool: c.caPool,
|
|
disconnectInvalid: c.disconnectInvalid,
|
|
myVpnIp: myVpnIp,
|
|
relayManager: c.relayManager,
|
|
|
|
conntrackCacheTimeout: c.ConntrackCacheTimeout,
|
|
|
|
metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
|
|
messageMetrics: c.MessageMetrics,
|
|
cachedPacketMetrics: &cachedPacketMetrics{
|
|
sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil),
|
|
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
|
|
},
|
|
|
|
l: c.l,
|
|
}
|
|
|
|
ifce.connectionManager = newConnectionManager(ctx, c.l, ifce, c.checkInterval, c.pendingDeletionInterval)
|
|
|
|
return ifce, nil
|
|
}
|
|
|
|
// activate creates the interface on the host. After the interface is created, any
|
|
// other services that want to bind listeners to its IP may do so successfully. However,
|
|
// the interface isn't going to process anything until run() is called.
|
|
func (f *Interface) activate() {
|
|
// actually turn on tun dev
|
|
|
|
addr, err := f.outside.LocalAddr()
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Failed to get udp listen address")
|
|
}
|
|
|
|
f.l.WithField("interface", f.inside.Name()).WithField("network", f.inside.Cidr().String()).
|
|
WithField("build", f.version).WithField("udpAddr", addr).
|
|
Info("Nebula interface is active")
|
|
|
|
metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
|
|
|
|
metrics.GetOrRegisterGauge("multiport.tx_ports", nil).Update(int64(f.multiPort.TxPorts))
|
|
|
|
// Prepare n tun queues
|
|
var reader io.ReadWriteCloser = f.inside
|
|
for i := 0; i < f.routines; i++ {
|
|
if i > 0 {
|
|
reader, err = f.inside.NewMultiQueueReader()
|
|
if err != nil {
|
|
f.l.Fatal(err)
|
|
}
|
|
}
|
|
f.readers[i] = reader
|
|
}
|
|
|
|
if err := f.inside.Activate(); err != nil {
|
|
f.inside.Close()
|
|
f.l.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) run() {
|
|
// Launch n queues to read packets from udp
|
|
for i := 0; i < f.routines; i++ {
|
|
go f.listenOut(i)
|
|
}
|
|
|
|
// Launch n queues to read packets from tun dev
|
|
for i := 0; i < f.routines; i++ {
|
|
go f.listenIn(f.readers[i], i)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) listenOut(i int) {
|
|
runtime.LockOSThread()
|
|
|
|
var li *udp.Conn
|
|
// TODO clean this up with a coherent interface for each outside connection
|
|
if i > 0 {
|
|
li = f.writers[i]
|
|
} else {
|
|
li = f.outside
|
|
}
|
|
|
|
lhh := f.lightHouse.NewRequestHandler()
|
|
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
|
li.ListenOut(f.readOutsidePackets, lhh.HandleRequest, conntrackCache, i)
|
|
}
|
|
|
|
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
|
runtime.LockOSThread()
|
|
|
|
packet := make([]byte, mtu)
|
|
out := make([]byte, mtu)
|
|
fwPacket := &firewall.Packet{}
|
|
nb := make([]byte, 12, 12)
|
|
|
|
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
|
|
|
for {
|
|
n, err := reader.Read(packet)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrClosed) && atomic.LoadInt32(&f.closed) != 0 {
|
|
return
|
|
}
|
|
|
|
f.l.WithError(err).Error("Error while reading outbound packet")
|
|
// This only seems to happen when something fatal happens to the fd, so exit.
|
|
os.Exit(2)
|
|
}
|
|
|
|
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
|
|
}
|
|
}
|
|
|
|
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
|
|
c.RegisterReloadCallback(f.reloadCA)
|
|
c.RegisterReloadCallback(f.reloadCertKey)
|
|
c.RegisterReloadCallback(f.reloadFirewall)
|
|
c.RegisterReloadCallback(f.reloadSendRecvError)
|
|
for _, udpConn := range f.writers {
|
|
c.RegisterReloadCallback(udpConn.ReloadConfig)
|
|
}
|
|
}
|
|
|
|
func (f *Interface) reloadCA(c *config.C) {
|
|
// reload and check regardless
|
|
// todo: need mutex?
|
|
newCAs, err := loadCAFromConfig(f.l, c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Could not refresh trusted CA certificates")
|
|
return
|
|
}
|
|
|
|
f.caPool = newCAs
|
|
f.l.WithField("fingerprints", f.caPool.GetFingerprints()).Info("Trusted CA certificates refreshed")
|
|
}
|
|
|
|
func (f *Interface) reloadCertKey(c *config.C) {
|
|
// reload and check in all cases
|
|
cs, err := NewCertStateFromConfig(c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Could not refresh client cert")
|
|
return
|
|
}
|
|
|
|
// did IP in cert change? if so, don't set
|
|
oldIPs := f.certState.certificate.Details.Ips
|
|
newIPs := cs.certificate.Details.Ips
|
|
if len(oldIPs) > 0 && len(newIPs) > 0 && oldIPs[0].String() != newIPs[0].String() {
|
|
f.l.WithField("new_ip", newIPs[0]).WithField("old_ip", oldIPs[0]).Error("IP in new cert was different from old")
|
|
return
|
|
}
|
|
|
|
f.certState = cs
|
|
f.l.WithField("cert", cs.certificate).Info("Client cert refreshed from disk")
|
|
}
|
|
|
|
func (f *Interface) reloadFirewall(c *config.C) {
|
|
//TODO: need to trigger/detect if the certificate changed too
|
|
if c.HasChanged("firewall") == false {
|
|
f.l.Debug("No firewall config change detected")
|
|
return
|
|
}
|
|
|
|
fw, err := NewFirewallFromConfig(f.l, f.certState.certificate, c)
|
|
if err != nil {
|
|
f.l.WithError(err).Error("Error while creating firewall during reload")
|
|
return
|
|
}
|
|
|
|
oldFw := f.firewall
|
|
conntrack := oldFw.Conntrack
|
|
conntrack.Lock()
|
|
defer conntrack.Unlock()
|
|
|
|
fw.rulesVersion = oldFw.rulesVersion + 1
|
|
// If rulesVersion is back to zero, we have wrapped all the way around. Be
|
|
// safe and just reset conntrack in this case.
|
|
if fw.rulesVersion == 0 {
|
|
f.l.WithField("firewallHash", fw.GetRuleHash()).
|
|
WithField("oldFirewallHash", oldFw.GetRuleHash()).
|
|
WithField("rulesVersion", fw.rulesVersion).
|
|
Warn("firewall rulesVersion has overflowed, resetting conntrack")
|
|
} else {
|
|
fw.Conntrack = conntrack
|
|
}
|
|
|
|
f.firewall = fw
|
|
|
|
oldFw.Destroy()
|
|
f.l.WithField("firewallHash", fw.GetRuleHash()).
|
|
WithField("oldFirewallHash", oldFw.GetRuleHash()).
|
|
WithField("rulesVersion", fw.rulesVersion).
|
|
Info("New firewall has been installed")
|
|
}
|
|
|
|
func (f *Interface) reloadSendRecvError(c *config.C) {
|
|
if c.InitialLoad() || c.HasChanged("listen.send_recv_error") {
|
|
stringValue := c.GetString("listen.send_recv_error", "always")
|
|
|
|
switch stringValue {
|
|
case "always":
|
|
f.sendRecvErrorConfig = sendRecvErrorAlways
|
|
case "never":
|
|
f.sendRecvErrorConfig = sendRecvErrorNever
|
|
case "private":
|
|
f.sendRecvErrorConfig = sendRecvErrorPrivate
|
|
default:
|
|
if c.GetBool("listen.send_recv_error", true) {
|
|
f.sendRecvErrorConfig = sendRecvErrorAlways
|
|
} else {
|
|
f.sendRecvErrorConfig = sendRecvErrorNever
|
|
}
|
|
}
|
|
|
|
f.l.WithField("sendRecvError", f.sendRecvErrorConfig.String()).
|
|
Info("Loaded send_recv_error config")
|
|
}
|
|
}
|
|
|
|
func (f *Interface) emitStats(ctx context.Context, i time.Duration) {
|
|
ticker := time.NewTicker(i)
|
|
defer ticker.Stop()
|
|
|
|
udpStats := udp.NewUDPStatsEmitter(f.writers)
|
|
|
|
var rawStats func()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
f.firewall.EmitStats()
|
|
f.handshakeManager.EmitStats()
|
|
udpStats()
|
|
if f.udpRaw != nil {
|
|
if rawStats == nil {
|
|
rawStats = udp.NewRawStatsEmitter(f.udpRaw)
|
|
}
|
|
rawStats()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *Interface) Close() error {
|
|
atomic.StoreInt32(&f.closed, 1)
|
|
|
|
// Release the tun device
|
|
return f.inside.Close()
|
|
}
|