From 0d8bd118188e8d0256b50ec563b64692cc34f257 Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Mon, 3 Nov 2025 11:06:07 +0000 Subject: [PATCH] reuse GRO slices --- udp/udp_linux.go | 67 +++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 0e56efc..58791e7 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -52,10 +52,9 @@ type StdConn struct { gsoMaxBytes int gsoFlushTimeout time.Duration - groSegmentPool sync.Pool - groBufSize atomic.Int64 - rxBufferPool chan []byte - gsoBufferPool sync.Pool + groBufSize atomic.Int64 + rxBufferPool chan []byte + gsoBufferPool sync.Pool gsoBatches metrics.Counter gsoSegments metrics.Counter @@ -276,9 +275,6 @@ func (u *StdConn) setGroBufferSize(size int) { size = defaultGROReadBufferSize } u.groBufSize.Store(int64(size)) - u.groSegmentPool = sync.Pool{New: func() any { - return make([]byte, size) - }} if u.rxBufferPool == nil { poolSize := u.batch * 4 if poolSize < u.batch { @@ -2272,6 +2268,13 @@ func (u *StdConn) logIoUringResult(addr netip.AddrPort, expected, written int, e } func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize, segCount int, release func()) bool { + var releaseFlag atomic.Bool + releaseOnce := func() { + if release != nil && releaseFlag.CompareAndSwap(false, true) { + release() + } + } + if segSize <= 0 || segSize >= len(payload) { u.l.WithFields(logrus.Fields{ "tag": "gro-debug", @@ -2281,6 +2284,7 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, "seg_size": segSize, "seg_count": segCount, }).Debug("gro-debug skip emit") + releaseOnce() return false } @@ -2297,42 +2301,27 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, "seg_size": segSize, "seg_count": segCount, }).Debug("gro-debug skip emit") + releaseOnce() return false } - defer func() { - if release != nil { - release() - } - }() - - actualSegments := 0 - start := 0 + starts := make([]int, 0, segCount) + lens := make([]int, 0, segCount) debugEnabled := u.l.IsLevelEnabled(logrus.DebugLevel) var firstHeader header.H var firstParsed bool var firstCounter uint64 var firstRemote uint32 + actualSegments := 0 + start := 0 for start < totalLen && actualSegments < segCount { end := start + segSize if end > totalLen { end = totalLen } - segLen := end - start - bufAny := u.groSegmentPool.Get() - var segBuf []byte - if bufAny == nil { - segBuf = make([]byte, segLen) - } else { - segBuf = bufAny.([]byte) - if cap(segBuf) < segLen { - segBuf = make([]byte, segLen) - } - } - segment := segBuf[:segLen] - copy(segment, payload[start:end]) + segment := payload[start:end] if debugEnabled && !firstParsed { if err := firstHeader.Parse(segment); err == nil { @@ -2353,11 +2342,10 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, } } + starts = append(starts, start) + lens = append(lens, segLen) start = end actualSegments++ - r(addr, segment, func() { - u.groSegmentPool.Put(segBuf[:cap(segBuf)]) - }) if debugEnabled && actualSegments == segCount && segLen < segSize { var tail header.H @@ -2372,7 +2360,22 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, }).Debug("gro-debug tail segment metadata") } } + } + if actualSegments == 0 { + releaseOnce() + return false + } + + var remaining int32 = int32(actualSegments) + for i := range starts { + segment := payload[starts[i] : starts[i]+lens[i]] + segmentRelease := func() { + if atomic.AddInt32(&remaining, -1) == 0 { + releaseOnce() + } + } + r(addr, segment, segmentRelease) } if u.groBatches != nil { @@ -2384,7 +2387,7 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, u.groBatchTick.Add(1) u.groSegmentsTick.Add(int64(actualSegments)) - if debugEnabled && actualSegments > 0 { + if debugEnabled { lastLen := segSize if tail := totalLen % segSize; tail != 0 { lastLen = tail