reuse packet buffer

This commit is contained in:
Ryan Huber
2025-11-03 10:52:09 +00:00
parent c73b2dfbc7
commit 5128e2653e

View File

@@ -177,6 +177,7 @@ type recvBuffer struct {
iovec *unix.Iovec // IO vector pointing to payloadBuf iovec *unix.Iovec // IO vector pointing to payloadBuf
userData uint64 // User data for tracking this operation userData uint64 // User data for tracking this operation
inFlight atomic.Bool // Whether this buffer has a pending io_uring operation inFlight atomic.Bool // Whether this buffer has a pending io_uring operation
inUse atomic.Bool // Buffer handed to caller; wait for release before reuse
} }
// ioUringRecvState manages a dedicated io_uring for receiving packets // ioUringRecvState manages a dedicated io_uring for receiving packets
@@ -1006,20 +1007,6 @@ type RecvPacket struct {
RecycleFunc func() RecycleFunc func()
} }
var recvPacketDataPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 65536) // Max UDP packet size
return &b
},
}
var recvControlDataPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 256) // Max control message size
return &b
},
}
// newIoUringRecvState creates a dedicated io_uring for receiving packets // newIoUringRecvState creates a dedicated io_uring for receiving packets
// poolSize determines how many receive operations to keep queued // poolSize determines how many receive operations to keep queued
func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize int) (*ioUringRecvState, error) { func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize int) (*ioUringRecvState, error) {
@@ -1247,6 +1234,41 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error {
return nil return nil
} }
func (r *ioUringRecvState) recycleBuffer(buf *recvBuffer) {
if r == nil || buf == nil {
return
}
if r.closed.Load() {
return
}
if !buf.inUse.Swap(false) {
// Already released or never handed out
return
}
r.mu.Lock()
defer r.mu.Unlock()
if r.closed.Load() {
return
}
if buf.inFlight.Load() {
return
}
if err := r.submitRecvLocked(buf); err != nil {
logrus.WithError(err).Warn("io_uring recv: failed to resubmit buffer")
return
}
if err := r.submitAndWaitLocked(1, 0); err != nil {
buf.inFlight.Store(false)
logrus.WithError(err).Warn("io_uring recv: submit failed during recycle")
}
}
// submitAndWaitLocked submits pending SQEs and optionally waits for completions // submitAndWaitLocked submits pending SQEs and optionally waits for completions
func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error { func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error {
var flags uintptr var flags uintptr
@@ -1283,7 +1305,7 @@ func (r *ioUringRecvState) fillRecvQueue() error {
submitted := uint32(0) submitted := uint32(0)
for _, buf := range r.bufferPool { for _, buf := range r.bufferPool {
if !buf.inFlight.Load() { if !buf.inFlight.Load() && !buf.inUse.Load() {
if err := r.submitRecvLocked(buf); err != nil { if err := r.submitRecvLocked(buf); err != nil {
if submitted > 0 { if submitted > 0 {
break // Queue full, submit what we have break // Queue full, submit what we have
@@ -1386,47 +1408,29 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
// Successfully received packet // Successfully received packet
n := int(res) n := int(res)
// Copy address // Copy address into standalone struct
var from unix.RawSockaddrInet6 var from unix.RawSockaddrInet6
if buf.msghdr.Namelen > 0 && buf.msghdr.Namelen <= uint32(len(buf.nameBuf)) { if buf.msghdr.Namelen > 0 && buf.msghdr.Namelen <= uint32(len(buf.nameBuf)) {
copy((*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&from)))[:], buf.nameBuf[:buf.msghdr.Namelen]) copy((*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&from)))[:], buf.nameBuf[:buf.msghdr.Namelen])
} }
// Get buffer from pool and copy data
dataBufPtr := recvPacketDataPool.Get().(*[]byte)
dataBuf := *dataBufPtr
if cap(dataBuf) < n {
// Buffer too small, allocate new one
dataBuf = make([]byte, n)
} else {
dataBuf = dataBuf[:n]
}
copy(dataBuf, buf.payloadBuf[:n])
// Copy control messages if present
var controlBuf []byte
var controlBufPtr *[]byte
controllen := int(buf.msghdr.Controllen) controllen := int(buf.msghdr.Controllen)
var controlSlice []byte
if controllen > 0 && controllen <= len(buf.controlBuf) { if controllen > 0 && controllen <= len(buf.controlBuf) {
controlBufPtr = recvControlDataPool.Get().(*[]byte) controlSlice = buf.controlBuf[:controllen]
controlBuf = (*controlBufPtr)[:controllen]
copy(controlBuf, buf.controlBuf[:controllen])
} }
buf.inUse.Store(true)
bufferRef := buf
packets = append(packets, RecvPacket{ packets = append(packets, RecvPacket{
Data: dataBuf, Data: buf.payloadBuf[:n],
N: n, N: n,
From: &from, From: &from,
Flags: flags, Flags: flags,
Control: controlBuf, Control: controlSlice,
Controllen: controllen, Controllen: controllen,
RecycleFunc: func() { RecycleFunc: func() { r.recycleBuffer(bufferRef) },
// Return buffers to pool
recvPacketDataPool.Put(dataBufPtr)
if controlBufPtr != nil {
recvControlDataPool.Put(controlBufPtr)
}
},
}) })
} }