From 030b7e27638b7c5663140f984ad2731e560e4930 Mon Sep 17 00:00:00 2001 From: Jay Wren Date: Wed, 4 Feb 2026 11:02:06 -0500 Subject: [PATCH] claude implements UDP GRO --- udp/conn.go | 4 ++ udp/udp_darwin.go | 4 ++ udp/udp_generic.go | 4 ++ udp/udp_linux.go | 124 ++++++++++++++++++++++++++++++++++++----- udp/udp_linux_32.go | 30 +++++++++- udp/udp_linux_64.go | 30 +++++++++- udp/udp_rio_windows.go | 4 ++ udp/udp_tester.go | 4 ++ 8 files changed, 183 insertions(+), 21 deletions(-) diff --git a/udp/conn.go b/udp/conn.go index 39aa2ccb..1138b483 100644 --- a/udp/conn.go +++ b/udp/conn.go @@ -28,6 +28,7 @@ type Conn interface { ReloadConfig(c *config.C) SupportsMultipleReaders() bool SupportsGSO() bool + SupportsGRO() bool Close() error } @@ -48,6 +49,9 @@ func (NoopConn) SupportsMultipleReaders() bool { func (NoopConn) SupportsGSO() bool { return false } +func (NoopConn) SupportsGRO() bool { + return false +} func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error { return nil } diff --git a/udp/udp_darwin.go b/udp/udp_darwin.go index 7b146a02..52c544d1 100644 --- a/udp/udp_darwin.go +++ b/udp/udp_darwin.go @@ -192,6 +192,10 @@ func (u *StdConn) SupportsGSO() bool { return false } +func (u *StdConn) SupportsGRO() bool { + return false +} + func (u *StdConn) Rebind() error { var err error if u.isV4 { diff --git a/udp/udp_generic.go b/udp/udp_generic.go index f2d41d1a..29642c23 100644 --- a/udp/udp_generic.go +++ b/udp/udp_generic.go @@ -106,6 +106,10 @@ func (u *GenericConn) SupportsGSO() bool { return false } +func (u *GenericConn) SupportsGRO() bool { + return false +} + func (u *GenericConn) WriteBatch(pkts []BatchPacket) (int, error) { for i := range pkts { if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil { diff --git a/udp/udp_linux.go b/udp/udp_linux.go index a151aebf..04855a9f 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -24,6 +24,7 @@ type StdConn struct { l *logrus.Logger batch int gsoSupported bool + groSupported bool } func maybeIPV4(ip net.IP) (net.IP, bool) { @@ -41,9 +42,21 @@ func supportsUDPOffload(fd int) bool { return err == nil } +// supportsUDPGRO checks if the kernel supports UDP GRO (Generic Receive Offload) +// and attempts to enable it on the socket. +func supportsUDPGRO(fd int) bool { + // Try to enable UDP_GRO + err := unix.SetsockoptInt(fd, unix.IPPROTO_UDP, unix.UDP_GRO, 1) + return err == nil +} + const ( - // Maximum number of datagrams that can be coalesced with GSO + // Maximum number of datagrams that can be coalesced with GSO/GRO udpSegmentMaxDatagrams = 64 + + // Maximum size of a GRO coalesced packet (64KB is the practical limit) + // This is udpSegmentMaxDatagrams * MTU but capped at 65535 + groMaxPacketSize = 65535 ) // setGSOSize writes a UDP_SEGMENT control message to the provided buffer @@ -62,6 +75,38 @@ func setGSOSize(control []byte, gsoSize uint16) int { return unix.CmsgSpace(2) // aligned size } +// getGROSize parses a control message buffer to extract the UDP_GRO segment size. +// Returns 0 if no GRO control message is present (meaning the packet is not coalesced). +func getGROSize(control []byte, controlLen int) uint16 { + if controlLen < unix.SizeofCmsghdr { + return 0 + } + + // Parse control messages + for offset := 0; offset < controlLen; { + if offset+unix.SizeofCmsghdr > controlLen { + break + } + + cmsg := (*unix.Cmsghdr)(unsafe.Pointer(&control[offset])) + cmsgDataLen := int(cmsg.Len) - unix.SizeofCmsghdr + if cmsgDataLen < 0 { + break + } + + if cmsg.Level == unix.IPPROTO_UDP && cmsg.Type == unix.UDP_GRO { + if cmsgDataLen >= 2 { + return binary.NativeEndian.Uint16(control[offset+unix.SizeofCmsghdr:]) + } + } + + // Move to next control message (aligned) + offset += unix.CmsgSpace(cmsgDataLen) + } + + return 0 +} + func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) { af := unix.AF_INET6 if ip.Is4() { @@ -104,7 +149,12 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in l.Info("UDP GSO offload is supported") } - return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch, gsoSupported: gsoSupported}, err + groSupported := supportsUDPGRO(fd) + if groSupported { + l.Info("UDP GRO offload is supported and enabled") + } + + return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch, gsoSupported: gsoSupported, groSupported: groSupported}, err } func (u *StdConn) SupportsMultipleReaders() bool { @@ -115,6 +165,10 @@ func (u *StdConn) SupportsGSO() bool { return u.gsoSupported } +func (u *StdConn) SupportsGRO() bool { + return u.groSupported +} + func (u *StdConn) Rebind() error { return nil } @@ -164,13 +218,27 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) { func (u *StdConn) ListenOut(r EncReader) { var ip netip.Addr - msgs, buffers, names := u.PrepareRawMessages(u.batch) + msgs, buffers, names, controls := u.PrepareRawMessages(u.batch) read := u.ReadMulti if u.batch == 1 { read = u.ReadSingle } + // Store the original control buffer size for resetting after each read + controlLen := 0 + if u.groSupported && len(controls) > 0 && len(controls[0]) > 0 { + controlLen = len(controls[0]) + } + for { + // Reset Controllen before each read - the kernel updates this field + // after recvmsg to indicate actual received control data length + if controlLen > 0 { + for i := range msgs { + setMsghdrControllen(&msgs[i].Hdr, controlLen) + } + } + n, err := read(msgs) if err != nil { u.l.WithError(err).Debug("udp socket is closed, exiting read loop") @@ -178,13 +246,36 @@ func (u *StdConn) ListenOut(r EncReader) { } for i := 0; i < n; i++ { - // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic + // Extract source address if u.isV4 { ip, _ = netip.AddrFromSlice(names[i][4:8]) } else { ip, _ = netip.AddrFromSlice(names[i][8:24]) } - r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len]) + srcAddr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])) + + // Check for GRO coalesced packet + totalLen := int(msgs[i].Len) + segmentSize := uint16(0) + if controlLen > 0 { + segmentSize = getGROSize(controls[i], getMsghdrControllen(&msgs[i].Hdr)) + } + + if segmentSize > 0 && totalLen > int(segmentSize) { + // This is a GRO coalesced packet - split it into individual datagrams + for offset := 0; offset < totalLen; { + packetLen := int(segmentSize) + if offset+packetLen > totalLen { + // Last packet may be smaller + packetLen = totalLen - offset + } + r(srcAddr, buffers[i][offset:offset+packetLen]) + offset += packetLen + } + } else { + // Single packet, no coalescing + r(srcAddr, buffers[i][:totalLen]) + } } } } @@ -519,20 +610,23 @@ func (u *StdConn) sendGSO(pkts []BatchPacket, dst netip.AddrPort, segmentSize, t hdr.Control = &control[0] setMsghdrControllen(&hdr, controlLen) + // Declare sockaddr at function scope so it remains valid for the syscall + // (must not go out of scope before the syscall is made) + var rsa4 unix.RawSockaddrInet4 + var rsa6 unix.RawSockaddrInet6 + // Set destination address if u.isV4 { - var rsa unix.RawSockaddrInet4 - rsa.Family = unix.AF_INET - rsa.Addr = dst.Addr().As4() - binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port()) - hdr.Name = (*byte)(unsafe.Pointer(&rsa)) + rsa4.Family = unix.AF_INET + rsa4.Addr = dst.Addr().As4() + binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa4.Port))[:], dst.Port()) + hdr.Name = (*byte)(unsafe.Pointer(&rsa4)) hdr.Namelen = unix.SizeofSockaddrInet4 } else { - var rsa unix.RawSockaddrInet6 - rsa.Family = unix.AF_INET6 - rsa.Addr = dst.Addr().As16() - binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port()) - hdr.Name = (*byte)(unsafe.Pointer(&rsa)) + rsa6.Family = unix.AF_INET6 + rsa6.Addr = dst.Addr().As16() + binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa6.Port))[:], dst.Port()) + hdr.Name = (*byte)(unsafe.Pointer(&rsa6)) hdr.Namelen = unix.SizeofSockaddrInet6 } diff --git a/udp/udp_linux_32.go b/udp/udp_linux_32.go index 10e73dea..c12cfff1 100644 --- a/udp/udp_linux_32.go +++ b/udp/udp_linux_32.go @@ -30,13 +30,26 @@ type rawMessage struct { Len uint32 } -func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { +func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) { msgs := make([]rawMessage, n) buffers := make([][]byte, n) names := make([][]byte, n) + controls := make([][]byte, n) + + // Use larger buffers if GRO is enabled to hold coalesced packets + bufSize := MTU + if u.groSupported { + bufSize = groMaxPacketSize + } + + // Control buffer size for receiving UDP_GRO segment size + controlSize := 0 + if u.groSupported { + controlSize = unix.CmsgSpace(2) // space for uint16 segment size + } for i := range msgs { - buffers[i] = make([]byte, MTU) + buffers[i] = make([]byte, bufSize) names[i] = make([]byte, unix.SizeofSockaddrInet6) vs := []iovec{ @@ -48,9 +61,16 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Namelen = uint32(len(names[i])) + + // Set up control message buffer for GRO + if controlSize > 0 { + controls[i] = make([]byte, controlSize) + msgs[i].Hdr.Control = &controls[i][0] + msgs[i].Hdr.Controllen = uint32(controlSize) + } } - return msgs, buffers, names + return msgs, buffers, names, controls } func setIovecBase(iov *iovec, base *byte) { @@ -68,3 +88,7 @@ func setMsghdrIovlen(hdr *msghdr, l int) { func setMsghdrControllen(hdr *msghdr, l int) { hdr.Controllen = uint32(l) } + +func getMsghdrControllen(hdr *msghdr) int { + return int(hdr.Controllen) +} diff --git a/udp/udp_linux_64.go b/udp/udp_linux_64.go index c5c3ae58..ba186c2a 100644 --- a/udp/udp_linux_64.go +++ b/udp/udp_linux_64.go @@ -33,13 +33,26 @@ type rawMessage struct { Pad0 [4]byte } -func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { +func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) { msgs := make([]rawMessage, n) buffers := make([][]byte, n) names := make([][]byte, n) + controls := make([][]byte, n) + + // Use larger buffers if GRO is enabled to hold coalesced packets + bufSize := MTU + if u.groSupported { + bufSize = groMaxPacketSize + } + + // Control buffer size for receiving UDP_GRO segment size + controlSize := 0 + if u.groSupported { + controlSize = unix.CmsgSpace(2) // space for uint16 segment size + } for i := range msgs { - buffers[i] = make([]byte, MTU) + buffers[i] = make([]byte, bufSize) names[i] = make([]byte, unix.SizeofSockaddrInet6) vs := []iovec{ @@ -51,9 +64,16 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Namelen = uint32(len(names[i])) + + // Set up control message buffer for GRO + if controlSize > 0 { + controls[i] = make([]byte, controlSize) + msgs[i].Hdr.Control = &controls[i][0] + msgs[i].Hdr.Controllen = uint64(controlSize) + } } - return msgs, buffers, names + return msgs, buffers, names, controls } func setIovecBase(iov *iovec, base *byte) { @@ -71,3 +91,7 @@ func setMsghdrIovlen(hdr *msghdr, l int) { func setMsghdrControllen(hdr *msghdr, l int) { hdr.Controllen = uint64(l) } + +func getMsghdrControllen(hdr *msghdr) int { + return int(hdr.Controllen) +} diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index f6ec71f0..9df952ac 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -336,6 +336,10 @@ func (u *RIOConn) SupportsGSO() bool { return false } +func (u *RIOConn) SupportsGRO() bool { + return false +} + func (u *RIOConn) Rebind() error { return nil } diff --git a/udp/udp_tester.go b/udp/udp_tester.go index 6d40fd7b..ae234981 100644 --- a/udp/udp_tester.go +++ b/udp/udp_tester.go @@ -135,6 +135,10 @@ func (u *TesterConn) SupportsGSO() bool { return false } +func (u *TesterConn) SupportsGRO() bool { + return false +} + func (u *TesterConn) Rebind() error { return nil }