From 6b6a4bc1cc7ac9451cfed9c2182ba3b7402eb4a0 Mon Sep 17 00:00:00 2001
From: Jay Wren
Date: Wed, 4 Feb 2026 10:29:37 -0500
Subject: [PATCH] claude implements UDP GSO

---
Review notes (text in this section is not part of the commit message):
- writeBatchGSO now caps a coalesced run at udpGSOMaxPayload bytes, never
  coalesces empty payloads, and lets a shorter packet close any run.
- sendGSO rejects segment/total sizes it cannot encode instead of panicking.
- NOTE(review): main.go now reads tunBatchSize from "listen.batch" rather than
  "tun.batch"; confirm the key change is intended.
- Indentation was restored following gofmt; the blob hashes on the index lines
  were not recomputed for the edited udp_linux.go.

 main.go                |   2 +-
 udp/conn.go            |   4 +
 udp/udp_darwin.go      |   4 +
 udp/udp_generic.go     |   4 +
 udp/udp_linux.go       | 219 ++++++++++++++++++++++++++++++++++++++++-
 udp/udp_linux_32.go    |   4 +
 udp/udp_linux_64.go    |   4 +
 udp/udp_rio_windows.go |   4 +
 udp/udp_tester.go      |   4 +
 9 files changed, 243 insertions(+), 6 deletions(-)

diff --git a/main.go b/main.go
index ba90d0dc..5d7f8d67 100644
--- a/main.go
+++ b/main.go
@@ -250,7 +250,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
 		punchy:                punchy,
 		ConntrackCacheTimeout: conntrackCacheTimeout,
 		l:                     l,
-		tunBatchSize:          c.GetInt("tun.batch", 64),
+		tunBatchSize:          c.GetInt("listen.batch", 64),
 	}
 
 	var ifce *Interface
diff --git a/udp/conn.go b/udp/conn.go
index 95f20d53..39aa2ccb 100644
--- a/udp/conn.go
+++ b/udp/conn.go
@@ -27,6 +27,7 @@ type Conn interface {
 	WriteBatch(pkts []BatchPacket) (int, error)
 	ReloadConfig(c *config.C)
 	SupportsMultipleReaders() bool
+	SupportsGSO() bool
 	Close() error
 }
 
@@ -44,6 +45,9 @@ func (NoopConn) ListenOut(_ EncReader) {
 func (NoopConn) SupportsMultipleReaders() bool {
 	return false
 }
+func (NoopConn) SupportsGSO() bool {
+	return false
+}
 func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error {
 	return nil
 }
diff --git a/udp/udp_darwin.go b/udp/udp_darwin.go
index 8360ed4e..7b146a02 100644
--- a/udp/udp_darwin.go
+++ b/udp/udp_darwin.go
@@ -188,6 +188,10 @@ func (u *StdConn) SupportsMultipleReaders() bool {
 	return false
 }
 
+func (u *StdConn) SupportsGSO() bool {
+	return false
+}
+
 func (u *StdConn) Rebind() error {
 	var err error
 	if u.isV4 {
diff --git a/udp/udp_generic.go b/udp/udp_generic.go
index bd47f40c..f2d41d1a 100644
--- a/udp/udp_generic.go
+++ b/udp/udp_generic.go
@@ -102,6 +102,10 @@ func (u *GenericConn) SupportsMultipleReaders() bool {
 	return false
 }
 
+func (u *GenericConn) SupportsGSO() bool {
+	return false
+}
+
 func (u *GenericConn) WriteBatch(pkts []BatchPacket) (int, error) {
 	for i := range pkts {
 		if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {
diff --git a/udp/udp_linux.go b/udp/udp_linux.go
index 392f19ce..a151aebf 100644
--- a/udp/udp_linux.go
+++ b/udp/udp_linux.go
@@ -5,6 +5,7 @@ package udp
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"net"
 	"net/netip"
@@ -18,10 +19,11 @@ import (
 )
 
 type StdConn struct {
-	sysFd int
-	isV4  bool
-	l     *logrus.Logger
-	batch int
+	sysFd        int
+	isV4         bool
+	l            *logrus.Logger
+	batch        int
+	gsoSupported bool
 }
 
 func maybeIPV4(ip net.IP) (net.IP, bool) {
@@ -32,6 +34,40 @@ func maybeIPV4(ip net.IP) (net.IP, bool) {
 	return ip, false
 }
 
+// supportsUDPOffload checks if the kernel supports UDP GSO (Generic Segmentation Offload)
+// by attempting to get the UDP_SEGMENT socket option.
+func supportsUDPOffload(fd int) bool {
+	_, err := unix.GetsockoptInt(fd, unix.IPPROTO_UDP, unix.UDP_SEGMENT)
+	return err == nil
+}
+
+const (
+	// Maximum number of datagrams that can be coalesced with GSO
+	udpSegmentMaxDatagrams = 64
+
+	// Maximum number of payload bytes in one coalesced GSO send. The payload
+	// of a single UDP send has to fit the 16-bit IP length field; the IPv4
+	// bound (65535 minus the 20-byte IP and 8-byte UDP headers) is used for
+	// both address families because it is the smaller one.
+	udpGSOMaxPayload = 65535 - 20 - 8
+)
+
+// setGSOSize writes a UDP_SEGMENT control message to the provided buffer
+// with the given segment size. Returns the actual control message length.
+func setGSOSize(control []byte, gsoSize uint16) int {
+	// Build the cmsghdr structure
+	cmsgLen := unix.CmsgLen(2) // 2 bytes for uint16 segment size
+	cmsg := (*unix.Cmsghdr)(unsafe.Pointer(&control[0]))
+	cmsg.Level = unix.IPPROTO_UDP
+	cmsg.Type = unix.UDP_SEGMENT
+	cmsg.SetLen(cmsgLen)
+
+	// Write the segment size after the header (after cmsghdr)
+	binary.NativeEndian.PutUint16(control[unix.SizeofCmsghdr:], gsoSize)
+
+	return unix.CmsgSpace(2) // aligned size
+}
+
 func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
 	af := unix.AF_INET6
 	if ip.Is4() {
@@ -69,13 +105,22 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
 		return nil, fmt.Errorf("unable to bind to socket: %s", err)
 	}
 
-	return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch}, err
+	gsoSupported := supportsUDPOffload(fd)
+	if gsoSupported {
+		l.Info("UDP GSO offload is supported")
+	}
+
+	return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch, gsoSupported: gsoSupported}, err
 }
 
 func (u *StdConn) SupportsMultipleReaders() bool {
 	return true
 }
 
+func (u *StdConn) SupportsGSO() bool {
+	return u.gsoSupported
+}
+
 func (u *StdConn) Rebind() error {
 	return nil
 }
@@ -318,6 +363,16 @@ func (u *StdConn) WriteBatch(pkts []BatchPacket) (int, error) {
 		return 0, nil
 	}
 
+	// If GSO is supported, try to coalesce packets to the same destination
+	if u.gsoSupported {
+		return u.writeBatchGSO(pkts)
+	}
+
+	return u.writeBatchSendmmsg(pkts)
+}
+
+// writeBatchSendmmsg sends packets using sendmmsg without GSO coalescing
+func (u *StdConn) writeBatchSendmmsg(pkts []BatchPacket) (int, error) {
 	msgs := make([]rawMessage, len(pkts))
 	iovecs := make([]iovec, len(pkts))
 	var names4 []unix.RawSockaddrInet4
@@ -376,6 +431,160 @@ func (u *StdConn) WriteBatch(pkts []BatchPacket) (int, error) {
 	return sent, nil
 }
 
+// writeBatchGSO sends packets using GSO coalescing when possible.
+// Consecutive packets to the same destination are coalesced into a single GSO
+// message when they share one size; the last packet of a run may be shorter but
+// not empty. A run never exceeds udpSegmentMaxDatagrams packets or
+// udpGSOMaxPayload bytes. Packets that cannot be coalesced are sent one by one
+// with WriteTo. Returns the number of packets sent and the first error seen.
+func (u *StdConn) writeBatchGSO(pkts []BatchPacket) (int, error) {
+	totalSent := 0
+	i := 0
+
+	for i < len(pkts) {
+		// Find a run of packets to the same destination with compatible sizes
+		dst := pkts[i].Addr
+		segmentSize := len(pkts[i].Payload)
+
+		coalescedCount := 1
+		totalSize := segmentSize
+		// An empty payload cannot serve as the GSO segment size, so a run is
+		// only started from a non-empty packet.
+		for segmentSize > 0 && i+coalescedCount < len(pkts) && coalescedCount < udpSegmentMaxDatagrams {
+			next := pkts[i+coalescedCount]
+			nextSize := len(next.Payload)
+			if next.Addr != dst {
+				break
+			}
+			// An empty or larger packet cannot be a segment of this run, and the
+			// coalesced payload must still fit in a single UDP send.
+			if nextSize == 0 || nextSize > segmentSize || totalSize+nextSize > udpGSOMaxPayload {
+				break
+			}
+			coalescedCount++
+			totalSize += nextSize
+			// A shorter packet is only valid as the last segment, so it ends the run.
+			if nextSize < segmentSize {
+				break
+			}
+		}
+
+		// Single packet, send without GSO overhead
+		if coalescedCount == 1 {
+			if err := u.WriteTo(pkts[i].Payload, dst); err != nil {
+				return totalSent, err
+			}
+			totalSent++
+			i++
+			continue
+		}
+
+		err := u.sendGSO(pkts[i:i+coalescedCount], dst, segmentSize, totalSize)
+		if err != nil {
+			// If GSO fails (e.g., EIO due to NIC not supporting checksum offload),
+			// disable GSO and fall back to sendmmsg for the rest
+			if isGSOError(err) {
+				u.l.WithError(err).Warn("GSO send failed, disabling GSO for this connection")
+				// NOTE(review): gsoSupported is written without synchronization;
+				// confirm WriteBatch is never called concurrently on one StdConn.
+				u.gsoSupported = false
+				// Send remaining packets with sendmmsg
+				remaining, rerr := u.writeBatchSendmmsg(pkts[i:])
+				return totalSent + remaining, rerr
+			}
+			return totalSent, err
+		}
+		totalSent += coalescedCount
+		i += coalescedCount
+	}
+
+	return totalSent, nil
+}
+
+// sendGSO sends the payloads of pkts to dst as one UDP GSO message.
+// segmentSize is the size of every payload except possibly the last and
+// totalSize is the sum of all payload lengths; both are passed to the kernel,
+// which is expected to split the buffer into segmentSize-byte datagrams.
+func (u *StdConn) sendGSO(pkts []BatchPacket, dst netip.AddrPort, segmentSize, totalSize int) error {
+	// The segment size travels in a 16-bit control message and the buffer below
+	// is addressed through its first byte, so reject sizes that fit neither.
+	if segmentSize <= 0 || segmentSize > 0xffff || totalSize <= 0 {
+		return &net.OpError{Op: "sendmsg", Err: unix.EINVAL}
+	}
+
+	// Allocate a buffer large enough for all packet payloads
+	coalescedBuf := make([]byte, totalSize)
+	offset := 0
+	for _, pkt := range pkts {
+		offset += copy(coalescedBuf[offset:], pkt.Payload)
+	}
+
+	// Prepare control message with GSO segment size
+	control := make([]byte, unix.CmsgSpace(2))
+	controlLen := setGSOSize(control, uint16(segmentSize))
+
+	// Prepare the iovec
+	iov := iovec{}
+	setIovecBase(&iov, &coalescedBuf[0])
+	setIovecLen(&iov, totalSize)
+
+	// Prepare the msghdr
+	var hdr msghdr
+	hdr.Iov = &iov
+	setMsghdrIovlen(&hdr, 1)
+	hdr.Control = &control[0]
+	setMsghdrControllen(&hdr, controlLen)
+
+	// Set destination address
+	if u.isV4 {
+		var rsa unix.RawSockaddrInet4
+		rsa.Family = unix.AF_INET
+		rsa.Addr = dst.Addr().As4()
+		binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port())
+		hdr.Name = (*byte)(unsafe.Pointer(&rsa))
+		hdr.Namelen = unix.SizeofSockaddrInet4
+	} else {
+		var rsa unix.RawSockaddrInet6
+		rsa.Family = unix.AF_INET6
+		rsa.Addr = dst.Addr().As16()
+		binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port())
+		hdr.Name = (*byte)(unsafe.Pointer(&rsa))
+		hdr.Namelen = unix.SizeofSockaddrInet6
+	}
+
+	for {
+		_, _, errno := unix.Syscall6(
+			unix.SYS_SENDMSG,
+			uintptr(u.sysFd),
+			uintptr(unsafe.Pointer(&hdr)),
+			0,
+			0,
+			0,
+			0,
+		)
+
+		if errno == unix.EINTR {
+			continue
+		}
+
+		if errno != 0 {
+			return &net.OpError{Op: "sendmsg", Err: errno}
+		}
+
+		return nil
+	}
+}
+
+// isGSOError returns true if the error indicates GSO is not supported by the NIC
+func isGSOError(err error) bool {
+	var opErr *net.OpError
+	if !errors.As(err, &opErr) {
+		return false
+	}
+	// EIO typically means the NIC doesn't support checksum offload required for GSO
+	return errors.Is(opErr.Err, unix.EIO)
+}
+
 func NewUDPStatsEmitter(udpConns []Conn) func() {
 	// Check if our kernel supports SO_MEMINFO before registering the gauges
 	var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge
diff --git a/udp/udp_linux_32.go b/udp/udp_linux_32.go
index 1dec8793..10e73dea 100644
--- a/udp/udp_linux_32.go
+++ b/udp/udp_linux_32.go
@@ -64,3 +64,7 @@ func setIovecLen(iov *iovec, l int) {
 func setMsghdrIovlen(hdr *msghdr, l int) {
 	hdr.Iovlen = uint32(l)
 }
+
+func setMsghdrControllen(hdr *msghdr, l int) {
+	hdr.Controllen = uint32(l)
+}
diff --git a/udp/udp_linux_64.go b/udp/udp_linux_64.go
index 430f16ca..c5c3ae58 100644
--- a/udp/udp_linux_64.go
+++ b/udp/udp_linux_64.go
@@ -67,3 +67,7 @@ func setIovecLen(iov *iovec, l int) {
 func setMsghdrIovlen(hdr *msghdr, l int) {
 	hdr.Iovlen = uint64(l)
 }
+
+func setMsghdrControllen(hdr *msghdr, l int) {
+	hdr.Controllen = uint64(l)
+}
diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go
index a456bf1f..f6ec71f0 100644
--- a/udp/udp_rio_windows.go
+++ b/udp/udp_rio_windows.go
@@ -332,6 +332,10 @@ func (u *RIOConn) SupportsMultipleReaders() bool {
 	return false
 }
 
+func (u *RIOConn) SupportsGSO() bool {
+	return false
+}
+
 func (u *RIOConn) Rebind() error {
 	return nil
 }
diff --git a/udp/udp_tester.go b/udp/udp_tester.go
index 6f881110..6d40fd7b 100644
--- a/udp/udp_tester.go
+++ b/udp/udp_tester.go
@@ -131,6 +131,10 @@ func (u *TesterConn) SupportsMultipleReaders() bool {
 	return false
 }
 
+func (u *TesterConn) SupportsGSO() bool {
+	return false
+}
+
 func (u *TesterConn) Rebind() error {
 	return nil
 }