Compare commits

...

12 Commits

Author SHA1 Message Date
Ryan
a0f8cb2098 works properly 2025-11-05 22:09:06 -05:00
Ryan
d18d1aea67 first 2025-11-05 20:34:02 -05:00
Ryan
f5ff534671 make it work with dnclient 2025-11-05 19:25:32 -05:00
Nate Brown
2ea8a72d5c dunno 2025-10-05 23:23:30 -05:00
Nate Brown
663232e1fc Testing the concept 2025-10-05 23:23:10 -05:00
Nate Brown
2f48529e8b Cleanup and note more work 2025-10-05 23:23:08 -05:00
Nate Brown
f3e1ad64cd Try the timeout 2025-10-05 23:22:29 -05:00
Nate Brown
1d8112a329 Revert "More playing" way too much garbage emitted
This reverts commit fa098c551a.
2025-10-05 23:22:29 -05:00
Nate Brown
31eea0cc94 More playing 2025-10-05 23:22:29 -05:00
Nate Brown
dbba4a4c77 Playing 2025-10-05 23:22:29 -05:00
Nate Brown
194fde45da non-blocking io for linux 2025-10-05 23:22:27 -05:00
Nate Brown
f46b83f2c4 Remove more os.Exit calls and give a more reliable wait for stop function 2025-10-05 23:20:43 -05:00
18 changed files with 904 additions and 87 deletions

View File

@@ -5,6 +5,7 @@ import (
"github.com/sirupsen/logrus"
)
// TODO: Pretty sure this is just all sorts of racy now, we need it to be atomic
type Bits struct {
length uint64
current uint64
@@ -43,7 +44,7 @@ func (b *Bits) Check(l logrus.FieldLogger, i uint64) bool {
}
// Not within the window
l.Debugf("rejected a packet (top) %d %d\n", b.current, i)
l.Error("rejected a packet (top) %d %d\n", b.current, i)
return false
}

View File

@@ -1,8 +1,10 @@
package cert
import (
"encoding/hex"
"encoding/pem"
"fmt"
"time"
"golang.org/x/crypto/ed25519"
)
@@ -138,6 +140,101 @@ func MarshalSigningPrivateKeyToPEM(curve Curve, b []byte) []byte {
}
}
// Backward compatibility functions for older API
func MarshalX25519PublicKey(b []byte) []byte {
return MarshalPublicKeyToPEM(Curve_CURVE25519, b)
}
func MarshalX25519PrivateKey(b []byte) []byte {
return MarshalPrivateKeyToPEM(Curve_CURVE25519, b)
}
func MarshalPublicKey(curve Curve, b []byte) []byte {
return MarshalPublicKeyToPEM(curve, b)
}
func MarshalPrivateKey(curve Curve, b []byte) []byte {
return MarshalPrivateKeyToPEM(curve, b)
}
// NebulaCertificate is a compatibility wrapper for the old API
type NebulaCertificate struct {
Details NebulaCertificateDetails
Signature []byte
cert Certificate
}
// NebulaCertificateDetails is a compatibility wrapper for certificate details
type NebulaCertificateDetails struct {
Name string
NotBefore time.Time
NotAfter time.Time
PublicKey []byte
IsCA bool
Issuer []byte
Curve Curve
}
// UnmarshalNebulaCertificateFromPEM provides backward compatibility with the old API
func UnmarshalNebulaCertificateFromPEM(b []byte) (*NebulaCertificate, []byte, error) {
c, rest, err := UnmarshalCertificateFromPEM(b)
if err != nil {
return nil, rest, err
}
issuerBytes, err := func() ([]byte, error) {
issuer := c.Issuer()
if issuer == "" {
return nil, nil
}
decoded, err := hex.DecodeString(issuer)
if err != nil {
return nil, fmt.Errorf("failed to decode issuer fingerprint: %w", err)
}
return decoded, nil
}()
if err != nil {
return nil, rest, err
}
pubKey := c.PublicKey()
if pubKey != nil {
pubKey = append([]byte(nil), pubKey...)
}
sig := c.Signature()
if sig != nil {
sig = append([]byte(nil), sig...)
}
return &NebulaCertificate{
Details: NebulaCertificateDetails{
Name: c.Name(),
NotBefore: c.NotBefore(),
NotAfter: c.NotAfter(),
PublicKey: pubKey,
IsCA: c.IsCA(),
Issuer: issuerBytes,
Curve: c.Curve(),
},
Signature: sig,
cert: c,
}, rest, nil
}
// IssuerString returns the issuer in hex format for compatibility
func (n *NebulaCertificate) IssuerString() string {
if n.Details.Issuer == nil {
return ""
}
return hex.EncodeToString(n.Details.Issuer)
}
// Certificate returns the underlying certificate (read-only)
func (n *NebulaCertificate) Certificate() Certificate {
return n.cert
}
// UnmarshalPrivateKeyFromPEM will try to unmarshal the first pem block in a byte array, returning any non
// consumed data or an error on failure
func UnmarshalPrivateKeyFromPEM(b []byte) ([]byte, []byte, Curve, error) {

View File

@@ -65,8 +65,16 @@ func main() {
}
if !*configTest {
ctrl.Start()
ctrl.ShutdownBlock()
wait, err := ctrl.Start()
if err != nil {
util.LogWithContextIfNeeded("Error while running", err, l)
os.Exit(1)
}
go ctrl.ShutdownBlock()
wait()
l.Info("Goodbye")
}
os.Exit(0)

View File

@@ -3,6 +3,9 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"github.com/sirupsen/logrus"
@@ -58,10 +61,22 @@ func main() {
os.Exit(1)
}
go func() {
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
if !*configTest {
ctrl.Start()
wait, err := ctrl.Start()
if err != nil {
util.LogWithContextIfNeeded("Error while running", err, l)
os.Exit(1)
}
go ctrl.ShutdownBlock()
notifyReady(l)
ctrl.ShutdownBlock()
wait()
l.Info("Goodbye")
}
os.Exit(0)

View File

@@ -13,6 +13,8 @@ import (
"github.com/slackhq/nebula/noiseutil"
)
// TODO: In a 5Gbps test, 1024 is not sufficient. With a 1400 MTU this is about 1.4Gbps of window, assuming full packets.
// 4092 should be sufficient for 5Gbps
const ReplayWindow = 1024
type ConnectionState struct {

View File

@@ -2,9 +2,11 @@ package nebula
import (
"context"
"errors"
"net/netip"
"os"
"os/signal"
"sync"
"syscall"
"github.com/sirupsen/logrus"
@@ -13,6 +15,16 @@ import (
"github.com/slackhq/nebula/overlay"
)
type RunState int
const (
Stopped RunState = 0 // The control has yet to be started
Started RunState = 1 // The control has been started
Stopping RunState = 2 // The control is stopping
)
var ErrAlreadyStarted = errors.New("nebula is already started")
// Every interaction here needs to take extra care to copy memory and not return or use arguments "as is" when touching
// core. This means copying IP objects, slices, de-referencing pointers and taking the actual value, etc
@@ -26,6 +38,9 @@ type controlHostLister interface {
}
type Control struct {
stateLock sync.Mutex
state RunState
f *Interface
l *logrus.Logger
ctx context.Context
@@ -49,10 +64,21 @@ type ControlHostInfo struct {
CurrentRelaysThroughMe []netip.Addr `json:"currentRelaysThroughMe"`
}
// Start actually runs nebula, this is a nonblocking call. To block use Control.ShutdownBlock()
func (c *Control) Start() {
// Start actually runs nebula, this is a nonblocking call.
// The returned function can be used to wait for nebula to fully stop.
func (c *Control) Start() (func(), error) {
c.stateLock.Lock()
if c.state != Stopped {
c.stateLock.Unlock()
return nil, ErrAlreadyStarted
}
// Activate the interface
c.f.activate()
err := c.f.activate()
if err != nil {
c.stateLock.Unlock()
return nil, err
}
// Call all the delayed funcs that waited patiently for the interface to be created.
if c.sshStart != nil {
@@ -72,15 +98,33 @@ func (c *Control) Start() {
}
// Start reading packets.
c.f.run()
c.state = Started
c.stateLock.Unlock()
return c.f.run(c.ctx)
}
func (c *Control) State() RunState {
c.stateLock.Lock()
defer c.stateLock.Unlock()
return c.state
}
func (c *Control) Context() context.Context {
return c.ctx
}
// Stop signals nebula to shutdown and close all tunnels, returns after the shutdown is complete
// Stop is a non-blocking call that signals nebula to close all tunnels and shut down
func (c *Control) Stop() {
c.stateLock.Lock()
if c.state != Started {
c.stateLock.Unlock()
// We are stopping or stopped already
return
}
c.state = Stopping
c.stateLock.Unlock()
// Stop the handshakeManager (and other services), to prevent new tunnels from
// being created while we're shutting them all down.
c.cancel()
@@ -89,7 +133,7 @@ func (c *Control) Stop() {
if err := c.f.Close(); err != nil {
c.l.WithError(err).Error("Close interface failed")
}
c.l.Info("Goodbye")
c.state = Stopped
}
// ShutdownBlock will listen for and block on term and interrupt signals, calling Control.Stop() once signalled

View File

@@ -6,8 +6,8 @@ import (
"fmt"
"io"
"net/netip"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -18,6 +18,7 @@ import (
"github.com/slackhq/nebula/firewall"
"github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/packet"
"github.com/slackhq/nebula/udp"
)
@@ -87,12 +88,19 @@ type Interface struct {
writers []udp.Conn
readers []io.ReadWriteCloser
wg sync.WaitGroup
metricHandshakes metrics.Histogram
messageMetrics *MessageMetrics
cachedPacketMetrics *cachedPacketMetrics
l *logrus.Logger
inPool sync.Pool
inbound chan *packet.Packet
outPool sync.Pool
outbound chan *[]byte
}
type EncWriter interface {
@@ -194,9 +202,22 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
},
//TODO: configurable size
inbound: make(chan *packet.Packet, 1028),
outbound: make(chan *[]byte, 1028),
l: c.l,
}
ifce.inPool = sync.Pool{New: func() any {
return packet.New()
}}
ifce.outPool = sync.Pool{New: func() any {
t := make([]byte, mtu)
return &t
}}
ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
ifce.reQueryEvery.Store(c.reQueryEvery)
ifce.reQueryWait.Store(int64(c.reQueryWait))
@@ -209,7 +230,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
// activate creates the interface on the host. After the interface is created, any
// other services that want to bind listeners to its IP may do so successfully. However,
// the interface isn't going to process anything until run() is called.
func (f *Interface) activate() {
func (f *Interface) activate() error {
// actually turn on tun dev
addr, err := f.outside.LocalAddr()
@@ -230,33 +251,44 @@ func (f *Interface) activate() {
if i > 0 {
reader, err = f.inside.NewMultiQueueReader()
if err != nil {
f.l.Fatal(err)
return err
}
}
f.readers[i] = reader
}
if err := f.inside.Activate(); err != nil {
if err = f.inside.Activate(); err != nil {
f.inside.Close()
f.l.Fatal(err)
return err
}
return nil
}
func (f *Interface) run() {
// Launch n queues to read packets from udp
func (f *Interface) run(c context.Context) (func(), error) {
for i := 0; i < f.routines; i++ {
// Launch n queues to read packets from udp
f.wg.Add(1)
go f.listenOut(i)
// Launch n queues to read packets from tun dev
f.wg.Add(1)
go f.listenIn(f.readers[i], i)
// Launch n queues to read packets from tun dev
f.wg.Add(1)
go f.workerIn(i, c)
// Launch n queues to read packets from tun dev
f.wg.Add(1)
go f.workerOut(i, c)
}
// Launch n queues to read packets from tun dev
for i := 0; i < f.routines; i++ {
go f.listenIn(f.readers[i], i)
}
return f.wg.Wait, nil
}
func (f *Interface) listenOut(i int) {
runtime.LockOSThread()
var li udp.Conn
if i > 0 {
li = f.writers[i]
@@ -264,41 +296,97 @@ func (f *Interface) listenOut(i int) {
li = f.outside
}
ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
lhh := f.lightHouse.NewRequestHandler()
plaintext := make([]byte, udp.MTU)
h := &header.H{}
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
p := f.inPool.Get().(*packet.Packet)
//TODO: have the listener store this in the msgs array after a read instead of doing a copy
li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
p.Payload = p.Payload[:mtu]
copy(p.Payload, payload)
p.Payload = p.Payload[:len(payload)]
p.Addr = fromUdpAddr
f.inbound <- p
//select {
//case f.inbound <- p:
//default:
// f.l.Error("Dropped packet from inbound channel")
//}
})
if err != nil && !f.closed.Load() {
f.l.WithError(err).Error("Error while reading packet inbound packet, closing")
//TODO: Trigger Control to close
}
f.l.Debugf("underlay reader %v is done", i)
f.wg.Done()
}
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
runtime.LockOSThread()
packet := make([]byte, mtu)
out := make([]byte, mtu)
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
for {
n, err := reader.Read(packet)
p := f.outPool.Get().(*[]byte)
*p = (*p)[:mtu]
n, err := reader.Read(*p)
if err != nil {
if errors.Is(err, os.ErrClosed) && f.closed.Load() {
return
if !f.closed.Load() {
f.l.WithError(err).Error("Error while reading outbound packet, closing")
//TODO: Trigger Control to close
}
f.l.WithError(err).Error("Error while reading outbound packet")
// This only seems to happen when something fatal happens to the fd, so exit.
os.Exit(2)
break
}
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
*p = (*p)[:n]
//TODO: nonblocking channel write
f.outbound <- p
//select {
//case f.outbound <- p:
//default:
// f.l.Error("Dropped packet from outbound channel")
//}
}
f.l.Debugf("overlay reader %v is done", i)
f.wg.Done()
}
func (f *Interface) workerIn(i int, ctx context.Context) {
lhh := f.lightHouse.NewRequestHandler()
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
fwPacket2 := &firewall.Packet{}
nb2 := make([]byte, 12, 12)
result2 := make([]byte, mtu)
h := &header.H{}
for {
select {
case p := <-f.inbound:
f.readOutsidePackets(p.Addr, nil, result2[:0], p.Payload, h, fwPacket2, lhh, nb2, i, conntrackCache.Get(f.l))
p.Payload = p.Payload[:mtu]
f.inPool.Put(p)
case <-ctx.Done():
f.wg.Done()
return
}
}
}
func (f *Interface) workerOut(i int, ctx context.Context) {
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
fwPacket1 := &firewall.Packet{}
nb1 := make([]byte, 12, 12)
result1 := make([]byte, mtu)
for {
select {
case data := <-f.outbound:
f.consumeInsidePacket(*data, fwPacket1, nb1, result1, i, conntrackCache.Get(f.l))
*data = (*data)[:mtu]
f.outPool.Put(data)
case <-ctx.Done():
f.wg.Done()
return
}
}
}
@@ -451,6 +539,7 @@ func (f *Interface) GetCertState() *CertState {
func (f *Interface) Close() error {
f.closed.Store(true)
// Release the udp readers
for _, u := range f.writers {
err := u.Close()
if err != nil {
@@ -458,6 +547,13 @@ func (f *Interface) Close() error {
}
}
// Release the tun device
return f.inside.Close()
// Release the tun readers
for _, u := range f.readers {
err := u.Close()
if err != nil {
f.l.WithError(err).Error("Error while closing tun device")
}
}
return nil
}

18
main.go
View File

@@ -284,14 +284,14 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
}
return &Control{
ifce,
l,
ctx,
cancel,
sshStart,
statsStart,
dnsStart,
lightHouse.StartUpdateWorker,
connManager.Start,
f: ifce,
l: l,
ctx: ctx,
cancel: cancel,
sshStart: sshStart,
statsStart: statsStart,
dnsStart: dnsStart,
lighthouseStart: lightHouse.StartUpdateWorker,
connectionManagerStart: connManager.Start,
}, nil
}

View File

@@ -29,7 +29,7 @@ func (f *Interface) readOutsidePackets(ip netip.AddrPort, via *ViaSender, out []
return
}
//l.Error("in packet ", header, packet[HeaderLen:])
//f.l.Error("in packet ", h)
if ip.IsValid() {
if f.myVpnNetworksTable.Contains(ip.Addr()) {
if f.l.Level >= logrus.DebugLevel {
@@ -245,6 +245,7 @@ func (f *Interface) handleHostRoaming(hostinfo *HostInfo, udpAddr netip.AddrPort
return
}
//TODO: Seems we have a bunch of stuff racing here, since we don't have a lock on hostinfo anymore we announce roaming in bursts
hostinfo.logger(f.l).WithField("udpAddr", hostinfo.remote).WithField("newAddr", udpAddr).
Info("Host roamed to new udp ip/port.")
hostinfo.lastRoam = time.Now()
@@ -470,7 +471,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out
out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:header.Len], packet[header.Len:], messageCounter, nb)
if err != nil {
hostinfo.logger(f.l).WithError(err).Error("Failed to decrypt packet")
hostinfo.logger(f.l).WithError(err).WithField("fwPacket", fwPacket).Error("Failed to decrypt packet")
return false
}

12
packet/packet.go Normal file
View File

@@ -0,0 +1,12 @@
package packet
import "net/netip"
type Packet struct {
Payload []byte
Addr netip.AddrPort
}
func New() *Packet {
return &Packet{Payload: make([]byte, 9001)}
}

View File

@@ -9,10 +9,13 @@ import (
"math"
"net"
"net/netip"
"os"
"strings"
"sync"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/overlay"
"golang.org/x/sync/errgroup"
"gvisor.dev/gvisor/pkg/buffer"
@@ -43,8 +46,19 @@ type Service struct {
}
}
func New(control *nebula.Control) (*Service, error) {
control.Start()
func New(config *config.C) (*Service, error) {
logger := logrus.New()
logger.Out = os.Stdout
control, err := nebula.Main(config, false, "custom-app", logger, overlay.NewUserDeviceFromConfig)
if err != nil {
return nil, err
}
wait, err := control.Start()
if err != nil {
return nil, err
}
ctx := control.Context()
eg, ctx := errgroup.WithContext(ctx)
@@ -141,6 +155,12 @@ func New(control *nebula.Control) (*Service, error) {
}
})
// Add the nebula wait function to the group
eg.Go(func() error {
wait()
return nil
})
return &s, nil
}

View File

@@ -16,7 +16,7 @@ type EncReader func(
type Conn interface {
Rebind() error
LocalAddr() (netip.AddrPort, error)
ListenOut(r EncReader)
ListenOut(r EncReader) error
WriteTo(b []byte, addr netip.AddrPort) error
ReloadConfig(c *config.C)
Close() error

View File

@@ -165,7 +165,7 @@ func NewUDPStatsEmitter(udpConns []Conn) func() {
return func() {}
}
func (u *StdConn) ListenOut(r EncReader) {
func (u *StdConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
for {
@@ -174,14 +174,17 @@ func (u *StdConn) ListenOut(r EncReader) {
if err != nil {
if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
u.l.WithError(err).Error("unexpected udp socket receive error")
continue
}
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
}
return nil
}
func (u *StdConn) Rebind() error {

View File

@@ -71,15 +71,14 @@ type rawMessage struct {
Len uint32
}
func (u *GenericConn) ListenOut(r EncReader) {
func (u *GenericConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
for {
// Just read one packet at a time
n, rua, err := u.ReadFromUDPAddrPort(buffer)
if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])

View File

@@ -5,10 +5,13 @@ package udp
import (
"encoding/binary"
"errors"
"fmt"
"net"
"net/netip"
"sync"
"syscall"
"time"
"unsafe"
"github.com/rcrowley/go-metrics"
@@ -17,19 +20,40 @@ import (
"golang.org/x/sys/unix"
)
var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500))
const (
defaultGSOMaxSegments = 8
defaultGSOFlushTimeout = 150 * time.Microsecond
defaultGROReadBufferSize = MTU * defaultGSOMaxSegments
maxGSOBatchBytes = 0xFFFF
)
var (
errGSOFallback = errors.New("udp gso fallback")
errGSODisabled = errors.New("udp gso disabled")
)
type StdConn struct {
sysFd int
isV4 bool
l *logrus.Logger
batch int
}
func maybeIPV4(ip net.IP) (net.IP, bool) {
ip4 := ip.To4()
if ip4 != nil {
return ip4, true
}
return ip, false
enableGRO bool
enableGSO bool
gsoMu sync.Mutex
gsoBuf []byte
gsoAddr netip.AddrPort
gsoSegSize int
gsoSegments int
gsoMaxSegments int
gsoMaxBytes int
gsoFlushTimeout time.Duration
gsoTimer *time.Timer
groBufSize int
}
func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
@@ -55,6 +79,11 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
}
}
// Set a read timeout
if err = unix.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &readTimeout); err != nil {
return nil, fmt.Errorf("unable to set SO_RCVTIMEO: %s", err)
}
var sa unix.Sockaddr
if ip.Is4() {
sa4 := &unix.SockaddrInet4{Port: port}
@@ -69,7 +98,16 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
return nil, fmt.Errorf("unable to bind to socket: %s", err)
}
return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch}, err
return &StdConn{
sysFd: fd,
isV4: ip.Is4(),
l: l,
batch: batch,
gsoMaxSegments: defaultGSOMaxSegments,
gsoMaxBytes: MTU * defaultGSOMaxSegments,
gsoFlushTimeout: defaultGSOFlushTimeout,
groBufSize: MTU,
}, err
}
func (u *StdConn) Rebind() error {
@@ -118,20 +156,46 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
}
}
func (u *StdConn) ListenOut(r EncReader) {
var ip netip.Addr
func (u *StdConn) ListenOut(r EncReader) error {
var (
ip netip.Addr
controls [][]byte
)
msgs, buffers, names := u.PrepareRawMessages(u.batch)
bufSize := u.readBufferSize()
msgs, buffers, names := u.PrepareRawMessages(u.batch, bufSize)
read := u.ReadMulti
if u.batch == 1 {
read = u.ReadSingle
}
for {
desired := u.readBufferSize()
if len(buffers) == 0 || cap(buffers[0]) < desired {
msgs, buffers, names = u.PrepareRawMessages(u.batch, desired)
controls = nil
}
if u.enableGRO {
if controls == nil {
controls = make([][]byte, len(msgs))
for i := range controls {
controls[i] = make([]byte, unix.CmsgSpace(4))
}
}
for i := range msgs {
setRawMessageControl(&msgs[i], controls[i])
}
} else if controls != nil {
for i := range msgs {
setRawMessageControl(&msgs[i], nil)
}
controls = nil
}
n, err := read(msgs)
if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
for i := 0; i < n; i++ {
@@ -141,11 +205,82 @@ func (u *StdConn) ListenOut(r EncReader) {
} else {
ip, _ = netip.AddrFromSlice(names[i][8:24])
}
r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len])
addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
payload := buffers[i][:msgs[i].Len]
if u.enableGRO && u.l.IsLevelEnabled(logrus.DebugLevel) {
ctrlLen := getRawMessageControlLen(&msgs[i])
msgFlags := getRawMessageFlags(&msgs[i])
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "recv",
"payload_len": len(payload),
"ctrl_len": ctrlLen,
"msg_flags": msgFlags,
}).Debug("gro batch data")
if controls != nil && ctrlLen > 0 {
maxDump := ctrlLen
if maxDump > 16 {
maxDump = 16
}
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "control-bytes",
"control_hex": fmt.Sprintf("%x", controls[i][:maxDump]),
"datalen": ctrlLen,
}).Debug("gro control dump")
}
}
sawControl := false
if controls != nil {
if ctrlLen := getRawMessageControlLen(&msgs[i]); ctrlLen > 0 {
if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segSize > 0 {
sawControl = true
if u.l.IsLevelEnabled(logrus.DebugLevel) {
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "control",
"seg_size": segSize,
"seg_count": segCount,
"payloadLen": len(payload),
}).Debug("gro control parsed")
}
segSize = normalizeGROSegSize(segSize, segCount, len(payload))
if segSize > 0 && segSize < len(payload) {
if u.emitGROSegments(r, addr, payload, segSize) {
continue
}
}
}
}
}
if u.enableGRO && len(payload) > MTU {
if !sawControl && u.l.IsLevelEnabled(logrus.DebugLevel) {
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "fallback",
"payload_len": len(payload),
}).Debug("gro control missing; splitting payload by MTU")
}
if u.emitGROSegments(r, addr, payload, MTU) {
continue
}
}
r(addr, payload)
}
}
}
func (u *StdConn) readBufferSize() int {
if u.enableGRO && u.groBufSize > MTU {
return u.groBufSize
}
return MTU
}
func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
for {
n, _, err := unix.Syscall6(
@@ -159,6 +294,9 @@ func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
)
if err != 0 {
if err == unix.EAGAIN || err == unix.EINTR {
continue
}
return 0, &net.OpError{Op: "recvmsg", Err: err}
}
@@ -180,6 +318,9 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
)
if err != 0 {
if err == unix.EAGAIN || err == unix.EINTR {
continue
}
return 0, &net.OpError{Op: "recvmmsg", Err: err}
}
@@ -188,6 +329,14 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
}
func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
if u.enableGSO && ip.IsValid() {
if err := u.queueGSOPacket(b, ip); err == nil {
return nil
} else if !errors.Is(err, errGSOFallback) {
return err
}
}
if u.isV4 {
return u.writeTo4(b, ip)
}
@@ -221,7 +370,7 @@ func (u *StdConn) writeTo6(b []byte, ip netip.AddrPort) error {
func (u *StdConn) writeTo4(b []byte, ip netip.AddrPort) error {
if !ip.Addr().Is4() {
return ErrInvalidIPv6RemoteForSocket
return fmt.Errorf("Listener is IPv4, but writing to IPv6 remote")
}
var rsa unix.RawSockaddrInet4
@@ -294,6 +443,94 @@ func (u *StdConn) ReloadConfig(c *config.C) {
u.l.WithError(err).Error("Failed to set listen.so_mark")
}
}
u.configureGRO(c)
u.configureGSO(c)
}
func (u *StdConn) configureGRO(c *config.C) {
if c == nil {
return
}
enable := c.GetBool("listen.enable_gro", false)
if enable == u.enableGRO {
if enable {
if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 {
u.setGROBufferSize(size)
}
}
return
}
if enable {
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 1); err != nil {
u.l.WithError(err).Warn("Failed to enable UDP GRO")
return
}
u.enableGRO = true
u.setGROBufferSize(c.GetInt("listen.gro_read_buffer", defaultGROReadBufferSize))
u.l.WithField("buffer_size", u.groBufSize).Info("UDP GRO enabled")
return
}
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 0); err != nil && err != unix.ENOPROTOOPT {
u.l.WithError(err).Warn("Failed to disable UDP GRO")
}
u.enableGRO = false
u.groBufSize = MTU
}
func (u *StdConn) configureGSO(c *config.C) {
enable := c.GetBool("listen.enable_gso", false)
if !enable {
u.disableGSO()
} else {
u.enableGSO = true
}
segments := c.GetInt("listen.gso_max_segments", defaultGSOMaxSegments)
if segments < 1 {
segments = 1
}
u.gsoMaxSegments = segments
maxBytes := c.GetInt("listen.gso_max_bytes", 0)
if maxBytes <= 0 {
maxBytes = MTU * segments
}
if maxBytes > maxGSOBatchBytes {
u.l.WithField("requested", maxBytes).Warn("listen.gso_max_bytes larger than UDP limit; clamping")
maxBytes = maxGSOBatchBytes
}
u.gsoMaxBytes = maxBytes
timeout := c.GetDuration("listen.gso_flush_timeout", defaultGSOFlushTimeout)
if timeout < 0 {
timeout = 0
}
u.gsoFlushTimeout = timeout
}
func (u *StdConn) setGROBufferSize(size int) {
if size < MTU {
size = defaultGROReadBufferSize
}
if size > maxGSOBatchBytes {
size = maxGSOBatchBytes
}
u.groBufSize = size
}
func (u *StdConn) disableGSO() {
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
u.enableGSO = false
_ = u.flushGSOlocked()
u.gsoBuf = nil
u.gsoSegments = 0
u.gsoSegSize = 0
u.stopGSOTimerLocked()
}
func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
@@ -305,7 +542,239 @@ func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
return nil
}
func (u *StdConn) queueGSOPacket(b []byte, addr netip.AddrPort) error {
if len(b) == 0 {
return nil
}
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
if !u.enableGSO || !addr.IsValid() || len(b) > u.gsoMaxBytes {
if err := u.flushGSOlocked(); err != nil {
return err
}
return errGSOFallback
}
if u.gsoSegments == 0 {
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
} else if addr != u.gsoAddr || len(b) != u.gsoSegSize {
if err := u.flushGSOlocked(); err != nil {
return err
}
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
}
if len(u.gsoBuf)+len(b) > u.gsoMaxBytes {
if err := u.flushGSOlocked(); err != nil {
return err
}
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
}
u.gsoBuf = append(u.gsoBuf, b...)
u.gsoSegments++
if u.gsoSegments >= u.gsoMaxSegments || u.gsoFlushTimeout <= 0 {
return u.flushGSOlocked()
}
u.scheduleGSOFlushLocked()
return nil
}
func (u *StdConn) flushGSOlocked() error {
if u.gsoSegments == 0 {
u.stopGSOTimerLocked()
return nil
}
payload := append([]byte(nil), u.gsoBuf...)
addr := u.gsoAddr
segSize := u.gsoSegSize
u.gsoBuf = u.gsoBuf[:0]
u.gsoSegments = 0
u.gsoSegSize = 0
u.stopGSOTimerLocked()
if segSize <= 0 {
return errGSOFallback
}
err := u.sendSegmented(payload, addr, segSize)
if errors.Is(err, errGSODisabled) {
u.l.WithField("addr", addr).Warn("UDP GSO disabled by kernel, falling back to sendto")
u.enableGSO = false
return u.sendSegmentsIndividually(payload, addr, segSize)
}
return err
}
func (u *StdConn) sendSegmented(payload []byte, addr netip.AddrPort, segSize int) error {
if len(payload) == 0 {
return nil
}
control := make([]byte, unix.CmsgSpace(2))
hdr := (*unix.Cmsghdr)(unsafe.Pointer(&control[0]))
hdr.Level = unix.SOL_UDP
hdr.Type = unix.UDP_SEGMENT
setCmsgLen(hdr, unix.CmsgLen(2))
binary.NativeEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize))
var sa unix.Sockaddr
if addr.Addr().Is4() {
var sa4 unix.SockaddrInet4
sa4.Port = int(addr.Port())
sa4.Addr = addr.Addr().As4()
sa = &sa4
} else {
var sa6 unix.SockaddrInet6
sa6.Port = int(addr.Port())
sa6.Addr = addr.Addr().As16()
sa = &sa6
}
if _, err := unix.SendmsgN(u.sysFd, payload, control, sa, 0); err != nil {
if errno, ok := err.(syscall.Errno); ok && (errno == unix.EINVAL || errno == unix.ENOTSUP || errno == unix.EOPNOTSUPP) {
return errGSODisabled
}
return &net.OpError{Op: "sendmsg", Err: err}
}
return nil
}
func (u *StdConn) sendSegmentsIndividually(buf []byte, addr netip.AddrPort, segSize int) error {
if segSize <= 0 {
return errGSOFallback
}
for offset := 0; offset < len(buf); offset += segSize {
end := offset + segSize
if end > len(buf) {
end = len(buf)
}
var err error
if u.isV4 {
err = u.writeTo4(buf[offset:end], addr)
} else {
err = u.writeTo6(buf[offset:end], addr)
}
if err != nil {
return err
}
}
return nil
}
func (u *StdConn) scheduleGSOFlushLocked() {
if u.gsoTimer == nil {
u.gsoTimer = time.AfterFunc(u.gsoFlushTimeout, u.gsoFlushTimer)
return
}
u.gsoTimer.Reset(u.gsoFlushTimeout)
}
func (u *StdConn) stopGSOTimerLocked() {
if u.gsoTimer != nil {
u.gsoTimer.Stop()
u.gsoTimer = nil
}
}
func (u *StdConn) gsoFlushTimer() {
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
_ = u.flushGSOlocked()
}
func parseGROControl(control []byte) (int, int) {
if len(control) == 0 {
return 0, 0
}
cmsgs, err := unix.ParseSocketControlMessage(control)
if err != nil {
return 0, 0
}
for _, c := range cmsgs {
if c.Header.Level == unix.SOL_UDP && c.Header.Type == unix.UDP_GRO && len(c.Data) >= 2 {
segSize := int(binary.NativeEndian.Uint16(c.Data[:2]))
segCount := 0
if len(c.Data) >= 4 {
segCount = int(binary.NativeEndian.Uint16(c.Data[2:4]))
}
return segSize, segCount
}
}
return 0, 0
}
func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize int) bool {
if segSize <= 0 {
return false
}
for offset := 0; offset < len(payload); offset += segSize {
end := offset + segSize
if end > len(payload) {
end = len(payload)
}
segment := make([]byte, end-offset)
copy(segment, payload[offset:end])
r(addr, segment)
}
return true
}
func normalizeGROSegSize(segSize, segCount, total int) int {
if segSize <= 0 || total <= 0 {
return segSize
}
if segSize > total && segCount > 0 {
segSize = total / segCount
if segSize == 0 {
segSize = total
}
}
if segCount <= 1 && segSize > 0 && total > segSize {
calculated := total / segSize
if calculated <= 1 {
calculated = (total + segSize - 1) / segSize
}
if calculated > 1 {
segCount = calculated
}
}
if segSize > MTU {
return MTU
}
return segSize
}
func (u *StdConn) Close() error {
u.disableGSO()
return syscall.Close(u.sysFd)
}

View File

@@ -30,13 +30,16 @@ type rawMessage struct {
Len uint32
}
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) {
if bufSize <= 0 {
bufSize = MTU
}
msgs := make([]rawMessage, n)
buffers := make([][]byte, n)
names := make([][]byte, n)
for i := range msgs {
buffers[i] = make([]byte, MTU)
buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{
@@ -52,3 +55,25 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names
}
func setRawMessageControl(msg *rawMessage, buf []byte) {
if len(buf) == 0 {
msg.Hdr.Control = nil
msg.Hdr.Controllen = 0
return
}
msg.Hdr.Control = &buf[0]
msg.Hdr.Controllen = uint32(len(buf))
}
func getRawMessageControlLen(msg *rawMessage) int {
return int(msg.Hdr.Controllen)
}
func getRawMessageFlags(msg *rawMessage) int {
return int(msg.Hdr.Flags)
}
func setCmsgLen(h *unix.Cmsghdr, l int) {
h.Len = uint32(l)
}

View File

@@ -33,13 +33,16 @@ type rawMessage struct {
Pad0 [4]byte
}
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) {
if bufSize <= 0 {
bufSize = MTU
}
msgs := make([]rawMessage, n)
buffers := make([][]byte, n)
names := make([][]byte, n)
for i := range msgs {
buffers[i] = make([]byte, MTU)
buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{
@@ -55,3 +58,25 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names
}
func setRawMessageControl(msg *rawMessage, buf []byte) {
if len(buf) == 0 {
msg.Hdr.Control = nil
msg.Hdr.Controllen = 0
return
}
msg.Hdr.Control = &buf[0]
msg.Hdr.Controllen = uint64(len(buf))
}
func getRawMessageControlLen(msg *rawMessage) int {
return int(msg.Hdr.Controllen)
}
func getRawMessageFlags(msg *rawMessage) int {
return int(msg.Hdr.Flags)
}
func setCmsgLen(h *unix.Cmsghdr, l int) {
h.Len = uint64(l)
}

View File

@@ -134,7 +134,7 @@ func (u *RIOConn) bind(sa windows.Sockaddr) error {
return nil
}
func (u *RIOConn) ListenOut(r EncReader) {
func (u *RIOConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
for {