Compare commits

..

6 Commits

27 changed files with 298 additions and 1989 deletions

View File

@@ -7,30 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- Experimental Linux UDP offload support: enable `listen.enable_gso` and
`listen.enable_gro` to activate UDP_SEGMENT batching and GRO receive
splitting. Includes automatic capability probing, per-packet fallbacks, and
runtime metrics/logs for visibility.
- Optional Linux TUN `virtio_net_hdr` support: set `tun.enable_vnet_hdr` to
have Nebula negotiate VNET headers and offload flags so future batches can
be delivered to the kernel with metadata instead of per-packet writes.
- Linux UDP send sharding can now be tuned with `listen.send_shards`; defaults
to `GOMAXPROCS` but can be increased to stripe heavy peers across more
goroutines.
### Changed ### Changed
- `default_local_cidr_any` now defaults to false, meaning that any firewall rule - `default_local_cidr_any` now defaults to false, meaning that any firewall rule
intended to target an `unsafe_routes` entry must explicitly declare it via the intended to target an `unsafe_routes` entry must explicitly declare it via the
`local_cidr` field. This is almost always the intended behavior. This flag is `local_cidr` field. This is almost always the intended behavior. This flag is
deprecated and will be removed in a future release. deprecated and will be removed in a future release.
- UDP receive path now enqueues into per-worker lock-free rings, restoring the
`listen.decrypt_workers`/`listen.decrypt_queue_depth` tuning knobs while
eliminating the mutex contention from the old shared channel.
- Increased replay protection window to 32k packets so high-throughput links
tolerate larger bursts of reordering without tripping the anti-replay logic.
## [1.9.4] - 2024-09-09 ## [1.9.4] - 2024-09-09

View File

@@ -13,10 +13,7 @@ import (
"github.com/slackhq/nebula/noiseutil" "github.com/slackhq/nebula/noiseutil"
) )
// ReplayWindow controls the size of the sliding window used to detect replays. const ReplayWindow = 1024
// High-bandwidth links with GRO/GSO can reorder more than a thousand packets in
// flight, so keep this comfortably above the largest expected burst.
const ReplayWindow = 32768
type ConnectionState struct { type ConnectionState struct {
eKey *NebulaCipherState eKey *NebulaCipherState

View File

@@ -29,8 +29,6 @@ type m = map[string]any
// newSimpleServer creates a nebula instance with many assumptions // newSimpleServer creates a nebula instance with many assumptions
func newSimpleServer(v cert.Version, caCrt cert.Certificate, caKey []byte, name string, sVpnNetworks string, overrides m) (*nebula.Control, []netip.Prefix, netip.AddrPort, *config.C) { func newSimpleServer(v cert.Version, caCrt cert.Certificate, caKey []byte, name string, sVpnNetworks string, overrides m) (*nebula.Control, []netip.Prefix, netip.AddrPort, *config.C) {
l := NewTestLogger()
var vpnNetworks []netip.Prefix var vpnNetworks []netip.Prefix
for _, sn := range strings.Split(sVpnNetworks, ",") { for _, sn := range strings.Split(sVpnNetworks, ",") {
vpnIpNet, err := netip.ParsePrefix(strings.TrimSpace(sn)) vpnIpNet, err := netip.ParsePrefix(strings.TrimSpace(sn))
@@ -56,6 +54,25 @@ func newSimpleServer(v cert.Version, caCrt cert.Certificate, caKey []byte, name
budpIp[3] = 239 budpIp[3] = 239
udpAddr = netip.AddrPortFrom(netip.AddrFrom16(budpIp), 4242) udpAddr = netip.AddrPortFrom(netip.AddrFrom16(budpIp), 4242)
} }
return newSimpleServerWithUdp(v, caCrt, caKey, name, sVpnNetworks, udpAddr, overrides)
}
func newSimpleServerWithUdp(v cert.Version, caCrt cert.Certificate, caKey []byte, name string, sVpnNetworks string, udpAddr netip.AddrPort, overrides m) (*nebula.Control, []netip.Prefix, netip.AddrPort, *config.C) {
l := NewTestLogger()
var vpnNetworks []netip.Prefix
for _, sn := range strings.Split(sVpnNetworks, ",") {
vpnIpNet, err := netip.ParsePrefix(strings.TrimSpace(sn))
if err != nil {
panic(err)
}
vpnNetworks = append(vpnNetworks, vpnIpNet)
}
if len(vpnNetworks) == 0 {
panic("no vpn networks")
}
_, _, myPrivKey, myPEM := cert_test.NewTestCert(v, cert.Curve_CURVE25519, caCrt, caKey, name, time.Now(), time.Now().Add(5*time.Minute), vpnNetworks, nil, []string{}) _, _, myPrivKey, myPEM := cert_test.NewTestCert(v, cert.Curve_CURVE25519, caCrt, caKey, name, time.Now(), time.Now().Add(5*time.Minute), vpnNetworks, nil, []string{})
caB, err := caCrt.MarshalPEM() caB, err := caCrt.MarshalPEM()

View File

@@ -4,6 +4,7 @@
package e2e package e2e
import ( import (
"net/netip"
"testing" "testing"
"time" "time"
@@ -55,3 +56,50 @@ func TestDropInactiveTunnels(t *testing.T) {
myControl.Stop() myControl.Stop()
theirControl.Stop() theirControl.Stop()
} }
func TestCrossStackRelaysWork(t *testing.T) {
ca, _, caKey, _ := cert_test.NewTestCaCert(cert.Version2, cert.Curve_CURVE25519, time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{})
myControl, myVpnIpNet, _, _ := newSimpleServer(cert.Version2, ca, caKey, "me ", "10.128.0.1/24,fc00::1/64", m{"relay": m{"use_relays": true}})
relayControl, relayVpnIpNet, relayUdpAddr, _ := newSimpleServer(cert.Version2, ca, caKey, "relay ", "10.128.0.128/24,fc00::128/64", m{"relay": m{"am_relay": true}})
theirUdp := netip.MustParseAddrPort("10.0.0.2:4242")
theirControl, theirVpnIpNet, theirUdpAddr, _ := newSimpleServerWithUdp(cert.Version2, ca, caKey, "them ", "fc00::2/64", theirUdp, m{"relay": m{"use_relays": true}})
//myVpnV4 := myVpnIpNet[0]
myVpnV6 := myVpnIpNet[1]
relayVpnV4 := relayVpnIpNet[0]
relayVpnV6 := relayVpnIpNet[1]
theirVpnV6 := theirVpnIpNet[0]
// Teach my how to get to the relay and that their can be reached via the relay
myControl.InjectLightHouseAddr(relayVpnV4.Addr(), relayUdpAddr)
myControl.InjectLightHouseAddr(relayVpnV6.Addr(), relayUdpAddr)
myControl.InjectRelays(theirVpnV6.Addr(), []netip.Addr{relayVpnV6.Addr()})
relayControl.InjectLightHouseAddr(theirVpnV6.Addr(), theirUdpAddr)
// Build a router so we don't have to reason who gets which packet
r := router.NewR(t, myControl, relayControl, theirControl)
defer r.RenderFlow()
// Start the servers
myControl.Start()
relayControl.Start()
theirControl.Start()
t.Log("Trigger a handshake from me to them via the relay")
myControl.InjectTunUDPPacket(theirVpnV6.Addr(), 80, myVpnV6.Addr(), 80, []byte("Hi from me"))
p := r.RouteForAllUntilTxTun(theirControl)
r.Log("Assert the tunnel works")
assertUdpPacket(t, []byte("Hi from me"), p, myVpnV6.Addr(), theirVpnV6.Addr(), 80, 80)
t.Log("reply?")
theirControl.InjectTunUDPPacket(myVpnV6.Addr(), 80, theirVpnV6.Addr(), 80, []byte("Hi from them"))
p = r.RouteForAllUntilTxTun(myControl)
assertUdpPacket(t, []byte("Hi from them"), p, theirVpnV6.Addr(), myVpnV6.Addr(), 80, 80)
r.RenderHostmaps("Final hostmaps", myControl, relayControl, theirControl)
//t.Log("finish up")
//myControl.Stop()
//theirControl.Stop()
//relayControl.Stop()
}

View File

@@ -417,6 +417,8 @@ func AddFirewallRulesFromConfig(l *logrus.Logger, inbound bool, c *config.C, fw
return nil return nil
} }
var ErrUnknownNetworkType = errors.New("unknown network type")
var ErrPeerRejected = errors.New("remote IP is not within a subnet that we handle")
var ErrInvalidRemoteIP = errors.New("remote IP is not in remote certificate subnets") var ErrInvalidRemoteIP = errors.New("remote IP is not in remote certificate subnets")
var ErrInvalidLocalIP = errors.New("local IP is not in list of handled local IPs") var ErrInvalidLocalIP = errors.New("local IP is not in list of handled local IPs")
var ErrNoMatchingRule = errors.New("no matching rule in firewall table") var ErrNoMatchingRule = errors.New("no matching rule in firewall table")
@@ -429,18 +431,31 @@ func (f *Firewall) Drop(fp firewall.Packet, incoming bool, h *HostInfo, caPool *
return nil return nil
} }
// Make sure remote address matches nebula certificate // Make sure remote address matches nebula certificate, and determine how to treat it
if h.networks != nil { if h.networks == nil {
if !h.networks.Contains(fp.RemoteAddr) {
f.metrics(incoming).droppedRemoteAddr.Inc(1)
return ErrInvalidRemoteIP
}
} else {
// Simple case: Certificate has one address and no unsafe networks // Simple case: Certificate has one address and no unsafe networks
if h.vpnAddrs[0] != fp.RemoteAddr { if h.vpnAddrs[0] != fp.RemoteAddr {
f.metrics(incoming).droppedRemoteAddr.Inc(1) f.metrics(incoming).droppedRemoteAddr.Inc(1)
return ErrInvalidRemoteIP return ErrInvalidRemoteIP
} }
} else {
nwType, ok := h.networks.Lookup(fp.RemoteAddr)
if !ok {
f.metrics(incoming).droppedRemoteAddr.Inc(1)
return ErrInvalidRemoteIP
}
switch nwType {
case NetworkTypeVPN:
break // nothing special
case NetworkTypeVPNPeer:
f.metrics(incoming).droppedRemoteAddr.Inc(1)
return ErrPeerRejected // reject for now, one day this may have different FW rules
case NetworkTypeUnsafe:
break // nothing special, one day this may have different FW rules
default:
f.metrics(incoming).droppedRemoteAddr.Inc(1)
return ErrUnknownNetworkType //should never happen
}
} }
// Make sure we are supposed to be handling this local ip address // Make sure we are supposed to be handling this local ip address

View File

@@ -8,6 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/gaissmai/bart"
"github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/firewall"
@@ -149,7 +150,8 @@ func TestFirewall_Drop(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("1.1.1.1/8"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("1.2.3.4"), LocalAddr: netip.MustParseAddr("1.2.3.4"),
RemoteAddr: netip.MustParseAddr("1.2.3.4"), RemoteAddr: netip.MustParseAddr("1.2.3.4"),
@@ -174,7 +176,7 @@ func TestFirewall_Drop(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{netip.MustParseAddr("1.2.3.4")}, vpnAddrs: []netip.Addr{netip.MustParseAddr("1.2.3.4")},
} }
h.buildNetworks(c.networks, c.unsafeNetworks) h.buildNetworks(myVpnNetworksTable, c.networks, c.unsafeNetworks)
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, &c) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, &c)
require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", "")) require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", ""))
@@ -226,6 +228,9 @@ func TestFirewall_DropV6(t *testing.T) {
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("fd00::/7"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("fd12::34"), LocalAddr: netip.MustParseAddr("fd12::34"),
RemoteAddr: netip.MustParseAddr("fd12::34"), RemoteAddr: netip.MustParseAddr("fd12::34"),
@@ -250,7 +255,7 @@ func TestFirewall_DropV6(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{netip.MustParseAddr("fd12::34")}, vpnAddrs: []netip.Addr{netip.MustParseAddr("fd12::34")},
} }
h.buildNetworks(c.networks, c.unsafeNetworks) h.buildNetworks(myVpnNetworksTable, c.networks, c.unsafeNetworks)
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, &c) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, &c)
require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", "")) require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", ""))
@@ -453,6 +458,8 @@ func TestFirewall_Drop2(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("1.1.1.1/8"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("1.2.3.4"), LocalAddr: netip.MustParseAddr("1.2.3.4"),
@@ -478,7 +485,7 @@ func TestFirewall_Drop2(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h.buildNetworks(c.Certificate.Networks(), c.Certificate.UnsafeNetworks()) h.buildNetworks(myVpnNetworksTable, c.Certificate.Networks(), c.Certificate.UnsafeNetworks())
c1 := cert.CachedCertificate{ c1 := cert.CachedCertificate{
Certificate: &dummyCert{ Certificate: &dummyCert{
@@ -493,7 +500,7 @@ func TestFirewall_Drop2(t *testing.T) {
peerCert: &c1, peerCert: &c1,
}, },
} }
h1.buildNetworks(c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks()) h1.buildNetworks(myVpnNetworksTable, c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks())
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate)
require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"default-group", "test-group"}, "", netip.Prefix{}, netip.Prefix{}, "", "")) require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"default-group", "test-group"}, "", netip.Prefix{}, netip.Prefix{}, "", ""))
@@ -510,6 +517,8 @@ func TestFirewall_Drop3(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("1.1.1.1/8"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("1.2.3.4"), LocalAddr: netip.MustParseAddr("1.2.3.4"),
@@ -541,7 +550,7 @@ func TestFirewall_Drop3(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h1.buildNetworks(c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks()) h1.buildNetworks(myVpnNetworksTable, c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks())
c2 := cert.CachedCertificate{ c2 := cert.CachedCertificate{
Certificate: &dummyCert{ Certificate: &dummyCert{
@@ -556,7 +565,7 @@ func TestFirewall_Drop3(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h2.buildNetworks(c2.Certificate.Networks(), c2.Certificate.UnsafeNetworks()) h2.buildNetworks(myVpnNetworksTable, c2.Certificate.Networks(), c2.Certificate.UnsafeNetworks())
c3 := cert.CachedCertificate{ c3 := cert.CachedCertificate{
Certificate: &dummyCert{ Certificate: &dummyCert{
@@ -571,7 +580,7 @@ func TestFirewall_Drop3(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h3.buildNetworks(c3.Certificate.Networks(), c3.Certificate.UnsafeNetworks()) h3.buildNetworks(myVpnNetworksTable, c3.Certificate.Networks(), c3.Certificate.UnsafeNetworks())
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate)
require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 1, 1, []string{}, "host1", netip.Prefix{}, netip.Prefix{}, "", "")) require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 1, 1, []string{}, "host1", netip.Prefix{}, netip.Prefix{}, "", ""))
@@ -597,6 +606,8 @@ func TestFirewall_Drop3V6(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("fd00::/7"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("fd12::34"), LocalAddr: netip.MustParseAddr("fd12::34"),
@@ -620,7 +631,7 @@ func TestFirewall_Drop3V6(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h.buildNetworks(c.Certificate.Networks(), c.Certificate.UnsafeNetworks()) h.buildNetworks(myVpnNetworksTable, c.Certificate.Networks(), c.Certificate.UnsafeNetworks())
// Test a remote address match // Test a remote address match
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate)
@@ -633,6 +644,8 @@ func TestFirewall_DropConntrackReload(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("1.1.1.1/8"))
p := firewall.Packet{ p := firewall.Packet{
LocalAddr: netip.MustParseAddr("1.2.3.4"), LocalAddr: netip.MustParseAddr("1.2.3.4"),
@@ -659,7 +672,7 @@ func TestFirewall_DropConntrackReload(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{network.Addr()}, vpnAddrs: []netip.Addr{network.Addr()},
} }
h.buildNetworks(c.Certificate.Networks(), c.Certificate.UnsafeNetworks()) h.buildNetworks(myVpnNetworksTable, c.Certificate.Networks(), c.Certificate.UnsafeNetworks())
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate)
require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", "")) require.NoError(t, fw.AddRule(true, firewall.ProtoAny, 0, 0, []string{"any"}, "", netip.Prefix{}, netip.Prefix{}, "", ""))
@@ -696,6 +709,8 @@ func TestFirewall_DropIPSpoofing(t *testing.T) {
l := test.NewLogger() l := test.NewLogger()
ob := &bytes.Buffer{} ob := &bytes.Buffer{}
l.SetOutput(ob) l.SetOutput(ob)
myVpnNetworksTable := new(bart.Lite)
myVpnNetworksTable.Insert(netip.MustParsePrefix("192.0.2.1/24"))
c := cert.CachedCertificate{ c := cert.CachedCertificate{
Certificate: &dummyCert{ Certificate: &dummyCert{
@@ -717,7 +732,7 @@ func TestFirewall_DropIPSpoofing(t *testing.T) {
}, },
vpnAddrs: []netip.Addr{c1.Certificate.Networks()[0].Addr()}, vpnAddrs: []netip.Addr{c1.Certificate.Networks()[0].Addr()},
} }
h1.buildNetworks(c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks()) h1.buildNetworks(myVpnNetworksTable, c1.Certificate.Networks(), c1.Certificate.UnsafeNetworks())
fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate) fw := NewFirewall(l, time.Second, time.Minute, time.Hour, c.Certificate)

View File

@@ -183,17 +183,18 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
return return
} }
var vpnAddrs []netip.Addr
var filteredNetworks []netip.Prefix
certName := remoteCert.Certificate.Name() certName := remoteCert.Certificate.Name()
certVersion := remoteCert.Certificate.Version() certVersion := remoteCert.Certificate.Version()
fingerprint := remoteCert.Fingerprint fingerprint := remoteCert.Fingerprint
issuer := remoteCert.Certificate.Issuer() issuer := remoteCert.Certificate.Issuer()
vpnNetworks := remoteCert.Certificate.Networks()
for _, network := range remoteCert.Certificate.Networks() { anyVpnAddrsInCommon := false
vpnAddrs := make([]netip.Addr, len(vpnNetworks))
for i, network := range vpnNetworks {
vpnAddr := network.Addr() vpnAddr := network.Addr()
if f.myVpnAddrsTable.Contains(vpnAddr) { if f.myVpnAddrsTable.Contains(vpnAddr) {
f.l.WithField("vpnAddr", vpnAddr).WithField("udpAddr", addr). f.l.WithField("vpnNetworks", vpnNetworks).WithField("udpAddr", addr).
WithField("certName", certName). WithField("certName", certName).
WithField("certVersion", certVersion). WithField("certVersion", certVersion).
WithField("fingerprint", fingerprint). WithField("fingerprint", fingerprint).
@@ -201,24 +202,10 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).Error("Refusing to handshake with myself") WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).Error("Refusing to handshake with myself")
return return
} }
vpnAddrs[i] = network.Addr()
// vpnAddrs outside our vpn networks are of no use to us, filter them out if f.myVpnNetworksTable.Contains(vpnAddr) {
if !f.myVpnNetworksTable.Contains(vpnAddr) { anyVpnAddrsInCommon = true
continue
} }
filteredNetworks = append(filteredNetworks, network)
vpnAddrs = append(vpnAddrs, vpnAddr)
}
if len(vpnAddrs) == 0 {
f.l.WithError(err).WithField("udpAddr", addr).
WithField("certName", certName).
WithField("certVersion", certVersion).
WithField("fingerprint", fingerprint).
WithField("issuer", issuer).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).Error("No usable vpn addresses from host, refusing handshake")
return
} }
if addr.IsValid() { if addr.IsValid() {
@@ -255,26 +242,30 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
}, },
} }
f.l.WithField("vpnAddrs", vpnAddrs).WithField("udpAddr", addr). msgRxL := f.l.WithFields(m{
WithField("certName", certName). "vpnAddrs": vpnAddrs,
WithField("certVersion", certVersion). "udpAddr": addr,
WithField("fingerprint", fingerprint). "certName": certName,
WithField("issuer", issuer). "certVersion": certVersion,
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex). "fingerprint": fingerprint,
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 1, "style": "ix_psk0"}). "issuer": issuer,
Info("Handshake message received") "initiatorIndex": hs.Details.InitiatorIndex,
"responderIndex": hs.Details.ResponderIndex,
"remoteIndex": h.RemoteIndex,
"handshake": m{"stage": 1, "style": "ix_psk0"},
})
if anyVpnAddrsInCommon {
msgRxL.Info("Handshake message received")
} else {
//todo warn if not lighthouse or relay?
msgRxL.Info("Handshake message received, but no vpnNetworks in common.")
}
hs.Details.ResponderIndex = myIndex hs.Details.ResponderIndex = myIndex
hs.Details.Cert = cs.getHandshakeBytes(ci.myCert.Version()) hs.Details.Cert = cs.getHandshakeBytes(ci.myCert.Version())
if hs.Details.Cert == nil { if hs.Details.Cert == nil {
f.l.WithField("vpnAddrs", vpnAddrs).WithField("udpAddr", addr). msgRxL.WithField("myCertVersion", ci.myCert.Version()).
WithField("certName", certName).
WithField("certVersion", certVersion).
WithField("fingerprint", fingerprint).
WithField("issuer", issuer).
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithField("certVersion", ci.myCert.Version()).
Error("Unable to handshake with host because no certificate handshake bytes is available") Error("Unable to handshake with host because no certificate handshake bytes is available")
return return
} }
@@ -332,7 +323,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
hostinfo.remotes = f.lightHouse.QueryCache(vpnAddrs) hostinfo.remotes = f.lightHouse.QueryCache(vpnAddrs)
hostinfo.SetRemote(addr) hostinfo.SetRemote(addr)
hostinfo.buildNetworks(filteredNetworks, remoteCert.Certificate.UnsafeNetworks()) hostinfo.buildNetworks(f.myVpnNetworksTable, remoteCert.Certificate.Networks(), remoteCert.Certificate.UnsafeNetworks())
existing, err := f.handshakeManager.CheckAndComplete(hostinfo, 0, f) existing, err := f.handshakeManager.CheckAndComplete(hostinfo, 0, f)
if err != nil { if err != nil {
@@ -573,30 +564,17 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
hostinfo.relayState.InsertRelayTo(via.relayHI.vpnAddrs[0]) hostinfo.relayState.InsertRelayTo(via.relayHI.vpnAddrs[0])
} }
var vpnAddrs []netip.Addr anyVpnAddrsInCommon := false
var filteredNetworks []netip.Prefix vpnAddrs := make([]netip.Addr, len(vpnNetworks))
for _, network := range vpnNetworks { for i, network := range vpnNetworks {
// vpnAddrs outside our vpn networks are of no use to us, filter them out vpnAddrs[i] = network.Addr()
vpnAddr := network.Addr() if f.myVpnNetworksTable.Contains(network.Addr()) {
if !f.myVpnNetworksTable.Contains(vpnAddr) { anyVpnAddrsInCommon = true
continue
} }
filteredNetworks = append(filteredNetworks, network)
vpnAddrs = append(vpnAddrs, vpnAddr)
}
if len(vpnAddrs) == 0 {
f.l.WithError(err).WithField("udpAddr", addr).
WithField("certName", certName).
WithField("certVersion", certVersion).
WithField("fingerprint", fingerprint).
WithField("issuer", issuer).
WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).Error("No usable vpn addresses from host, refusing handshake")
return true
} }
// Ensure the right host responded // Ensure the right host responded
// todo is it more correct to see if any of hostinfo.vpnAddrs are in the cert? it should have len==1, but one day it might not?
if !slices.Contains(vpnAddrs, hostinfo.vpnAddrs[0]) { if !slices.Contains(vpnAddrs, hostinfo.vpnAddrs[0]) {
f.l.WithField("intendedVpnAddrs", hostinfo.vpnAddrs).WithField("haveVpnNetworks", vpnNetworks). f.l.WithField("intendedVpnAddrs", hostinfo.vpnAddrs).WithField("haveVpnNetworks", vpnNetworks).
WithField("udpAddr", addr). WithField("udpAddr", addr).
@@ -609,6 +587,7 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
f.handshakeManager.DeleteHostInfo(hostinfo) f.handshakeManager.DeleteHostInfo(hostinfo)
// Create a new hostinfo/handshake for the intended vpn ip // Create a new hostinfo/handshake for the intended vpn ip
//TODO is hostinfo.vpnAddrs[0] always the address to use?
f.handshakeManager.StartHandshake(hostinfo.vpnAddrs[0], func(newHH *HandshakeHostInfo) { f.handshakeManager.StartHandshake(hostinfo.vpnAddrs[0], func(newHH *HandshakeHostInfo) {
// Block the current used address // Block the current used address
newHH.hostinfo.remotes = hostinfo.remotes newHH.hostinfo.remotes = hostinfo.remotes
@@ -635,7 +614,7 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
ci.window.Update(f.l, 2) ci.window.Update(f.l, 2)
duration := time.Since(hh.startTime).Nanoseconds() duration := time.Since(hh.startTime).Nanoseconds()
f.l.WithField("vpnAddrs", vpnAddrs).WithField("udpAddr", addr). msgRxL := f.l.WithField("vpnAddrs", vpnAddrs).WithField("udpAddr", addr).
WithField("certName", certName). WithField("certName", certName).
WithField("certVersion", certVersion). WithField("certVersion", certVersion).
WithField("fingerprint", fingerprint). WithField("fingerprint", fingerprint).
@@ -643,12 +622,17 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex). WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}). WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
WithField("durationNs", duration). WithField("durationNs", duration).
WithField("sentCachedPackets", len(hh.packetStore)). WithField("sentCachedPackets", len(hh.packetStore))
Info("Handshake message received") if anyVpnAddrsInCommon {
msgRxL.Info("Handshake message received")
} else {
//todo warn if not lighthouse or relay?
msgRxL.Info("Handshake message received, but no vpnNetworks in common.")
}
// Build up the radix for the firewall if we have subnets in the cert // Build up the radix for the firewall if we have subnets in the cert
hostinfo.vpnAddrs = vpnAddrs hostinfo.vpnAddrs = vpnAddrs
hostinfo.buildNetworks(filteredNetworks, remoteCert.Certificate.UnsafeNetworks()) hostinfo.buildNetworks(f.myVpnNetworksTable, remoteCert.Certificate.Networks(), remoteCert.Certificate.UnsafeNetworks())
// Complete our handshake and update metrics, this will replace any existing tunnels for the vpnAddrs here // Complete our handshake and update metrics, this will replace any existing tunnels for the vpnAddrs here
f.handshakeManager.Complete(hostinfo, f) f.handshakeManager.Complete(hostinfo, f)

View File

@@ -212,6 +212,18 @@ func (rs *RelayState) InsertRelay(ip netip.Addr, idx uint32, r *Relay) {
rs.relayForByIdx[idx] = r rs.relayForByIdx[idx] = r
} }
type NetworkType uint8
const (
NetworkTypeUnknown NetworkType = iota
// NetworkTypeVPN is a network that overlaps one or more of the vpnNetworks in our certificate
NetworkTypeVPN
// NetworkTypeVPNPeer is a network that does not overlap one of our networks
NetworkTypeVPNPeer
// NetworkTypeUnsafe is a network from Certificate.UnsafeNetworks()
NetworkTypeUnsafe
)
type HostInfo struct { type HostInfo struct {
remote netip.AddrPort remote netip.AddrPort
remotes *RemoteList remotes *RemoteList
@@ -220,13 +232,11 @@ type HostInfo struct {
remoteIndexId uint32 remoteIndexId uint32
localIndexId uint32 localIndexId uint32
// vpnAddrs is a list of vpn addresses assigned to this host that are within our own vpn networks // vpnAddrs is a list of vpn addresses assigned to this host
// The host may have other vpn addresses that are outside our
// vpn networks but were removed because they are not usable
vpnAddrs []netip.Addr vpnAddrs []netip.Addr
// networks are both all vpn and unsafe networks assigned to this host // networks is a combination of specific vpn addresses (not prefixes!) and full unsafe networks assigned to this host.
networks *bart.Lite networks *bart.Table[NetworkType]
relayState RelayState relayState RelayState
// HandshakePacket records the packets used to create this hostinfo // HandshakePacket records the packets used to create this hostinfo
@@ -730,20 +740,27 @@ func (i *HostInfo) SetRemoteIfPreferred(hm *HostMap, newRemote netip.AddrPort) b
return false return false
} }
func (i *HostInfo) buildNetworks(networks, unsafeNetworks []netip.Prefix) { func (i *HostInfo) buildNetworks(myVpnNetworksTable *bart.Lite, networks, unsafeNetworks []netip.Prefix) {
if len(networks) == 1 && len(unsafeNetworks) == 0 { if len(networks) == 1 && len(unsafeNetworks) == 0 {
// Simple case, no CIDRTree needed if myVpnNetworksTable.Contains(networks[0].Addr()) {
return return // Simple case, no CIDRTree needed
}
} }
i.networks = new(bart.Lite) i.networks = new(bart.Table[NetworkType])
for _, network := range networks { for _, network := range networks {
var nwType NetworkType
if myVpnNetworksTable.Contains(network.Addr()) {
nwType = NetworkTypeVPN
} else {
nwType = NetworkTypeVPNPeer
}
nprefix := netip.PrefixFrom(network.Addr(), network.Addr().BitLen()) nprefix := netip.PrefixFrom(network.Addr(), network.Addr().BitLen())
i.networks.Insert(nprefix) i.networks.Insert(nprefix, nwType)
} }
for _, network := range unsafeNetworks { for _, network := range unsafeNetworks {
i.networks.Insert(network) i.networks.Insert(network, NetworkTypeUnsafe)
} }
} }

View File

@@ -5,11 +5,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/bits"
"net/netip" "net/netip"
"os" "os"
"runtime" "runtime"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -23,12 +21,7 @@ import (
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
) )
const ( const mtu = 9001
mtu = 9001
tunReadBufferSize = mtu * 8
defaultDecryptWorkerFactor = 2
defaultInboundQueueDepth = 1024
)
type InterfaceConfig struct { type InterfaceConfig struct {
HostMap *HostMap HostMap *HostMap
@@ -55,8 +48,6 @@ type InterfaceConfig struct {
ConntrackCacheTimeout time.Duration ConntrackCacheTimeout time.Duration
l *logrus.Logger l *logrus.Logger
DecryptWorkers int
DecryptQueueDepth int
} }
type Interface struct { type Interface struct {
@@ -101,167 +92,7 @@ type Interface struct {
messageMetrics *MessageMetrics messageMetrics *MessageMetrics
cachedPacketMetrics *cachedPacketMetrics cachedPacketMetrics *cachedPacketMetrics
l *logrus.Logger l *logrus.Logger
ctx context.Context
udpListenWG sync.WaitGroup
inboundPool sync.Pool
decryptWG sync.WaitGroup
decryptQueues []*inboundRing
decryptWorkers int
decryptStates []decryptWorkerState
decryptCounter atomic.Uint32
}
type inboundPacket struct {
addr netip.AddrPort
payload []byte
release func()
queue int
}
type decryptWorkerState struct {
queue *inboundRing
notify chan struct{}
}
type decryptContext struct {
ctTicker *firewall.ConntrackCacheTicker
plain []byte
head header.H
fwPacket firewall.Packet
light *LightHouseHandler
nebula []byte
}
type inboundCell struct {
seq atomic.Uint64
pkt *inboundPacket
}
type inboundRing struct {
mask uint64
cells []inboundCell
enqueuePos atomic.Uint64
dequeuePos atomic.Uint64
}
func newInboundRing(capacity int) *inboundRing {
if capacity < 2 {
capacity = 2
}
size := nextPowerOfTwo(uint32(capacity))
if size < 2 {
size = 2
}
ring := &inboundRing{
mask: uint64(size - 1),
cells: make([]inboundCell, size),
}
for i := range ring.cells {
ring.cells[i].seq.Store(uint64(i))
}
return ring
}
func nextPowerOfTwo(v uint32) uint32 {
if v == 0 {
return 1
}
return 1 << (32 - bits.LeadingZeros32(v-1))
}
func (r *inboundRing) Enqueue(pkt *inboundPacket) bool {
var cell *inboundCell
pos := r.enqueuePos.Load()
for {
cell = &r.cells[pos&r.mask]
seq := cell.seq.Load()
diff := int64(seq) - int64(pos)
if diff == 0 {
if r.enqueuePos.CompareAndSwap(pos, pos+1) {
break
}
} else if diff < 0 {
return false
} else {
pos = r.enqueuePos.Load()
}
}
cell.pkt = pkt
cell.seq.Store(pos + 1)
return true
}
func (r *inboundRing) Dequeue() (*inboundPacket, bool) {
var cell *inboundCell
pos := r.dequeuePos.Load()
for {
cell = &r.cells[pos&r.mask]
seq := cell.seq.Load()
diff := int64(seq) - int64(pos+1)
if diff == 0 {
if r.dequeuePos.CompareAndSwap(pos, pos+1) {
break
}
} else if diff < 0 {
return nil, false
} else {
pos = r.dequeuePos.Load()
}
}
pkt := cell.pkt
cell.pkt = nil
cell.seq.Store(pos + r.mask + 1)
return pkt, true
}
func (f *Interface) getInboundPacket() *inboundPacket {
if pkt, ok := f.inboundPool.Get().(*inboundPacket); ok && pkt != nil {
return pkt
}
return &inboundPacket{}
}
func (f *Interface) putInboundPacket(pkt *inboundPacket) {
if pkt == nil {
return
}
pkt.addr = netip.AddrPort{}
pkt.payload = nil
pkt.release = nil
pkt.queue = 0
f.inboundPool.Put(pkt)
}
func newDecryptContext(f *Interface) *decryptContext {
return &decryptContext{
ctTicker: firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout),
plain: make([]byte, udp.MTU),
head: header.H{},
fwPacket: firewall.Packet{},
light: f.lightHouse.NewRequestHandler(),
nebula: make([]byte, 12, 12),
}
}
func (f *Interface) processInboundPacket(pkt *inboundPacket, ctx *decryptContext) {
if pkt == nil {
return
}
defer func() {
if pkt.release != nil {
pkt.release()
}
f.putInboundPacket(pkt)
}()
ctx.head = header.H{}
ctx.fwPacket = firewall.Packet{}
var cache firewall.ConntrackCache
if ctx.ctTicker != nil {
cache = ctx.ctTicker.Get(f.l)
}
f.readOutsidePackets(pkt.addr, nil, ctx.plain[:0], pkt.payload, &ctx.head, &ctx.fwPacket, ctx.light, ctx.nebula, pkt.queue, cache)
} }
type EncWriter interface { type EncWriter interface {
@@ -331,35 +162,6 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
} }
cs := c.pki.getCertState() cs := c.pki.getCertState()
decryptWorkers := c.DecryptWorkers
if decryptWorkers < 0 {
decryptWorkers = 0
}
if decryptWorkers == 0 {
decryptWorkers = c.routines * defaultDecryptWorkerFactor
if decryptWorkers < c.routines {
decryptWorkers = c.routines
}
}
if decryptWorkers < 0 {
decryptWorkers = 0
}
if runtime.GOOS != "linux" {
decryptWorkers = 0
}
queueDepth := c.DecryptQueueDepth
if queueDepth <= 0 {
queueDepth = defaultInboundQueueDepth
}
minDepth := c.routines * 64
if minDepth <= 0 {
minDepth = 64
}
if queueDepth < minDepth {
queueDepth = minDepth
}
ifce := &Interface{ ifce := &Interface{
pki: c.pki, pki: c.pki,
hostMap: c.HostMap, hostMap: c.HostMap,
@@ -392,10 +194,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil), dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
}, },
l: c.l, l: c.l,
ctx: ctx,
inboundPool: sync.Pool{New: func() any { return &inboundPacket{} }},
decryptWorkers: decryptWorkers,
} }
ifce.tryPromoteEvery.Store(c.tryPromoteEvery) ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
@@ -404,19 +203,6 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
ifce.connectionManager.intf = ifce ifce.connectionManager.intf = ifce
if decryptWorkers > 0 {
ifce.decryptQueues = make([]*inboundRing, decryptWorkers)
ifce.decryptStates = make([]decryptWorkerState, decryptWorkers)
for i := 0; i < decryptWorkers; i++ {
queue := newInboundRing(queueDepth)
ifce.decryptQueues[i] = queue
ifce.decryptStates[i] = decryptWorkerState{
queue: queue,
notify: make(chan struct{}, 1),
}
}
}
return ifce, nil return ifce, nil
} }
@@ -456,68 +242,8 @@ func (f *Interface) activate() {
} }
} }
func (f *Interface) startDecryptWorkers() {
if f.decryptWorkers <= 0 || len(f.decryptQueues) == 0 {
return
}
f.decryptWG.Add(f.decryptWorkers)
for i := 0; i < f.decryptWorkers; i++ {
go f.decryptWorker(i)
}
}
func (f *Interface) decryptWorker(id int) {
defer f.decryptWG.Done()
if id < 0 || id >= len(f.decryptStates) {
return
}
state := f.decryptStates[id]
if state.queue == nil {
return
}
ctx := newDecryptContext(f)
for {
for {
pkt, ok := state.queue.Dequeue()
if !ok {
break
}
f.processInboundPacket(pkt, ctx)
}
if f.closed.Load() || f.ctx.Err() != nil {
for {
pkt, ok := state.queue.Dequeue()
if !ok {
return
}
f.processInboundPacket(pkt, ctx)
}
}
select {
case <-f.ctx.Done():
case <-state.notify:
}
}
}
func (f *Interface) notifyDecryptWorker(idx int) {
if idx < 0 || idx >= len(f.decryptStates) {
return
}
state := f.decryptStates[idx]
if state.notify == nil {
return
}
select {
case state.notify <- struct{}{}:
default:
}
}
func (f *Interface) run() { func (f *Interface) run() {
f.startDecryptWorkers()
// Launch n queues to read packets from udp // Launch n queues to read packets from udp
f.udpListenWG.Add(f.routines)
for i := 0; i < f.routines; i++ { for i := 0; i < f.routines; i++ {
go f.listenOut(i) go f.listenOut(i)
} }
@@ -530,7 +256,6 @@ func (f *Interface) run() {
func (f *Interface) listenOut(i int) { func (f *Interface) listenOut(i int) {
runtime.LockOSThread() runtime.LockOSThread()
defer f.udpListenWG.Done()
var li udp.Conn var li udp.Conn
if i > 0 { if i > 0 {
@@ -539,78 +264,23 @@ func (f *Interface) listenOut(i int) {
li = f.outside li = f.outside
} }
useWorkers := f.decryptWorkers > 0 && len(f.decryptQueues) > 0 ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
var ( lhh := f.lightHouse.NewRequestHandler()
inlineTicker *firewall.ConntrackCacheTicker plaintext := make([]byte, udp.MTU)
inlineHandler *LightHouseHandler h := &header.H{}
inlinePlain []byte fwPacket := &firewall.Packet{}
inlineHeader header.H nb := make([]byte, 12, 12)
inlinePacket firewall.Packet
inlineNB []byte
inlineCtx *decryptContext
)
if useWorkers { li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
inlineCtx = newDecryptContext(f) f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
} else {
inlineTicker = firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
inlineHandler = f.lightHouse.NewRequestHandler()
inlinePlain = make([]byte, udp.MTU)
inlineNB = make([]byte, 12, 12)
}
li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte, release func()) {
if !useWorkers {
if release != nil {
defer release()
}
select {
case <-f.ctx.Done():
return
default:
}
inlineHeader = header.H{}
inlinePacket = firewall.Packet{}
var cache firewall.ConntrackCache
if inlineTicker != nil {
cache = inlineTicker.Get(f.l)
}
f.readOutsidePackets(fromUdpAddr, nil, inlinePlain[:0], payload, &inlineHeader, &inlinePacket, inlineHandler, inlineNB, i, cache)
return
}
if f.ctx.Err() != nil {
if release != nil {
release()
}
return
}
pkt := f.getInboundPacket()
pkt.addr = fromUdpAddr
pkt.payload = payload
pkt.release = release
pkt.queue = i
queueCount := len(f.decryptQueues)
if queueCount == 0 {
f.processInboundPacket(pkt, inlineCtx)
return
}
w := int(f.decryptCounter.Add(1)-1) % queueCount
if w < 0 || w >= queueCount || !f.decryptQueues[w].Enqueue(pkt) {
f.processInboundPacket(pkt, inlineCtx)
return
}
f.notifyDecryptWorker(w)
}) })
} }
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
runtime.LockOSThread() runtime.LockOSThread()
packet := make([]byte, tunReadBufferSize) packet := make([]byte, mtu)
out := make([]byte, tunReadBufferSize) out := make([]byte, mtu)
fwPacket := &firewall.Packet{} fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12) nb := make([]byte, 12, 12)
@@ -788,19 +458,6 @@ func (f *Interface) Close() error {
} }
} }
f.udpListenWG.Wait()
if f.decryptWorkers > 0 {
for _, state := range f.decryptStates {
if state.notify != nil {
select {
case state.notify <- struct{}{}:
default:
}
}
}
f.decryptWG.Wait()
}
// Release the tun device // Release the tun device
return f.inside.Close() return f.inside.Close()
} }

View File

@@ -1017,17 +1017,17 @@ func (lhh *LightHouseHandler) resetMeta() *NebulaMeta {
return lhh.meta return lhh.meta
} }
func (lhh *LightHouseHandler) HandleRequest(rAddr netip.AddrPort, fromVpnAddrs []netip.Addr, p []byte, w EncWriter) { func (lhh *LightHouseHandler) HandleRequest(rAddr netip.AddrPort, hostinfo *HostInfo, p []byte, w EncWriter) {
n := lhh.resetMeta() n := lhh.resetMeta()
err := n.Unmarshal(p) err := n.Unmarshal(p)
if err != nil { if err != nil {
lhh.l.WithError(err).WithField("vpnAddrs", fromVpnAddrs).WithField("udpAddr", rAddr). lhh.l.WithError(err).WithField("vpnAddrs", hostinfo.vpnAddrs).WithField("udpAddr", rAddr).
Error("Failed to unmarshal lighthouse packet") Error("Failed to unmarshal lighthouse packet")
return return
} }
if n.Details == nil { if n.Details == nil {
lhh.l.WithField("vpnAddrs", fromVpnAddrs).WithField("udpAddr", rAddr). lhh.l.WithField("vpnAddrs", hostinfo.vpnAddrs).WithField("udpAddr", rAddr).
Error("Invalid lighthouse update") Error("Invalid lighthouse update")
return return
} }
@@ -1036,24 +1036,24 @@ func (lhh *LightHouseHandler) HandleRequest(rAddr netip.AddrPort, fromVpnAddrs [
switch n.Type { switch n.Type {
case NebulaMeta_HostQuery: case NebulaMeta_HostQuery:
lhh.handleHostQuery(n, fromVpnAddrs, rAddr, w) lhh.handleHostQuery(n, hostinfo, rAddr, w)
case NebulaMeta_HostQueryReply: case NebulaMeta_HostQueryReply:
lhh.handleHostQueryReply(n, fromVpnAddrs) lhh.handleHostQueryReply(n, hostinfo.vpnAddrs)
case NebulaMeta_HostUpdateNotification: case NebulaMeta_HostUpdateNotification:
lhh.handleHostUpdateNotification(n, fromVpnAddrs, w) lhh.handleHostUpdateNotification(n, hostinfo, w)
case NebulaMeta_HostMovedNotification: case NebulaMeta_HostMovedNotification:
case NebulaMeta_HostPunchNotification: case NebulaMeta_HostPunchNotification:
lhh.handleHostPunchNotification(n, fromVpnAddrs, w) lhh.handleHostPunchNotification(n, hostinfo.vpnAddrs, w)
case NebulaMeta_HostUpdateNotificationAck: case NebulaMeta_HostUpdateNotificationAck:
// noop // noop
} }
} }
func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, fromVpnAddrs []netip.Addr, addr netip.AddrPort, w EncWriter) { func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, hostinfo *HostInfo, addr netip.AddrPort, w EncWriter) {
// Exit if we don't answer queries // Exit if we don't answer queries
if !lhh.lh.amLighthouse { if !lhh.lh.amLighthouse {
if lhh.l.Level >= logrus.DebugLevel { if lhh.l.Level >= logrus.DebugLevel {
@@ -1065,7 +1065,7 @@ func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, fromVpnAddrs []neti
queryVpnAddr, useVersion, err := n.Details.GetVpnAddrAndVersion() queryVpnAddr, useVersion, err := n.Details.GetVpnAddrAndVersion()
if err != nil { if err != nil {
if lhh.l.Level >= logrus.DebugLevel { if lhh.l.Level >= logrus.DebugLevel {
lhh.l.WithField("from", fromVpnAddrs).WithField("details", n.Details). lhh.l.WithField("from", hostinfo.vpnAddrs).WithField("details", n.Details).
Debugln("Dropping malformed HostQuery") Debugln("Dropping malformed HostQuery")
} }
return return
@@ -1073,7 +1073,7 @@ func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, fromVpnAddrs []neti
if useVersion == cert.Version1 && queryVpnAddr.Is6() { if useVersion == cert.Version1 && queryVpnAddr.Is6() {
// this case really shouldn't be possible to represent, but reject it anyway. // this case really shouldn't be possible to represent, but reject it anyway.
if lhh.l.Level >= logrus.DebugLevel { if lhh.l.Level >= logrus.DebugLevel {
lhh.l.WithField("vpnAddrs", fromVpnAddrs).WithField("queryVpnAddr", queryVpnAddr). lhh.l.WithField("vpnAddrs", hostinfo.vpnAddrs).WithField("queryVpnAddr", queryVpnAddr).
Debugln("invalid vpn addr for v1 handleHostQuery") Debugln("invalid vpn addr for v1 handleHostQuery")
} }
return return
@@ -1099,14 +1099,14 @@ func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, fromVpnAddrs []neti
} }
if err != nil { if err != nil {
lhh.l.WithError(err).WithField("vpnAddrs", fromVpnAddrs).Error("Failed to marshal lighthouse host query reply") lhh.l.WithError(err).WithField("vpnAddrs", hostinfo.vpnAddrs).Error("Failed to marshal lighthouse host query reply")
return return
} }
lhh.lh.metricTx(NebulaMeta_HostQueryReply, 1) lhh.lh.metricTx(NebulaMeta_HostQueryReply, 1)
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.nb, lhh.out[:0]) w.SendMessageToHostInfo(header.LightHouse, 0, hostinfo, lhh.pb[:ln], lhh.nb, lhh.out[:0])
lhh.sendHostPunchNotification(n, fromVpnAddrs, queryVpnAddr, w) lhh.sendHostPunchNotification(n, hostinfo.vpnAddrs, queryVpnAddr, w)
} }
// sendHostPunchNotification signals the other side to punch some zero byte udp packets // sendHostPunchNotification signals the other side to punch some zero byte udp packets
@@ -1115,20 +1115,34 @@ func (lhh *LightHouseHandler) sendHostPunchNotification(n *NebulaMeta, fromVpnAd
found, ln, err := lhh.lh.queryAndPrepMessage(whereToPunch, func(c *cache) (int, error) { found, ln, err := lhh.lh.queryAndPrepMessage(whereToPunch, func(c *cache) (int, error) {
n = lhh.resetMeta() n = lhh.resetMeta()
n.Type = NebulaMeta_HostPunchNotification n.Type = NebulaMeta_HostPunchNotification
targetHI := lhh.lh.ifce.GetHostInfo(punchNotifDest) punchNotifDestHI := lhh.lh.ifce.GetHostInfo(punchNotifDest)
var useVersion cert.Version var useVersion cert.Version
if targetHI == nil { if punchNotifDestHI == nil {
useVersion = lhh.lh.ifce.GetCertState().initiatingVersion useVersion = lhh.lh.ifce.GetCertState().initiatingVersion
} else { } else {
crt := targetHI.GetCert().Certificate
useVersion = crt.Version()
// we can only retarget if we have a hostinfo // we can only retarget if we have a hostinfo
newDest, ok := findNetworkUnion(crt.Networks(), fromVpnAddrs) punchNotifDestCrt := punchNotifDestHI.GetCert().Certificate
useVersion = punchNotifDestCrt.Version()
punchNotifDestNetworks := punchNotifDestCrt.Networks()
//if we (the lighthouse) don't have a network in common with punchNotifDest, try to find one
if !lhh.lh.myVpnNetworksTable.Contains(punchNotifDest) {
newPunchNotifDest, ok := findNetworkUnion(lhh.lh.myVpnNetworks, punchNotifDestHI.vpnAddrs)
if ok {
punchNotifDest = newPunchNotifDest
} else {
if lhh.l.Level >= logrus.DebugLevel {
lhh.l.WithField("to", punchNotifDestNetworks).Debugln("unable to notify host to host, no addresses in common")
}
}
}
newWhereToPunch, ok := findNetworkUnion(punchNotifDestNetworks, fromVpnAddrs)
if ok { if ok {
whereToPunch = newDest whereToPunch = newWhereToPunch
} else { } else {
if lhh.l.Level >= logrus.DebugLevel { if lhh.l.Level >= logrus.DebugLevel {
lhh.l.WithField("to", crt.Networks()).Debugln("unable to punch to host, no addresses in common") lhh.l.WithFields(m{"from": fromVpnAddrs, "to": punchNotifDestNetworks}).Debugln("unable to punch to host, no addresses in common with requestor")
} }
} }
} }
@@ -1234,7 +1248,8 @@ func (lhh *LightHouseHandler) handleHostQueryReply(n *NebulaMeta, fromVpnAddrs [
} }
} }
func (lhh *LightHouseHandler) handleHostUpdateNotification(n *NebulaMeta, fromVpnAddrs []netip.Addr, w EncWriter) { func (lhh *LightHouseHandler) handleHostUpdateNotification(n *NebulaMeta, hostinfo *HostInfo, w EncWriter) {
fromVpnAddrs := hostinfo.vpnAddrs
if !lhh.lh.amLighthouse { if !lhh.lh.amLighthouse {
if lhh.l.Level >= logrus.DebugLevel { if lhh.l.Level >= logrus.DebugLevel {
lhh.l.Debugln("I am not a lighthouse, do not take host updates: ", fromVpnAddrs) lhh.l.Debugln("I am not a lighthouse, do not take host updates: ", fromVpnAddrs)
@@ -1302,7 +1317,7 @@ func (lhh *LightHouseHandler) handleHostUpdateNotification(n *NebulaMeta, fromVp
} }
lhh.lh.metricTx(NebulaMeta_HostUpdateNotificationAck, 1) lhh.lh.metricTx(NebulaMeta_HostUpdateNotificationAck, 1)
w.SendMessageToVpnAddr(header.LightHouse, 0, fromVpnAddrs[0], lhh.pb[:ln], lhh.nb, lhh.out[:0]) w.SendMessageToHostInfo(header.LightHouse, 0, hostinfo, lhh.pb[:ln], lhh.nb, lhh.out[:0])
} }
func (lhh *LightHouseHandler) handleHostPunchNotification(n *NebulaMeta, fromVpnAddrs []netip.Addr, w EncWriter) { func (lhh *LightHouseHandler) handleHostPunchNotification(n *NebulaMeta, fromVpnAddrs []netip.Addr, w EncWriter) {

View File

@@ -132,8 +132,13 @@ func BenchmarkLighthouseHandleRequest(b *testing.B) {
) )
mw := &mockEncWriter{} mw := &mockEncWriter{}
hostinfo := &HostInfo{
hi := []netip.Addr{vpnIp2} ConnectionState: &ConnectionState{
eKey: nil,
dKey: nil,
},
vpnAddrs: []netip.Addr{vpnIp2},
}
b.Run("notfound", func(b *testing.B) { b.Run("notfound", func(b *testing.B) {
lhh := lh.NewRequestHandler() lhh := lh.NewRequestHandler()
req := &NebulaMeta{ req := &NebulaMeta{
@@ -146,7 +151,7 @@ func BenchmarkLighthouseHandleRequest(b *testing.B) {
p, err := req.Marshal() p, err := req.Marshal()
require.NoError(b, err) require.NoError(b, err)
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
lhh.HandleRequest(rAddr, hi, p, mw) lhh.HandleRequest(rAddr, hostinfo, p, mw)
} }
}) })
b.Run("found", func(b *testing.B) { b.Run("found", func(b *testing.B) {
@@ -162,7 +167,7 @@ func BenchmarkLighthouseHandleRequest(b *testing.B) {
require.NoError(b, err) require.NoError(b, err)
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
lhh.HandleRequest(rAddr, hi, p, mw) lhh.HandleRequest(rAddr, hostinfo, p, mw)
} }
}) })
} }
@@ -326,7 +331,14 @@ func newLHHostRequest(fromAddr netip.AddrPort, myVpnIp, queryVpnIp netip.Addr, l
w := &testEncWriter{ w := &testEncWriter{
metaFilter: &filter, metaFilter: &filter,
} }
lhh.HandleRequest(fromAddr, []netip.Addr{myVpnIp}, b, w) hostinfo := &HostInfo{
ConnectionState: &ConnectionState{
eKey: nil,
dKey: nil,
},
vpnAddrs: []netip.Addr{myVpnIp},
}
lhh.HandleRequest(fromAddr, hostinfo, b, w)
return w.lastReply return w.lastReply
} }
@@ -355,9 +367,15 @@ func newLHHostUpdate(fromAddr netip.AddrPort, vpnIp netip.Addr, addrs []netip.Ad
if err != nil { if err != nil {
panic(err) panic(err)
} }
hostinfo := &HostInfo{
ConnectionState: &ConnectionState{
eKey: nil,
dKey: nil,
},
vpnAddrs: []netip.Addr{vpnIp},
}
w := &testEncWriter{} w := &testEncWriter{}
lhh.HandleRequest(fromAddr, []netip.Addr{vpnIp}, b, w) lhh.HandleRequest(fromAddr, hostinfo, b, w)
} }
type testLhReply struct { type testLhReply struct {

View File

@@ -120,8 +120,6 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
l.WithField("duration", conntrackCacheTimeout).Info("Using routine-local conntrack cache") l.WithField("duration", conntrackCacheTimeout).Info("Using routine-local conntrack cache")
} }
udp.SetDisableUDPCsum(c.GetBool("listen.disable_udp_checksum", false))
var tun overlay.Device var tun overlay.Device
if !configTest { if !configTest {
c.CatchHUP(ctx) c.CatchHUP(ctx)
@@ -223,9 +221,6 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
} }
} }
decryptWorkers := c.GetInt("listen.decrypt_workers", 0)
decryptQueueDepth := c.GetInt("listen.decrypt_queue_depth", 0)
ifConfig := &InterfaceConfig{ ifConfig := &InterfaceConfig{
HostMap: hostMap, HostMap: hostMap,
Inside: tun, Inside: tun,
@@ -248,8 +243,6 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
punchy: punchy, punchy: punchy,
ConntrackCacheTimeout: conntrackCacheTimeout, ConntrackCacheTimeout: conntrackCacheTimeout,
l: l, l: l,
DecryptWorkers: decryptWorkers,
DecryptQueueDepth: decryptQueueDepth,
} }
var ifce *Interface var ifce *Interface

View File

@@ -138,7 +138,7 @@ func (f *Interface) readOutsidePackets(ip netip.AddrPort, via *ViaSender, out []
return return
} }
lhf.HandleRequest(ip, hostinfo.vpnAddrs, d, f) lhf.HandleRequest(ip, hostinfo, d, f)
// Fallthrough to the bottom to record incoming traffic // Fallthrough to the bottom to record incoming traffic
@@ -470,13 +470,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out
out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:header.Len], packet[header.Len:], messageCounter, nb) out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:header.Len], packet[header.Len:], messageCounter, nb)
if err != nil { if err != nil {
hostinfo.logger(f.l). hostinfo.logger(f.l).WithError(err).Error("Failed to decrypt packet")
WithError(err).
WithField("tag", "decrypt-debug").
WithField("remoteIndexLocal", hostinfo.localIndexId).
WithField("messageCounter", messageCounter).
WithField("packet_len", len(packet)).
Error("Failed to decrypt packet")
return false return false
} }

View File

@@ -25,17 +25,14 @@ import (
type tun struct { type tun struct {
io.ReadWriteCloser io.ReadWriteCloser
fd int fd int
Device string Device string
vpnNetworks []netip.Prefix vpnNetworks []netip.Prefix
MaxMTU int MaxMTU int
DefaultMTU int DefaultMTU int
TXQueueLen int TXQueueLen int
deviceIndex int deviceIndex int
ioctlFd uintptr ioctlFd uintptr
enableVnetHdr bool
vnetHdrLen int
queues []*tunQueue
Routes atomic.Pointer[[]Route] Routes atomic.Pointer[[]Route]
routeTree atomic.Pointer[bart.Table[routing.Gateways]] routeTree atomic.Pointer[bart.Table[routing.Gateways]]
@@ -68,90 +65,10 @@ type ifreqQLEN struct {
pad [8]byte pad [8]byte
} }
const (
virtioNetHdrLen = 12
tunDefaultMaxPacket = 65536
)
type tunQueue struct {
file *os.File
fd int
enableVnetHdr bool
vnetHdrLen int
maxPacket int
writeScratch []byte
readScratch []byte
l *logrus.Logger
}
func newTunQueue(file *os.File, enableVnetHdr bool, vnetHdrLen, maxPacket int, l *logrus.Logger) *tunQueue {
if maxPacket <= 0 {
maxPacket = tunDefaultMaxPacket
}
q := &tunQueue{
file: file,
fd: int(file.Fd()),
enableVnetHdr: enableVnetHdr,
vnetHdrLen: vnetHdrLen,
maxPacket: maxPacket,
l: l,
}
if enableVnetHdr {
q.growReadScratch(maxPacket)
}
return q
}
func (q *tunQueue) growReadScratch(packetSize int) {
needed := q.vnetHdrLen + packetSize
if needed < q.vnetHdrLen+DefaultMTU {
needed = q.vnetHdrLen + DefaultMTU
}
if q.readScratch == nil || cap(q.readScratch) < needed {
q.readScratch = make([]byte, needed)
} else {
q.readScratch = q.readScratch[:needed]
}
}
func (q *tunQueue) setMaxPacket(packet int) {
if packet <= 0 {
packet = DefaultMTU
}
q.maxPacket = packet
if q.enableVnetHdr {
q.growReadScratch(packet)
}
}
func configureVnetHdr(fd int, hdrLen int, l *logrus.Logger) error {
features, err := unix.IoctlGetInt(fd, unix.TUNGETFEATURES)
if err == nil && features&unix.IFF_VNET_HDR == 0 {
return fmt.Errorf("kernel does not support IFF_VNET_HDR")
}
if err := unix.IoctlSetInt(fd, unix.TUNSETVNETHDRSZ, hdrLen); err != nil {
return err
}
offload := unix.TUN_F_CSUM | unix.TUN_F_UFO
if err := unix.IoctlSetInt(fd, unix.TUNSETOFFLOAD, offload); err != nil {
if l != nil {
l.WithError(err).Warn("Failed to enable TUN offload features")
}
}
return nil
}
func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) { func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) {
file := os.NewFile(uintptr(deviceFd), "/dev/net/tun") file := os.NewFile(uintptr(deviceFd), "/dev/net/tun")
enableVnetHdr := c.GetBool("tun.enable_vnet_hdr", false)
if enableVnetHdr {
if err := configureVnetHdr(deviceFd, virtioNetHdrLen, l); err != nil {
l.WithError(err).Warn("Failed to configure VNET header support on provided tun fd; disabling")
enableVnetHdr = false
}
}
t, err := newTunGeneric(c, l, file, vpnNetworks, enableVnetHdr) t, err := newTunGeneric(c, l, file, vpnNetworks)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -189,25 +106,14 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu
if multiqueue { if multiqueue {
req.Flags |= unix.IFF_MULTI_QUEUE req.Flags |= unix.IFF_MULTI_QUEUE
} }
enableVnetHdr := c.GetBool("tun.enable_vnet_hdr", false)
if enableVnetHdr {
req.Flags |= unix.IFF_VNET_HDR
}
copy(req.Name[:], c.GetString("tun.dev", "")) copy(req.Name[:], c.GetString("tun.dev", ""))
if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil { if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil {
return nil, err return nil, err
} }
name := strings.Trim(string(req.Name[:]), "\x00") name := strings.Trim(string(req.Name[:]), "\x00")
if enableVnetHdr {
if err := configureVnetHdr(fd, virtioNetHdrLen, l); err != nil {
l.WithError(err).Warn("Failed to configure VNET header support on tun device; disabling")
enableVnetHdr = false
}
}
file := os.NewFile(uintptr(fd), "/dev/net/tun") file := os.NewFile(uintptr(fd), "/dev/net/tun")
t, err := newTunGeneric(c, l, file, vpnNetworks, enableVnetHdr) t, err := newTunGeneric(c, l, file, vpnNetworks)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -217,30 +123,21 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu
return t, nil return t, nil
} }
func newTunGeneric(c *config.C, l *logrus.Logger, file *os.File, vpnNetworks []netip.Prefix, enableVnetHdr bool) (*tun, error) { func newTunGeneric(c *config.C, l *logrus.Logger, file *os.File, vpnNetworks []netip.Prefix) (*tun, error) {
queue := newTunQueue(file, enableVnetHdr, virtioNetHdrLen, tunDefaultMaxPacket, l)
t := &tun{ t := &tun{
ReadWriteCloser: queue, ReadWriteCloser: file,
fd: int(file.Fd()), fd: int(file.Fd()),
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
TXQueueLen: c.GetInt("tun.tx_queue", 500), TXQueueLen: c.GetInt("tun.tx_queue", 500),
useSystemRoutes: c.GetBool("tun.use_system_route_table", false), useSystemRoutes: c.GetBool("tun.use_system_route_table", false),
useSystemRoutesBufferSize: c.GetInt("tun.use_system_route_table_buffer_size", 0), useSystemRoutesBufferSize: c.GetInt("tun.use_system_route_table_buffer_size", 0),
l: l, l: l,
enableVnetHdr: enableVnetHdr,
vnetHdrLen: virtioNetHdrLen,
queues: []*tunQueue{queue},
} }
err := t.reload(c, true) err := t.reload(c, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if enableVnetHdr {
for _, q := range t.queues {
q.setMaxPacket(t.MaxMTU)
}
}
c.RegisterReloadCallback(func(c *config.C) { c.RegisterReloadCallback(func(c *config.C) {
err := t.reload(c, false) err := t.reload(c, false)
@@ -283,11 +180,6 @@ func (t *tun) reload(c *config.C, initial bool) error {
t.MaxMTU = newMaxMTU t.MaxMTU = newMaxMTU
t.DefaultMTU = newDefaultMTU t.DefaultMTU = newDefaultMTU
if t.enableVnetHdr {
for _, q := range t.queues {
q.setMaxPacket(t.MaxMTU)
}
}
// Teach nebula how to handle the routes before establishing them in the system table // Teach nebula how to handle the routes before establishing them in the system table
oldRoutes := t.Routes.Swap(&routes) oldRoutes := t.Routes.Swap(&routes)
@@ -332,87 +224,14 @@ func (t *tun) NewMultiQueueReader() (io.ReadWriteCloser, error) {
var req ifReq var req ifReq
req.Flags = uint16(unix.IFF_TUN | unix.IFF_NO_PI | unix.IFF_MULTI_QUEUE) req.Flags = uint16(unix.IFF_TUN | unix.IFF_NO_PI | unix.IFF_MULTI_QUEUE)
if t.enableVnetHdr {
req.Flags |= unix.IFF_VNET_HDR
}
copy(req.Name[:], t.Device) copy(req.Name[:], t.Device)
if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil { if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil {
return nil, err return nil, err
} }
file := os.NewFile(uintptr(fd), "/dev/net/tun") file := os.NewFile(uintptr(fd), "/dev/net/tun")
queue := newTunQueue(file, t.enableVnetHdr, t.vnetHdrLen, t.MaxMTU, t.l)
if t.enableVnetHdr {
if err := configureVnetHdr(fd, t.vnetHdrLen, t.l); err != nil {
queue.enableVnetHdr = false
}
}
t.queues = append(t.queues, queue)
return queue, nil return file, nil
}
func (q *tunQueue) Read(p []byte) (int, error) {
if !q.enableVnetHdr {
return q.file.Read(p)
}
if len(p)+q.vnetHdrLen > cap(q.readScratch) {
q.growReadScratch(len(p))
}
buf := q.readScratch[:cap(q.readScratch)]
n, err := q.file.Read(buf)
if n <= 0 {
return n, err
}
if n < q.vnetHdrLen {
if err == nil {
err = io.ErrUnexpectedEOF
}
return 0, err
}
payload := buf[q.vnetHdrLen:n]
if len(payload) > len(p) {
copy(p, payload[:len(p)])
if err == nil {
err = io.ErrShortBuffer
}
return len(p), err
}
copy(p, payload)
return len(payload), err
}
func (q *tunQueue) Write(b []byte) (int, error) {
if !q.enableVnetHdr {
return unix.Write(q.fd, b)
}
total := q.vnetHdrLen + len(b)
if cap(q.writeScratch) < total {
q.writeScratch = make([]byte, total)
} else {
q.writeScratch = q.writeScratch[:total]
}
for i := 0; i < q.vnetHdrLen; i++ {
q.writeScratch[i] = 0
}
copy(q.writeScratch[q.vnetHdrLen:], b)
n, err := unix.Write(q.fd, q.writeScratch)
if n >= q.vnetHdrLen {
n -= q.vnetHdrLen
} else {
n = 0
}
return n, err
}
func (q *tunQueue) Close() error {
return q.file.Close()
} }
func (t *tun) RoutesFor(ip netip.Addr) routing.Gateways { func (t *tun) RoutesFor(ip netip.Addr) routing.Gateways {

View File

@@ -1,16 +0,0 @@
package udp
import "sync/atomic"
var disableUDPCsum atomic.Bool
// SetDisableUDPCsum controls whether IPv4 UDP sockets opt out of kernel
// checksum calculation via SO_NO_CHECK. Only applicable on platforms that
// support the option (Linux). IPv6 always keeps the checksum enabled.
func SetDisableUDPCsum(disable bool) {
disableUDPCsum.Store(disable)
}
func udpChecksumDisabled() bool {
return disableUDPCsum.Load()
}

View File

@@ -11,7 +11,6 @@ const MTU = 9001
type EncReader func( type EncReader func(
addr netip.AddrPort, addr netip.AddrPort,
payload []byte, payload []byte,
release func(),
) )
type Conn interface { type Conn interface {

View File

@@ -1,25 +0,0 @@
//go:build linux && (386 || amd64p32 || arm || mips || mipsle) && !android && !e2e_testing
// +build linux
// +build 386 amd64p32 arm mips mipsle
// +build !android
// +build !e2e_testing
package udp
import "golang.org/x/sys/unix"
func controllen(n int) uint32 {
return uint32(n)
}
func setCmsgLen(h *unix.Cmsghdr, n int) {
h.Len = uint32(unix.CmsgLen(n))
}
func setIovecLen(v *unix.Iovec, n int) {
v.Len = uint32(n)
}
func setMsghdrIovlen(m *unix.Msghdr, n int) {
m.Iovlen = uint32(n)
}

View File

@@ -1,25 +0,0 @@
//go:build linux && (amd64 || arm64 || ppc64 || ppc64le || mips64 || mips64le || s390x || riscv64 || loong64) && !android && !e2e_testing
// +build linux
// +build amd64 arm64 ppc64 ppc64le mips64 mips64le s390x riscv64 loong64
// +build !android
// +build !e2e_testing
package udp
import "golang.org/x/sys/unix"
func controllen(n int) uint64 {
return uint64(n)
}
func setCmsgLen(h *unix.Cmsghdr, n int) {
h.Len = uint64(unix.CmsgLen(n))
}
func setIovecLen(v *unix.Iovec, n int) {
v.Len = uint64(n)
}
func setMsghdrIovlen(m *unix.Msghdr, n int) {
m.Iovlen = uint64(n)
}

View File

@@ -1,25 +0,0 @@
//go:build linux && (386 || amd64p32 || arm || mips || mipsle) && !android && !e2e_testing
package udp
import (
"unsafe"
"golang.org/x/sys/unix"
)
type linuxMmsgHdr struct {
Hdr unix.Msghdr
Len uint32
}
func sendmmsg(fd int, hdrs []linuxMmsgHdr, flags int) (int, error) {
if len(hdrs) == 0 {
return 0, nil
}
n, _, errno := unix.Syscall6(unix.SYS_SENDMMSG, uintptr(fd), uintptr(unsafe.Pointer(&hdrs[0])), uintptr(len(hdrs)), uintptr(flags), 0, 0)
if errno != 0 {
return int(n), errno
}
return int(n), nil
}

View File

@@ -1,26 +0,0 @@
//go:build linux && (amd64 || arm64 || ppc64 || ppc64le || mips64 || mips64le || s390x || riscv64 || loong64) && !android && !e2e_testing
package udp
import (
"unsafe"
"golang.org/x/sys/unix"
)
type linuxMmsgHdr struct {
Hdr unix.Msghdr
Len uint32
_ uint32
}
func sendmmsg(fd int, hdrs []linuxMmsgHdr, flags int) (int, error) {
if len(hdrs) == 0 {
return 0, nil
}
n, _, errno := unix.Syscall6(unix.SYS_SENDMMSG, uintptr(fd), uintptr(unsafe.Pointer(&hdrs[0])), uintptr(len(hdrs)), uintptr(flags), 0, 0)
if errno != 0 {
return int(n), errno
}
return int(n), nil
}

View File

@@ -180,7 +180,7 @@ func (u *StdConn) ListenOut(r EncReader) {
u.l.WithError(err).Error("unexpected udp socket receive error") u.l.WithError(err).Error("unexpected udp socket receive error")
} }
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n], nil) r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
} }
} }

View File

@@ -82,6 +82,6 @@ func (u *GenericConn) ListenOut(r EncReader) {
return return
} }
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n], nil) r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
} }
} }

File diff suppressed because it is too large Load Diff

View File

@@ -30,29 +30,17 @@ type rawMessage struct {
Len uint32 Len uint32
} }
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) { func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
controlLen := int(u.controlLen.Load())
msgs := make([]rawMessage, n) msgs := make([]rawMessage, n)
buffers := make([][]byte, n) buffers := make([][]byte, n)
names := make([][]byte, n) names := make([][]byte, n)
var controls [][]byte
if controlLen > 0 {
controls = make([][]byte, n)
}
for i := range msgs { for i := range msgs {
size := int(u.groBufSize.Load()) buffers[i] = make([]byte, MTU)
if size < MTU {
size = MTU
}
buf := u.borrowRxBuffer(size)
buffers[i] = buf
names[i] = make([]byte, unix.SizeofSockaddrInet6) names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{ vs := []iovec{
{Base: &buf[0], Len: uint32(len(buf))}, {Base: &buffers[i][0], Len: uint32(len(buffers[i]))},
} }
msgs[i].Hdr.Iov = &vs[0] msgs[i].Hdr.Iov = &vs[0]
@@ -60,22 +48,7 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [
msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Name = &names[i][0]
msgs[i].Hdr.Namelen = uint32(len(names[i])) msgs[i].Hdr.Namelen = uint32(len(names[i]))
if controlLen > 0 {
controls[i] = make([]byte, controlLen)
msgs[i].Hdr.Control = &controls[i][0]
msgs[i].Hdr.Controllen = controllen(len(controls[i]))
} else {
msgs[i].Hdr.Control = nil
msgs[i].Hdr.Controllen = controllen(0)
}
} }
return msgs, buffers, names, controls return msgs, buffers, names
}
func setIovecBase(msg *rawMessage, buf []byte) {
iov := (*iovec)(msg.Hdr.Iov)
iov.Base = &buf[0]
iov.Len = uint32(len(buf))
} }

View File

@@ -33,50 +33,25 @@ type rawMessage struct {
Pad0 [4]byte Pad0 [4]byte
} }
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) { func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
controlLen := int(u.controlLen.Load())
msgs := make([]rawMessage, n) msgs := make([]rawMessage, n)
buffers := make([][]byte, n) buffers := make([][]byte, n)
names := make([][]byte, n) names := make([][]byte, n)
var controls [][]byte
if controlLen > 0 {
controls = make([][]byte, n)
}
for i := range msgs { for i := range msgs {
size := int(u.groBufSize.Load()) buffers[i] = make([]byte, MTU)
if size < MTU {
size = MTU
}
buf := u.borrowRxBuffer(size)
buffers[i] = buf
names[i] = make([]byte, unix.SizeofSockaddrInet6) names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{{Base: &buf[0], Len: uint64(len(buf))}} vs := []iovec{
{Base: &buffers[i][0], Len: uint64(len(buffers[i]))},
}
msgs[i].Hdr.Iov = &vs[0] msgs[i].Hdr.Iov = &vs[0]
msgs[i].Hdr.Iovlen = uint64(len(vs)) msgs[i].Hdr.Iovlen = uint64(len(vs))
msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Name = &names[i][0]
msgs[i].Hdr.Namelen = uint32(len(names[i])) msgs[i].Hdr.Namelen = uint32(len(names[i]))
if controlLen > 0 {
controls[i] = make([]byte, controlLen)
msgs[i].Hdr.Control = &controls[i][0]
msgs[i].Hdr.Controllen = controllen(len(controls[i]))
} else {
msgs[i].Hdr.Control = nil
msgs[i].Hdr.Controllen = controllen(0)
}
} }
return msgs, buffers, names, controls return msgs, buffers, names
}
func setIovecBase(msg *rawMessage, buf []byte) {
iov := (*iovec)(msg.Hdr.Iov)
iov.Base = &buf[0]
iov.Len = uint64(len(buf))
} }

View File

@@ -149,7 +149,7 @@ func (u *RIOConn) ListenOut(r EncReader) {
continue continue
} }
r(netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), buffer[:n], nil) r(netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), buffer[:n])
} }
} }

View File

@@ -112,7 +112,7 @@ func (u *TesterConn) ListenOut(r EncReader) {
if !ok { if !ok {
return return
} }
r(p.From, p.Data, func() {}) r(p.From, p.Data)
} }
} }