From 5128e2653e340ed07e9567258e50473714049a20 Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Mon, 3 Nov 2025 10:52:09 +0000 Subject: [PATCH] reuse packet buffer --- udp/io_uring_linux.go | 96 ++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/udp/io_uring_linux.go b/udp/io_uring_linux.go index 122ab33..25abd15 100644 --- a/udp/io_uring_linux.go +++ b/udp/io_uring_linux.go @@ -177,6 +177,7 @@ type recvBuffer struct { iovec *unix.Iovec // IO vector pointing to payloadBuf userData uint64 // User data for tracking this operation inFlight atomic.Bool // Whether this buffer has a pending io_uring operation + inUse atomic.Bool // Buffer handed to caller; wait for release before reuse } // ioUringRecvState manages a dedicated io_uring for receiving packets @@ -1006,20 +1007,6 @@ type RecvPacket struct { RecycleFunc func() } -var recvPacketDataPool = sync.Pool{ - New: func() interface{} { - b := make([]byte, 65536) // Max UDP packet size - return &b - }, -} - -var recvControlDataPool = sync.Pool{ - New: func() interface{} { - b := make([]byte, 256) // Max control message size - return &b - }, -} - // newIoUringRecvState creates a dedicated io_uring for receiving packets // poolSize determines how many receive operations to keep queued func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize int) (*ioUringRecvState, error) { @@ -1247,6 +1234,41 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error { return nil } +func (r *ioUringRecvState) recycleBuffer(buf *recvBuffer) { + if r == nil || buf == nil { + return + } + if r.closed.Load() { + return + } + + if !buf.inUse.Swap(false) { + // Already released or never handed out + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.closed.Load() { + return + } + + if buf.inFlight.Load() { + return + } + + if err := r.submitRecvLocked(buf); err != nil { + logrus.WithError(err).Warn("io_uring recv: failed to resubmit buffer") + return + } + + if err := r.submitAndWaitLocked(1, 0); err != nil { + buf.inFlight.Store(false) + logrus.WithError(err).Warn("io_uring recv: submit failed during recycle") + } +} + // submitAndWaitLocked submits pending SQEs and optionally waits for completions func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error { var flags uintptr @@ -1283,7 +1305,7 @@ func (r *ioUringRecvState) fillRecvQueue() error { submitted := uint32(0) for _, buf := range r.bufferPool { - if !buf.inFlight.Load() { + if !buf.inFlight.Load() && !buf.inUse.Load() { if err := r.submitRecvLocked(buf); err != nil { if submitted > 0 { break // Queue full, submit what we have @@ -1386,47 +1408,29 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { // Successfully received packet n := int(res) - // Copy address + // Copy address into standalone struct var from unix.RawSockaddrInet6 if buf.msghdr.Namelen > 0 && buf.msghdr.Namelen <= uint32(len(buf.nameBuf)) { copy((*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&from)))[:], buf.nameBuf[:buf.msghdr.Namelen]) } - // Get buffer from pool and copy data - dataBufPtr := recvPacketDataPool.Get().(*[]byte) - dataBuf := *dataBufPtr - if cap(dataBuf) < n { - // Buffer too small, allocate new one - dataBuf = make([]byte, n) - } else { - dataBuf = dataBuf[:n] - } - copy(dataBuf, buf.payloadBuf[:n]) - - // Copy control messages if present - var controlBuf []byte - var controlBufPtr *[]byte controllen := int(buf.msghdr.Controllen) + var controlSlice []byte if controllen > 0 && controllen <= len(buf.controlBuf) { - controlBufPtr = recvControlDataPool.Get().(*[]byte) - controlBuf = (*controlBufPtr)[:controllen] - copy(controlBuf, buf.controlBuf[:controllen]) + controlSlice = buf.controlBuf[:controllen] } + buf.inUse.Store(true) + bufferRef := buf + packets = append(packets, RecvPacket{ - Data: dataBuf, - N: n, - From: &from, - Flags: flags, - Control: controlBuf, - Controllen: controllen, - RecycleFunc: func() { - // Return buffers to pool - recvPacketDataPool.Put(dataBufPtr) - if controlBufPtr != nil { - recvControlDataPool.Put(controlBufPtr) - } - }, + Data: buf.payloadBuf[:n], + N: n, + From: &from, + Flags: flags, + Control: controlSlice, + Controllen: controllen, + RecycleFunc: func() { r.recycleBuffer(bufferRef) }, }) }