mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
udp setsockopt correctness fixes
This commit is contained in:
@@ -12,7 +12,7 @@ import (
|
|||||||
// kernel stamps one outer codepoint per entry, so a run that straddled the
|
// kernel stamps one outer codepoint per entry, so a run that straddled the
|
||||||
// boundary would silently lose information).
|
// boundary would silently lose information).
|
||||||
func TestPlanRunBreaksOnECNChange(t *testing.T) {
|
func TestPlanRunBreaksOnECNChange(t *testing.T) {
|
||||||
u := &StdConn{gsoSupported: true}
|
u := &StdConn{gsoSupported: true, maxGSOSegments: 63}
|
||||||
dst := netip.MustParseAddrPort("10.0.0.1:4242")
|
dst := netip.MustParseAddrPort("10.0.0.1:4242")
|
||||||
|
|
||||||
bufs := [][]byte{
|
bufs := [][]byte{
|
||||||
|
|||||||
@@ -6,10 +6,13 @@ package udp
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
@@ -56,6 +59,7 @@ type StdConn struct {
|
|||||||
// destination consecutive packets into a single sendmmsg entry with a
|
// destination consecutive packets into a single sendmmsg entry with a
|
||||||
// UDP_SEGMENT cmsg; otherwise each packet is its own entry.
|
// UDP_SEGMENT cmsg; otherwise each packet is its own entry.
|
||||||
gsoSupported bool
|
gsoSupported bool
|
||||||
|
maxGSOSegments int
|
||||||
|
|
||||||
// UDP GRO (recvmsg with UDP_GRO cmsg) support. groSupported is probed
|
// UDP GRO (recvmsg with UDP_GRO cmsg) support. groSupported is probed
|
||||||
// once at socket creation. When true, listenOutBatch allocates larger
|
// once at socket creation. When true, listenOutBatch allocates larger
|
||||||
@@ -106,6 +110,7 @@ func NewListener(l *slog.Logger, ip netip.Addr, port int, multi bool, batch int)
|
|||||||
rawConn: rawConn,
|
rawConn: rawConn,
|
||||||
l: l,
|
l: l,
|
||||||
batch: batch,
|
batch: batch,
|
||||||
|
maxGSOSegments: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
af, err := out.getSockOptInt(unix.SO_DOMAIN)
|
af, err := out.getSockOptInt(unix.SO_DOMAIN)
|
||||||
@@ -188,19 +193,6 @@ func (u *StdConn) prepareWriteMessages(n int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// maxGSOSegments caps the per-sendmsg GSO fan-out. Linux kernels have
|
|
||||||
// historically capped UDP_MAX_SEGMENTS at 64; newer kernels raise it to 128.
|
|
||||||
// We stay one below 64 because the kernel's check is
|
|
||||||
//
|
|
||||||
// if (cork->length > cork->gso_size * UDP_MAX_SEGMENTS) return -EINVAL;
|
|
||||||
//
|
|
||||||
// and cork->length includes the 8-byte UDP header (udp_sendmsg passes
|
|
||||||
// ulen = len + sizeof(udphdr) to ip_append_data). Packing exactly 64
|
|
||||||
// same-size segments puts cork->length at gso_size*64 + 8, which is one
|
|
||||||
// UDP-header over the bound and the kernel rejects the whole sendmmsg
|
|
||||||
// with EINVAL. 63 leaves room for the header for any segSize >= 8.
|
|
||||||
const maxGSOSegments = 63
|
|
||||||
|
|
||||||
// maxGSOBytes bounds the total payload per sendmsg() when UDP_SEGMENT is
|
// maxGSOBytes bounds the total payload per sendmsg() when UDP_SEGMENT is
|
||||||
// set. The kernel stitches all iovecs into a single skb whose length the
|
// set. The kernel stitches all iovecs into a single skb whose length the
|
||||||
// UDP length field can represent, and also enforces sk_gso_max_size (which
|
// UDP length field can represent, and also enforces sk_gso_max_size (which
|
||||||
@@ -211,6 +203,8 @@ const maxGSOBytes = 65000
|
|||||||
// prepareGSO probes UDP_SEGMENT support and sets u.gsoSupported on success.
|
// prepareGSO probes UDP_SEGMENT support and sets u.gsoSupported on success.
|
||||||
// Best-effort; failure leaves it false.
|
// Best-effort; failure leaves it false.
|
||||||
func (u *StdConn) prepareGSO() {
|
func (u *StdConn) prepareGSO() {
|
||||||
|
u.maxGSOSegments = 63 // one below the kernel's historical 64-segment limit, leaving room for the 8-byte UDP header
|
||||||
|
|
||||||
var probeErr error
|
var probeErr error
|
||||||
if err := u.rawConn.Control(func(fd uintptr) {
|
if err := u.rawConn.Control(func(fd uintptr) {
|
||||||
probeErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_SEGMENT, 0)
|
probeErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_SEGMENT, 0)
|
||||||
@@ -224,8 +218,20 @@ func (u *StdConn) prepareGSO() {
|
|||||||
recordCapability("udp.gso.enabled", false)
|
recordCapability("udp.gso.enabled", false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var un unix.Utsname
|
||||||
|
if err := unix.Uname(&un); err != nil {
|
||||||
|
u.l.Info("udp: GSO disabled", "reason", "kernel uname probe failed", "error", err)
|
||||||
|
recordCapability("udp.gso.enabled", false)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
major, minor := parseRelease(string(un.Release[:]))
|
||||||
|
if major > 5 || (major == 5 && minor >= 5) {
|
||||||
|
u.maxGSOSegments = 127
|
||||||
|
}
|
||||||
|
|
||||||
u.gsoSupported = true
|
u.gsoSupported = true
|
||||||
u.l.Info("udp: GSO enabled")
|
u.l.Info("udp: GSO enabled", "maxGSOSegments", u.maxGSOSegments)
|
||||||
recordCapability("udp.gso.enabled", true)
|
recordCapability("udp.gso.enabled", true)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -269,27 +275,42 @@ func (u *StdConn) prepareGRO() {
|
|||||||
// codepoint through the EncReader for RFC 6040 combine on the decap side.
|
// codepoint through the EncReader for RFC 6040 combine on the decap side.
|
||||||
// Best-effort: we keep going on failure.
|
// Best-effort: we keep going on failure.
|
||||||
func (u *StdConn) prepareECNRecv() {
|
func (u *StdConn) prepareECNRecv() {
|
||||||
var probeErr error
|
var v4err, v6err error
|
||||||
if err := u.rawConn.Control(func(fd uintptr) {
|
if err := u.rawConn.Control(func(fd uintptr) {
|
||||||
if u.isV4 {
|
v4err = unix.SetsockoptInt(int(fd), unix.IPPROTO_IP, unix.IP_RECVTOS, 1)
|
||||||
probeErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_IP, unix.IP_RECVTOS, 1)
|
if !u.isV4 {
|
||||||
} else {
|
v6err = unix.SetsockoptInt(int(fd), unix.IPPROTO_IPV6, unix.IPV6_RECVTCLASS, 1)
|
||||||
probeErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_IPV6, unix.IPV6_RECVTCLASS, 1)
|
|
||||||
}
|
}
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
u.l.Info("udp: outer-ECN RX disabled", "reason", "rawconn control failed", "error", err)
|
u.l.Info("udp: outer-ECN RX disabled", "reason", "rawconn control failed", "error", err)
|
||||||
recordCapability("udp.ecn_rx.enabled", false)
|
recordCapability("udp.ecn_rx.enabled", false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if probeErr != nil {
|
if u.isV4 { //only check the V4 attempt
|
||||||
u.l.Info("udp: outer-ECN RX disabled", "reason", "kernel rejected probe", "error", probeErr)
|
if v4err != nil {
|
||||||
|
u.l.Info("udp: outer-ECN RX disabled", "reason", "kernel rejected probe", "error", v4err)
|
||||||
recordCapability("udp.ecn_rx.enabled", false)
|
recordCapability("udp.ecn_rx.enabled", false)
|
||||||
return
|
} else {
|
||||||
}
|
|
||||||
u.ecnRecvSupported = true
|
u.ecnRecvSupported = true
|
||||||
u.l.Info("udp: outer-ECN RX enabled")
|
u.l.Info("udp: outer-ECN RX enabled")
|
||||||
recordCapability("udp.ecn_rx.enabled", true)
|
recordCapability("udp.ecn_rx.enabled", true)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
if v6err != nil { //no V6 ECN? disable it.
|
||||||
|
u.l.Info("udp: outer-ECN RX disabled", "reason", "kernel rejected probe", "error", errors.Join(v4err, v6err))
|
||||||
|
recordCapability("udp.ecn_rx.enabled", false)
|
||||||
|
return
|
||||||
|
} else if v4err != nil { //no V4, but yes V6? Low level warning. Could be a V6-specific bind.
|
||||||
|
u.l.Debug("udp: outer-ECN RX degraded", "reason", "kernel rejected probe on IPv4", "error", v4err)
|
||||||
|
}
|
||||||
|
// all good
|
||||||
|
u.ecnRecvSupported = true
|
||||||
|
u.l.Info("udp: outer-ECN RX enabled")
|
||||||
|
recordCapability("udp.ecn_rx.enabled", true)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// recordCapability registers (or updates) a boolean gauge for one of the
|
// recordCapability registers (or updates) a boolean gauge for one of the
|
||||||
// kernel-feature probes. Gauges go to 1 when the feature is enabled, 0 when
|
// kernel-feature probes. Gauges go to 1 when the feature is enabled, 0 when
|
||||||
@@ -712,7 +733,7 @@ func (u *StdConn) planRun(bufs [][]byte, addrs []netip.AddrPort, ecns []byte, st
|
|||||||
if ecns != nil {
|
if ecns != nil {
|
||||||
ecn = ecns[start]
|
ecn = ecns[start]
|
||||||
}
|
}
|
||||||
maxLen := maxGSOSegments
|
maxLen := u.maxGSOSegments
|
||||||
if iovBudget < maxLen {
|
if iovBudget < maxLen {
|
||||||
maxLen = iovBudget
|
maxLen = iovBudget
|
||||||
}
|
}
|
||||||
@@ -919,3 +940,22 @@ func NewUDPStatsEmitter(udpConns []Conn) func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseRelease extracts the numeric major and minor version from a kernel
// release string such as "5.15.0-generic".
//
// Only the first two dot-separated fields are examined. The minor field may
// carry trailing non-digit text (for example "15-generic", or NUL padding
// when the caller converts a fixed-size byte array to a string); everything
// from the first non-digit byte onward is ignored.
//
// It returns (0, 0) whenever the string cannot be understood: no dot
// separator, a major field that is not a non-negative integer that fits in
// an int, or a minor field with no leading digits. The result is never
// partially populated, so callers comparing against a minimum version see
// malformed input as "older than everything".
func parseRelease(r string) (major, minor int) {
	majorField, rest, ok := strings.Cut(r, ".")
	if !ok {
		return 0, 0
	}
	// Drop the patch level and anything after it.
	minorField, _, _ := strings.Cut(rest, ".")

	// Keep only the leading ASCII digits of the minor field.
	end := 0
	for end < len(minorField) && minorField[end] >= '0' && minorField[end] <= '9' {
		end++
	}
	minorField = minorField[:end]

	major, err := strconv.Atoi(majorField)
	if err != nil || major < 0 {
		// Atoi returns a clamped value alongside a range error; never let
		// that leak out as a plausible-looking version.
		return 0, 0
	}
	minor, err = strconv.Atoi(minorField)
	if err != nil {
		return 0, 0
	}
	return major, minor
}
|
||||||
|
|||||||
Reference in New Issue
Block a user