claude implements UDP GRO

This commit is contained in:
Jay Wren
2026-02-04 11:02:06 -05:00
parent 6b6a4bc1cc
commit 030b7e2763
8 changed files with 183 additions and 21 deletions

View File

@@ -28,6 +28,7 @@ type Conn interface {
ReloadConfig(c *config.C) ReloadConfig(c *config.C)
SupportsMultipleReaders() bool SupportsMultipleReaders() bool
SupportsGSO() bool SupportsGSO() bool
SupportsGRO() bool
Close() error Close() error
} }
@@ -48,6 +49,9 @@ func (NoopConn) SupportsMultipleReaders() bool {
func (NoopConn) SupportsGSO() bool { func (NoopConn) SupportsGSO() bool {
return false return false
} }
func (NoopConn) SupportsGRO() bool {
return false
}
func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error { func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error {
return nil return nil
} }

View File

@@ -192,6 +192,10 @@ func (u *StdConn) SupportsGSO() bool {
return false return false
} }
func (u *StdConn) SupportsGRO() bool {
return false
}
func (u *StdConn) Rebind() error { func (u *StdConn) Rebind() error {
var err error var err error
if u.isV4 { if u.isV4 {

View File

@@ -106,6 +106,10 @@ func (u *GenericConn) SupportsGSO() bool {
return false return false
} }
func (u *GenericConn) SupportsGRO() bool {
return false
}
func (u *GenericConn) WriteBatch(pkts []BatchPacket) (int, error) { func (u *GenericConn) WriteBatch(pkts []BatchPacket) (int, error) {
for i := range pkts { for i := range pkts {
if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil { if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {

View File

@@ -24,6 +24,7 @@ type StdConn struct {
l *logrus.Logger l *logrus.Logger
batch int batch int
gsoSupported bool gsoSupported bool
groSupported bool
} }
func maybeIPV4(ip net.IP) (net.IP, bool) { func maybeIPV4(ip net.IP) (net.IP, bool) {
@@ -41,9 +42,21 @@ func supportsUDPOffload(fd int) bool {
return err == nil return err == nil
} }
// supportsUDPGRO checks if the kernel supports UDP GRO (Generic Receive Offload)
// and attempts to enable it on the socket.
func supportsUDPGRO(fd int) bool {
// Try to enable UDP_GRO
err := unix.SetsockoptInt(fd, unix.IPPROTO_UDP, unix.UDP_GRO, 1)
return err == nil
}
const ( const (
// Maximum number of datagrams that can be coalesced with GSO // Maximum number of datagrams that can be coalesced with GSO/GRO
udpSegmentMaxDatagrams = 64 udpSegmentMaxDatagrams = 64
// Maximum size of a GRO coalesced packet (64KB is the practical limit)
// This is udpSegmentMaxDatagrams * MTU but capped at 65535
groMaxPacketSize = 65535
) )
// setGSOSize writes a UDP_SEGMENT control message to the provided buffer // setGSOSize writes a UDP_SEGMENT control message to the provided buffer
@@ -62,6 +75,38 @@ func setGSOSize(control []byte, gsoSize uint16) int {
return unix.CmsgSpace(2) // aligned size return unix.CmsgSpace(2) // aligned size
} }
// getGROSize parses a control message buffer to extract the UDP_GRO segment size.
// Returns 0 if no GRO control message is present (meaning the packet is not coalesced).
func getGROSize(control []byte, controlLen int) uint16 {
if controlLen < unix.SizeofCmsghdr {
return 0
}
// Parse control messages
for offset := 0; offset < controlLen; {
if offset+unix.SizeofCmsghdr > controlLen {
break
}
cmsg := (*unix.Cmsghdr)(unsafe.Pointer(&control[offset]))
cmsgDataLen := int(cmsg.Len) - unix.SizeofCmsghdr
if cmsgDataLen < 0 {
break
}
if cmsg.Level == unix.IPPROTO_UDP && cmsg.Type == unix.UDP_GRO {
if cmsgDataLen >= 2 {
return binary.NativeEndian.Uint16(control[offset+unix.SizeofCmsghdr:])
}
}
// Move to next control message (aligned)
offset += unix.CmsgSpace(cmsgDataLen)
}
return 0
}
func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) { func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
af := unix.AF_INET6 af := unix.AF_INET6
if ip.Is4() { if ip.Is4() {
@@ -104,7 +149,12 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
l.Info("UDP GSO offload is supported") l.Info("UDP GSO offload is supported")
} }
return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch, gsoSupported: gsoSupported}, err groSupported := supportsUDPGRO(fd)
if groSupported {
l.Info("UDP GRO offload is supported and enabled")
}
return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch, gsoSupported: gsoSupported, groSupported: groSupported}, err
} }
func (u *StdConn) SupportsMultipleReaders() bool { func (u *StdConn) SupportsMultipleReaders() bool {
@@ -115,6 +165,10 @@ func (u *StdConn) SupportsGSO() bool {
return u.gsoSupported return u.gsoSupported
} }
func (u *StdConn) SupportsGRO() bool {
return u.groSupported
}
func (u *StdConn) Rebind() error { func (u *StdConn) Rebind() error {
return nil return nil
} }
@@ -164,13 +218,27 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
func (u *StdConn) ListenOut(r EncReader) { func (u *StdConn) ListenOut(r EncReader) {
var ip netip.Addr var ip netip.Addr
msgs, buffers, names := u.PrepareRawMessages(u.batch) msgs, buffers, names, controls := u.PrepareRawMessages(u.batch)
read := u.ReadMulti read := u.ReadMulti
if u.batch == 1 { if u.batch == 1 {
read = u.ReadSingle read = u.ReadSingle
} }
// Store the original control buffer size for resetting after each read
controlLen := 0
if u.groSupported && len(controls) > 0 && len(controls[0]) > 0 {
controlLen = len(controls[0])
}
for { for {
// Reset Controllen before each read - the kernel updates this field
// after recvmsg to indicate actual received control data length
if controlLen > 0 {
for i := range msgs {
setMsghdrControllen(&msgs[i].Hdr, controlLen)
}
}
n, err := read(msgs) n, err := read(msgs)
if err != nil { if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop") u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
@@ -178,13 +246,36 @@ func (u *StdConn) ListenOut(r EncReader) {
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic // Extract source address
if u.isV4 { if u.isV4 {
ip, _ = netip.AddrFromSlice(names[i][4:8]) ip, _ = netip.AddrFromSlice(names[i][4:8])
} else { } else {
ip, _ = netip.AddrFromSlice(names[i][8:24]) ip, _ = netip.AddrFromSlice(names[i][8:24])
} }
r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len]) srcAddr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
// Check for GRO coalesced packet
totalLen := int(msgs[i].Len)
segmentSize := uint16(0)
if controlLen > 0 {
segmentSize = getGROSize(controls[i], getMsghdrControllen(&msgs[i].Hdr))
}
if segmentSize > 0 && totalLen > int(segmentSize) {
// This is a GRO coalesced packet - split it into individual datagrams
for offset := 0; offset < totalLen; {
packetLen := int(segmentSize)
if offset+packetLen > totalLen {
// Last packet may be smaller
packetLen = totalLen - offset
}
r(srcAddr, buffers[i][offset:offset+packetLen])
offset += packetLen
}
} else {
// Single packet, no coalescing
r(srcAddr, buffers[i][:totalLen])
}
} }
} }
} }
@@ -519,20 +610,23 @@ func (u *StdConn) sendGSO(pkts []BatchPacket, dst netip.AddrPort, segmentSize, t
hdr.Control = &control[0] hdr.Control = &control[0]
setMsghdrControllen(&hdr, controlLen) setMsghdrControllen(&hdr, controlLen)
// Declare sockaddr at function scope so it remains valid for the syscall
// (must not go out of scope before the syscall is made)
var rsa4 unix.RawSockaddrInet4
var rsa6 unix.RawSockaddrInet6
// Set destination address // Set destination address
if u.isV4 { if u.isV4 {
var rsa unix.RawSockaddrInet4 rsa4.Family = unix.AF_INET
rsa.Family = unix.AF_INET rsa4.Addr = dst.Addr().As4()
rsa.Addr = dst.Addr().As4() binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa4.Port))[:], dst.Port())
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port()) hdr.Name = (*byte)(unsafe.Pointer(&rsa4))
hdr.Name = (*byte)(unsafe.Pointer(&rsa))
hdr.Namelen = unix.SizeofSockaddrInet4 hdr.Namelen = unix.SizeofSockaddrInet4
} else { } else {
var rsa unix.RawSockaddrInet6 rsa6.Family = unix.AF_INET6
rsa.Family = unix.AF_INET6 rsa6.Addr = dst.Addr().As16()
rsa.Addr = dst.Addr().As16() binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa6.Port))[:], dst.Port())
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], dst.Port()) hdr.Name = (*byte)(unsafe.Pointer(&rsa6))
hdr.Name = (*byte)(unsafe.Pointer(&rsa))
hdr.Namelen = unix.SizeofSockaddrInet6 hdr.Namelen = unix.SizeofSockaddrInet6
} }

View File

@@ -30,13 +30,26 @@ type rawMessage struct {
Len uint32 Len uint32
} }
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) {
msgs := make([]rawMessage, n) msgs := make([]rawMessage, n)
buffers := make([][]byte, n) buffers := make([][]byte, n)
names := make([][]byte, n) names := make([][]byte, n)
controls := make([][]byte, n)
// Use larger buffers if GRO is enabled to hold coalesced packets
bufSize := MTU
if u.groSupported {
bufSize = groMaxPacketSize
}
// Control buffer size for receiving UDP_GRO segment size
controlSize := 0
if u.groSupported {
controlSize = unix.CmsgSpace(2) // space for uint16 segment size
}
for i := range msgs { for i := range msgs {
buffers[i] = make([]byte, MTU) buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6) names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{ vs := []iovec{
@@ -48,9 +61,16 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Name = &names[i][0]
msgs[i].Hdr.Namelen = uint32(len(names[i])) msgs[i].Hdr.Namelen = uint32(len(names[i]))
// Set up control message buffer for GRO
if controlSize > 0 {
controls[i] = make([]byte, controlSize)
msgs[i].Hdr.Control = &controls[i][0]
msgs[i].Hdr.Controllen = uint32(controlSize)
}
} }
return msgs, buffers, names return msgs, buffers, names, controls
} }
func setIovecBase(iov *iovec, base *byte) { func setIovecBase(iov *iovec, base *byte) {
@@ -68,3 +88,7 @@ func setMsghdrIovlen(hdr *msghdr, l int) {
func setMsghdrControllen(hdr *msghdr, l int) { func setMsghdrControllen(hdr *msghdr, l int) {
hdr.Controllen = uint32(l) hdr.Controllen = uint32(l)
} }
func getMsghdrControllen(hdr *msghdr) int {
return int(hdr.Controllen)
}

View File

@@ -33,13 +33,26 @@ type rawMessage struct {
Pad0 [4]byte Pad0 [4]byte
} }
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte, [][]byte) {
msgs := make([]rawMessage, n) msgs := make([]rawMessage, n)
buffers := make([][]byte, n) buffers := make([][]byte, n)
names := make([][]byte, n) names := make([][]byte, n)
controls := make([][]byte, n)
// Use larger buffers if GRO is enabled to hold coalesced packets
bufSize := MTU
if u.groSupported {
bufSize = groMaxPacketSize
}
// Control buffer size for receiving UDP_GRO segment size
controlSize := 0
if u.groSupported {
controlSize = unix.CmsgSpace(2) // space for uint16 segment size
}
for i := range msgs { for i := range msgs {
buffers[i] = make([]byte, MTU) buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6) names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{ vs := []iovec{
@@ -51,9 +64,16 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
msgs[i].Hdr.Name = &names[i][0] msgs[i].Hdr.Name = &names[i][0]
msgs[i].Hdr.Namelen = uint32(len(names[i])) msgs[i].Hdr.Namelen = uint32(len(names[i]))
// Set up control message buffer for GRO
if controlSize > 0 {
controls[i] = make([]byte, controlSize)
msgs[i].Hdr.Control = &controls[i][0]
msgs[i].Hdr.Controllen = uint64(controlSize)
}
} }
return msgs, buffers, names return msgs, buffers, names, controls
} }
func setIovecBase(iov *iovec, base *byte) { func setIovecBase(iov *iovec, base *byte) {
@@ -71,3 +91,7 @@ func setMsghdrIovlen(hdr *msghdr, l int) {
func setMsghdrControllen(hdr *msghdr, l int) { func setMsghdrControllen(hdr *msghdr, l int) {
hdr.Controllen = uint64(l) hdr.Controllen = uint64(l)
} }
func getMsghdrControllen(hdr *msghdr) int {
return int(hdr.Controllen)
}

View File

@@ -336,6 +336,10 @@ func (u *RIOConn) SupportsGSO() bool {
return false return false
} }
func (u *RIOConn) SupportsGRO() bool {
return false
}
func (u *RIOConn) Rebind() error { func (u *RIOConn) Rebind() error {
return nil return nil
} }

View File

@@ -135,6 +135,10 @@ func (u *TesterConn) SupportsGSO() bool {
return false return false
} }
func (u *TesterConn) SupportsGRO() bool {
return false
}
func (u *TesterConn) Rebind() error { func (u *TesterConn) Rebind() error {
return nil return nil
} }