Ryan Huber
2025-11-03 17:23:57 +00:00
parent 1c069a8e42
commit a4b7f624da
2 changed files with 457 additions and 196 deletions

View File

@@ -21,10 +21,13 @@ const (
ioringOpSendmsg = 9 ioringOpSendmsg = 9
ioringOpRecvmsg = 10 ioringOpRecvmsg = 10
ioringEnterGetevents = 1 << 0 ioringEnterGetevents = 1 << 0
ioringEnterSqWakeup = 1 << 1 // Wake up SQPOLL thread
ioringSetupSqpoll = 1 << 1 // Kernel polls SQ - eliminates syscalls!
ioringSetupClamp = 1 << 4 ioringSetupClamp = 1 << 4
ioringSetupCoopTaskrun = 1 << 8 // Kernel 5.19+: reduce thread creation ioringSetupCoopTaskrun = 1 << 8 // Kernel 5.19+: reduce thread creation
ioringSetupSingleIssuer = 1 << 12 // Kernel 6.0+: single submitter optimization ioringSetupSingleIssuer = 1 << 12 // Kernel 6.0+: single submitter optimization
ioringRegisterIowqMaxWorkers = 19 // Register opcode to limit workers ioringRegisterIowqMaxWorkers = 19 // Register opcode to limit workers
ioringSqNeedWakeup = 1 << 0 // Flag in sq_flags indicating SQPOLL thread needs wakeup
ioringOffSqRing = 0 ioringOffSqRing = 0
ioringOffCqRing = 0x8000000 ioringOffCqRing = 0x8000000
ioringOffSqes = 0x10000000 ioringOffSqes = 0x10000000
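These two flags form the SQPOLL handshake: the kernel sets IORING_SQ_NEED_WAKEUP in sq_flags once its poll thread has gone idle, and userspace answers with IORING_ENTER_SQ_WAKEUP on its next io_uring_enter. A condensed sketch of that decision (the full version appears in submitAndWaitLocked below; the helper name is illustrative, not from the diff):

// Illustrative helper: decide whether io_uring_enter is needed at all under SQPOLL.
func sqpollEnterFlags(sqFlagsPtr *uint32, wait uint32) (flags uintptr, needEnter bool) {
	if atomic.LoadUint32(sqFlagsPtr)&ioringSqNeedWakeup != 0 {
		flags |= ioringEnterSqWakeup // poll thread went idle: wake it up
		needEnter = true
	}
	if wait > 0 {
		flags |= ioringEnterGetevents // also block for completions
		needEnter = true
	}
	return flags, needEnter // needEnter == false: kernel is already polling, skip the syscall
}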
@@ -37,7 +40,7 @@ type ioSqringOffsets struct {
Tail uint32 Tail uint32
RingMask uint32 RingMask uint32
RingEntries uint32 RingEntries uint32
Flags uint32 Flags uint32 // Offset to SQ flags (includes SQ_NEED_WAKEUP for SQPOLL)
Dropped uint32 Dropped uint32
Array uint32 Array uint32
Resv1 uint32 Resv1 uint32
@@ -113,6 +116,15 @@ type pendingSend struct {
userData uint64 userData uint64
} }
// Pre-allocated buffer for a single send operation (eliminates allocations)
type sendBuffer struct {
msghdr unix.Msghdr
iovec unix.Iovec
sockaddr [unix.SizeofSockaddrInet6]byte // Max sockaddr size (28 bytes)
control [256]byte // Max control msg size
inUse atomic.Bool // Track if buffer is in flight
}
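The pool built from this type is claimed with a CompareAndSwap on inUse and returned with a plain Store(false) once the matching CQE has been harvested; a condensed sketch of that acquire/release pair (helper names are illustrative, the real logic lives in enqueueSendmsgLocked and harvestCompletionsLocked below):

// acquireSendBuffer claims a free slot from the pre-allocated pool, or returns
// nil when every buffer is still in flight.
func acquireSendBuffer(pool []*sendBuffer) *sendBuffer {
	for _, b := range pool {
		if b.inUse.CompareAndSwap(false, true) {
			return b
		}
	}
	return nil
}

// releaseSendBuffer makes the slot reusable after its completion has been reaped.
func releaseSendBuffer(b *sendBuffer) {
	b.inUse.Store(false)
}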
type pendingRecv struct { type pendingRecv struct {
msgCopy *unix.Msghdr msgCopy *unix.Msghdr
iovCopy *unix.Iovec iovCopy *unix.Iovec
@@ -150,6 +162,7 @@ type ioUringState struct {
sqTail *uint32 sqTail *uint32
sqRingMask *uint32 sqRingMask *uint32
sqRingEntries *uint32 sqRingEntries *uint32
sqFlags *uint32 // For SQPOLL: tells us if kernel thread needs wakeup
sqArray []uint32 sqArray []uint32
cqHead *uint32 cqHead *uint32
@@ -166,6 +179,12 @@ type ioUringState struct {
pendingReceives map[uint64]*pendingRecv pendingReceives map[uint64]*pendingRecv
completedCqes map[uint64]*ioUringCqe completedCqes map[uint64]*ioUringCqe
sqpollEnabled bool // Whether SQPOLL mode is active
// Pre-allocated buffer pool (zero-allocation hot path!)
sendBuffers []*sendBuffer
bufferMap map[uint64]*sendBuffer // userData -> buffer for cleanup
} }
// recvBuffer represents a single receive operation with its associated buffers // recvBuffer represents a single receive operation with its associated buffers
@@ -178,10 +197,15 @@ type recvBuffer struct {
userData uint64 // User data for tracking this operation userData uint64 // User data for tracking this operation
inFlight atomic.Bool // Whether this buffer has a pending io_uring operation inFlight atomic.Bool // Whether this buffer has a pending io_uring operation
inUse atomic.Bool // Buffer handed to caller; wait for release before reuse inUse atomic.Bool // Buffer handed to caller; wait for release before reuse
recycleFn func() // Pre-bound recycle function to avoid per-packet allocations
} }
// ioUringRecvState manages a dedicated io_uring for receiving packets // ioUringRecvState manages a dedicated io_uring for receiving packets
// It maintains a pool of receive buffers and continuously keeps receives queued // Architecture (similar to send-side but adapted for receive):
// - Maintains a pool of receive buffers (like send-side buffer pool)
// - Keeps receives continuously queued in the SQ (like send-side batching)
// - Drains completions from CQ in batches (like send-side completion handling)
// - Recycles buffers immediately back to the ring (like send-side buffer release)
type ioUringRecvState struct { type ioUringRecvState struct {
fd int fd int
sqRing []byte sqRing []byte
@@ -194,6 +218,7 @@ type ioUringRecvState struct {
sqTail *uint32 sqTail *uint32
sqRingMask *uint32 sqRingMask *uint32
sqRingEntries *uint32 sqRingEntries *uint32
sqFlags *uint32 // For SQPOLL: tells us if kernel thread needs wakeup
sqArray []uint32 sqArray []uint32
cqHead *uint32 cqHead *uint32
@@ -211,6 +236,7 @@ type ioUringRecvState struct {
sockFd int // Socket file descriptor to receive from sockFd int // Socket file descriptor to receive from
closed atomic.Bool closed atomic.Bool
sqpollEnabled bool // Whether SQPOLL mode is active
} }
func alignUint32(v, alignment uint32) uint32 { func alignUint32(v, alignment uint32) uint32 {
@@ -237,11 +263,14 @@ func newIoUringState(entries uint32) (*ioUringState, error) {
tries := entries tries := entries
var params ioUringParams var params ioUringParams
// Try flag combinations in order (5.19+ -> baseline) // Try flag combinations in order (best -> baseline)
// SQPOLL eliminates io_uring_enter syscalls (kernel polls SQ)
// Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded
flagSets := []uint32{ flagSets := []uint32{
ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation ioringSetupClamp | ioringSetupCoopTaskrun | ioringSetupSqpoll, // Best: SQPOLL + coop
ioringSetupClamp, // All kernels ioringSetupClamp | ioringSetupSqpoll, // Good: SQPOLL
ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+
ioringSetupClamp, // Baseline
} }
flagSetIdx := 0 flagSetIdx := 0
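The loop that walks flagSets sits outside this hunk; a hedged sketch of its likely shape, assuming ioUringParams mirrors the kernel's io_uring_params (SqThreadIdle in milliseconds) and that unsupported flags come back as EINVAL (unprivileged SQPOLL on older kernels may also return EPERM):

// Sketch only: retry io_uring_setup with progressively smaller flag sets.
var fd uintptr
for {
	params = ioUringParams{Flags: flagSets[flagSetIdx]}
	if params.Flags&ioringSetupSqpoll != 0 {
		params.SqThreadIdle = 2000 // ms the SQ poll thread spins before sleeping (illustrative value)
	}
	r1, _, errno := unix.Syscall(unix.SYS_IO_URING_SETUP, uintptr(tries), uintptr(unsafe.Pointer(&params)), 0)
	if errno == 0 {
		fd = r1
		break
	}
	if (errno == unix.EINVAL || errno == unix.EPERM) && flagSetIdx < len(flagSets)-1 {
		flagSetIdx++ // flag set not supported: fall back to the next one
		continue
	}
	return nil, errno
}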
@@ -264,6 +293,9 @@ func newIoUringState(entries uint32) (*ioUringState, error) {
return nil, errno return nil, errno
} }
// Check if SQPOLL was actually enabled
sqpollEnabled := params.Flags&ioringSetupSqpoll != 0
ring := &ioUringState{ ring := &ioUringState{
fd: int(fd), fd: int(fd),
sqEntryCount: params.SqEntries, sqEntryCount: params.SqEntries,
@@ -272,6 +304,24 @@ func newIoUringState(entries uint32) (*ioUringState, error) {
pendingSends: make(map[uint64]*pendingSend), pendingSends: make(map[uint64]*pendingSend),
pendingReceives: make(map[uint64]*pendingRecv), pendingReceives: make(map[uint64]*pendingRecv),
completedCqes: make(map[uint64]*ioUringCqe), completedCqes: make(map[uint64]*ioUringCqe),
sqpollEnabled: sqpollEnabled,
bufferMap: make(map[uint64]*sendBuffer),
}
// Pre-allocate buffer pool (size = SQ entries for maximum parallelism)
ring.sendBuffers = make([]*sendBuffer, params.SqEntries)
for i := range ring.sendBuffers {
ring.sendBuffers[i] = &sendBuffer{}
}
// Log which mode we got
if sqpollEnabled {
logrus.WithFields(logrus.Fields{
"sq_entries": params.SqEntries,
"sq_thread_idle": params.SqThreadIdle,
}).Info("io_uring send: SQPOLL enabled with idle timeout")
} else {
logrus.WithField("sq_entries", params.SqEntries).Info("io_uring send: standard mode")
} }
if err := ring.mapRings(&params); err != nil { if err := ring.mapRings(&params); err != nil {
@@ -339,6 +389,7 @@ func (r *ioUringState) mapRings(params *ioUringParams) error {
r.sqTail = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Tail))) r.sqTail = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Tail)))
r.sqRingMask = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingMask))) r.sqRingMask = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingMask)))
r.sqRingEntries = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingEntries))) r.sqRingEntries = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingEntries)))
r.sqFlags = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Flags)))
arrayPtr := unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Array)) arrayPtr := unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Array))
r.sqArray = unsafe.Slice((*uint32)(arrayPtr), int(params.SqEntries)) r.sqArray = unsafe.Slice((*uint32)(arrayPtr), int(params.SqEntries))
@@ -388,7 +439,9 @@ func (r *ioUringState) getSqeLocked() (*ioUringSqe, error) {
sqe := &r.sqes[idx] sqe := &r.sqes[idx]
*sqe = ioUringSqe{} *sqe = ioUringSqe{}
r.sqArray[idx] = idx r.sqArray[idx] = idx
atomic.StoreUint32(r.sqTail, tail+1) // NOTE: Do NOT update tail here! With SQPOLL, kernel would read the SQE
// immediately before we fill it. Tail is updated in commitSqeLocked() after
// the SQE is fully populated.
if iterations > 0 { if iterations > 0 {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"iterations": iterations, "iterations": iterations,
@@ -414,6 +467,50 @@ func (r *ioUringState) getSqeLocked() (*ioUringSqe, error) {
} }
func (r *ioUringState) submitAndWaitLocked(submit, wait uint32) error { func (r *ioUringState) submitAndWaitLocked(submit, wait uint32) error {
// With SQPOLL, kernel polls the SQ automatically.
// We only need to call io_uring_enter if we want to wait for completions
// OR if the SQPOLL thread has gone to sleep and needs a wakeup.
if r.sqpollEnabled {
// Check if SQPOLL thread needs wakeup
sqFlags := atomic.LoadUint32(r.sqFlags)
needsWakeup := sqFlags&ioringSqNeedWakeup != 0
if logrus.IsLevelEnabled(logrus.TraceLevel) {
logrus.WithFields(logrus.Fields{
"submit": submit,
"wait": wait,
"needs_wakeup": needsWakeup,
"sq_flags": fmt.Sprintf("0x%x", sqFlags),
}).Trace("io_uring SQPOLL submit")
}
if wait > 0 || needsWakeup {
// Need to enter kernel to either wait for completions or wake SQPOLL thread
var flags uintptr
if wait > 0 {
flags = ioringEnterGetevents
}
if needsWakeup {
flags |= ioringEnterSqWakeup
}
for {
_, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), 0, uintptr(wait), flags, 0, 0)
if errno == 0 {
return nil
}
if errno == unix.EINTR {
continue
}
logrus.WithError(errno).Error("io_uring SQPOLL enter failed")
return errno
}
}
// SQPOLL thread is running, no need to enter kernel
return nil
}
// Standard mode: we must call io_uring_enter to submit
var flags uintptr var flags uintptr
if wait > 0 { if wait > 0 {
flags = ioringEnterGetevents flags = ioringEnterGetevents
@@ -440,63 +537,77 @@ func (r *ioUringState) enqueueSendmsgLocked(fd int, msg *unix.Msghdr, msgFlags u
userData := r.userData userData := r.userData
r.userData++ r.userData++
msgCopy := new(unix.Msghdr) // Find available pre-allocated buffer (zero-allocation!)
*msgCopy = *msg var buf *sendBuffer
for _, b := range r.sendBuffers {
if b.inUse.CompareAndSwap(false, true) {
buf = b
break
}
}
if buf == nil {
return 0, fmt.Errorf("no available send buffers (all %d in flight)", len(r.sendBuffers))
}
var iovCopy *unix.Iovec // Copy struct data into pre-allocated buffer (no heap allocation!)
buf.msghdr = *msg
var payloadRef unsafe.Pointer var payloadRef unsafe.Pointer
if msg.Iov != nil { if msg.Iov != nil {
iovCopy = new(unix.Iovec) buf.iovec = *msg.Iov
*iovCopy = *msg.Iov buf.msghdr.Iov = &buf.iovec
msgCopy.Iov = iovCopy if buf.iovec.Base != nil {
if iovCopy.Base != nil { payloadRef = unsafe.Pointer(buf.iovec.Base)
payloadRef = unsafe.Pointer(iovCopy.Base)
} }
} }
var sockaddrCopy []byte
if msg.Name != nil && msg.Namelen > 0 { if msg.Name != nil && msg.Namelen > 0 {
sockaddrCopy = make([]byte, msg.Namelen) copy(buf.sockaddr[:], (*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(msg.Name))[:msg.Namelen])
copy(sockaddrCopy, (*[256]byte)(unsafe.Pointer(msg.Name))[:msg.Namelen]) buf.msghdr.Name = &buf.sockaddr[0]
msgCopy.Name = &sockaddrCopy[0]
} }
var controlCopy []byte
if msg.Control != nil && msg.Controllen > 0 { if msg.Control != nil && msg.Controllen > 0 {
controlCopy = make([]byte, msg.Controllen) copy(buf.control[:], (*[256]byte)(unsafe.Pointer(msg.Control))[:msg.Controllen])
copy(controlCopy, (*[256]byte)(unsafe.Pointer(msg.Control))[:msg.Controllen]) buf.msghdr.Control = &buf.control[0]
msgCopy.Control = &controlCopy[0]
} }
// Track buffer for cleanup
r.bufferMap[userData] = buf
// Legacy pendingSends for compatibility (TODO: remove after testing)
pending := &pendingSend{ pending := &pendingSend{
msgCopy: msgCopy, msgCopy: &buf.msghdr,
iovCopy: iovCopy, iovCopy: &buf.iovec,
sockaddrCopy: sockaddrCopy, payloadRef: payloadRef,
controlCopy: controlCopy, userData: userData,
payloadRef: payloadRef,
userData: userData,
} }
r.pendingSends[userData] = pending r.pendingSends[userData] = pending
sqe.Opcode = ioringOpSendmsg sqe.Opcode = ioringOpSendmsg
sqe.Fd = int32(fd) sqe.Fd = int32(fd)
sqe.Addr = uint64(uintptr(unsafe.Pointer(msgCopy))) sqe.Addr = uint64(uintptr(unsafe.Pointer(&buf.msghdr)))
sqe.Len = 0 sqe.Len = 0
sqe.MsgFlags = msgFlags sqe.MsgFlags = msgFlags
sqe.Flags = 0 sqe.Flags = 0
userDataPtr := (*uint64)(unsafe.Pointer(&sqe.UserData)) userDataPtr := (*uint64)(unsafe.Pointer(&sqe.UserData))
atomic.StoreUint64(userDataPtr, userData) atomic.StoreUint64(userDataPtr, userData)
_ = atomic.LoadUint64(userDataPtr)
runtime.KeepAlive(&buf.msghdr)
runtime.KeepAlive(msgCopy)
runtime.KeepAlive(sqe) runtime.KeepAlive(sqe)
runtime.KeepAlive(buf)
if payloadRef != nil { if payloadRef != nil {
runtime.KeepAlive(payloadRef) runtime.KeepAlive(payloadRef)
} }
_ = atomic.LoadUint32(r.sqTail)
atomic.StoreUint32(r.sqTail, atomic.LoadUint32(r.sqTail)) // CRITICAL: Memory barrier + tail update MUST happen after SQE is fully populated
// With SQPOLL, kernel reads SQE as soon as tail advances
runtime.KeepAlive(r.sqes)
// Now that SQE is complete, advance tail pointer so kernel can process it
oldTail := atomic.LoadUint32(r.sqTail)
atomic.StoreUint32(r.sqTail, oldTail+1)
return userData, nil return userData, nil
} }
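The ordering rule spelled out in the comments above (populate the SQE first, publish the tail last) reduces to a small pattern; a minimal sketch using the ring fields from this file:

// Sketch: SQE publication order that stays safe while the kernel SQPOLL thread is reading.
tail := atomic.LoadUint32(r.sqTail)
idx := tail & *r.sqRingMask
sqe := &r.sqes[idx]
*sqe = ioUringSqe{}              // 1. reset the slot
sqe.Opcode = ioringOpSendmsg     // 2. fully populate it (Fd, Addr, UserData, ...)
r.sqArray[idx] = idx
atomic.StoreUint32(r.sqTail, tail+1) // 3. only now make the entry visible to the kernel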
@@ -708,26 +819,88 @@ func (r *ioUringState) SendmsgBatch(entries []ioUringBatchEntry) error {
return nil return nil
} }
if err := r.submitAndWaitLocked(submit, submit); err != nil { if logrus.IsLevelEnabled(logrus.TraceLevel) {
logrus.WithFields(logrus.Fields{
"submit": submit,
"sqpoll": r.sqpollEnabled,
"tail": atomic.LoadUint32(r.sqTail),
"head": atomic.LoadUint32(r.sqHead),
}).Trace("io_uring SendmsgBatch about to submit")
}
// Harvest old completions to prevent CQ overflow
// With SQPOLL, kernel fills CQ async, so harvest regularly
harvested := r.harvestCompletionsLocked(0) // Harvest all available
if logrus.IsLevelEnabled(logrus.TraceLevel) && harvested > 0 {
logrus.WithField("harvested", harvested).Trace("io_uring harvested completions")
}
// Submit to kernel (with SQPOLL, this just updates tail)
if err := r.submitAndWaitLocked(submit, 0); err != nil {
logrus.WithError(err).WithField("submit", submit).Error("io_uring submit failed")
for i := 0; i < prepared; i++ { for i := 0; i < prepared; i++ {
r.abortPendingSendLocked(entries[i].userData) r.abortPendingSendLocked(entries[i].userData)
} }
return err return err
} }
// CRITICAL: With SQPOLL, DO NOT WAIT for completions!
// UDP sends complete almost immediately (the payload is just copied to a kernel buffer),
// so waiting here blocks the send path and kills bidirectional throughput.
//
// Instead: assume success and let completions arrive asynchronously.
// Completions will be harvested on the next batch (prevents CQ overflow).
// Return optimistic success for UDP sends.
// Real errors (ENOMEM, etc.) are rare and will be caught on the next batch.
for i := range entries { for i := range entries {
entry := &entries[i] entry := &entries[i]
res, flags, err := r.completeSendLocked(entry.userData)
if entry.result != nil { if entry.result != nil {
entry.result.res = res entry.result.res = 0 // Will be updated when completion arrives
entry.result.flags = flags entry.result.flags = 0
entry.result.err = err entry.result.err = nil // Assume success
} }
} }
return nil return nil
} }
// harvestCompletionsLocked reaps available completions without waiting
// This cleans up old pendingSends and prevents CQ from filling up
func (r *ioUringState) harvestCompletionsLocked(maxHarvest uint32) int {
harvested := 0
for {
if maxHarvest > 0 && uint32(harvested) >= maxHarvest {
break
}
cqe, err := r.popCqeLocked()
if err != nil {
break // No more completions available
}
userData := cqe.UserData
// Release pre-allocated buffer back to pool
if buf, ok := r.bufferMap[userData]; ok {
buf.inUse.Store(false) // Buffer available for reuse!
delete(r.bufferMap, userData)
}
// Clean up pendingSend
if pending, ok := r.pendingSends[userData]; ok {
delete(r.pendingSends, userData)
runtime.KeepAlive(pending)
}
// Store CQE for later retrieval (if someone is waiting for it)
r.completedCqes[userData] = cqe
harvested++
}
return harvested
}
func (r *ioUringState) popCqeLocked() (*ioUringCqe, error) { func (r *ioUringState) popCqeLocked() (*ioUringCqe, error) {
for { for {
// According to io_uring ABI specification: // According to io_uring ABI specification:
@@ -1032,11 +1205,14 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in
tries := entries tries := entries
var params ioUringParams var params ioUringParams
// Try flag combinations in order (5.19+ -> baseline) // Try flag combinations in order (best -> baseline)
// SQPOLL eliminates io_uring_enter syscalls (kernel polls SQ)
// Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded
flagSets := []uint32{ flagSets := []uint32{
ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation ioringSetupClamp | ioringSetupCoopTaskrun | ioringSetupSqpoll, // Best: SQPOLL + coop
ioringSetupClamp, // All kernels ioringSetupClamp | ioringSetupSqpoll, // Good: SQPOLL
ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+
ioringSetupClamp, // Baseline
} }
flagSetIdx := 0 flagSetIdx := 0
@@ -1059,13 +1235,24 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in
return nil, errno return nil, errno
} }
// Check if SQPOLL was actually enabled
sqpollEnabled := params.Flags&ioringSetupSqpoll != 0
ring := &ioUringRecvState{ ring := &ioUringRecvState{
fd: int(fd), fd: int(fd),
sqEntryCount: params.SqEntries, sqEntryCount: params.SqEntries,
cqEntryCount: params.CqEntries, cqEntryCount: params.CqEntries,
userData: 1, userData: 1,
bufferMap: make(map[uint64]*recvBuffer), bufferMap: make(map[uint64]*recvBuffer),
sockFd: sockFd, sockFd: sockFd,
sqpollEnabled: sqpollEnabled,
}
// Log which mode we got
if sqpollEnabled {
logrus.WithField("sq_entries", params.SqEntries).Info("io_uring recv: SQPOLL enabled (zero-syscall mode)")
} else {
logrus.WithField("sq_entries", params.SqEntries).Info("io_uring recv: standard mode")
} }
if err := ring.mapRings(&params); err != nil { if err := ring.mapRings(&params); err != nil {
@@ -1107,6 +1294,8 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in
ring.bufferPool[i] = buf ring.bufferPool[i] = buf
ring.bufferMap[buf.userData] = buf ring.bufferMap[buf.userData] = buf
localBuf := buf
buf.recycleFn = func() { ring.recycleBuffer(localBuf) }
} }
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
@@ -1174,6 +1363,7 @@ func (r *ioUringRecvState) mapRings(params *ioUringParams) error {
r.sqTail = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.Tail])) r.sqTail = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.Tail]))
r.sqRingMask = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingMask])) r.sqRingMask = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingMask]))
r.sqRingEntries = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingEntries])) r.sqRingEntries = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingEntries]))
r.sqFlags = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.Flags]))
// Set up SQ array // Set up SQ array
arrayBase := unsafe.Pointer(&r.sqRing[params.SqOff.Array]) arrayBase := unsafe.Pointer(&r.sqRing[params.SqOff.Array])
@@ -1227,50 +1417,102 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error {
sqe.UserData = buf.userData sqe.UserData = buf.userData
r.sqArray[idx] = uint32(idx) r.sqArray[idx] = uint32(idx)
atomic.StoreUint32(r.sqTail, tail+1)
// Mark buffer as in flight
buf.inFlight.Store(true) buf.inFlight.Store(true)
// CRITICAL: Memory barrier to ensure all SQE writes are visible before tail update
// With SQPOLL, kernel reads SQE as soon as tail advances
runtime.KeepAlive(sqe)
runtime.KeepAlive(buf)
runtime.KeepAlive(r.sqes)
// Now that SQE is complete, advance tail pointer so kernel can process it
atomic.StoreUint32(r.sqTail, tail+1)
return nil return nil
} }
// recycleBuffer returns a buffer to the receive ring, similar to send-side releaseGSOBuf
// This is called when the application is done with a received packet
func (r *ioUringRecvState) recycleBuffer(buf *recvBuffer) { func (r *ioUringRecvState) recycleBuffer(buf *recvBuffer) {
if r == nil || buf == nil { if r == nil || buf == nil {
return return
} }
if r.closed.Load() {
// Quick atomic check - if not in use, nothing to do
if !buf.inUse.Swap(false) {
return return
} }
if !buf.inUse.Swap(false) { // Fast path: if ring is closed, just drop the buffer
// Already released or never handed out if r.closed.Load() {
return return
} }
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
// Double-check after acquiring lock
if r.closed.Load() { if r.closed.Load() {
return return
} }
// If already in flight, the receivePackets loop will handle resubmission
if buf.inFlight.Load() { if buf.inFlight.Load() {
return return
} }
// Resubmit immediately (like send-side immediately queues for next batch)
if err := r.submitRecvLocked(buf); err != nil { if err := r.submitRecvLocked(buf); err != nil {
logrus.WithError(err).Warn("io_uring recv: failed to resubmit buffer") // SQ full - buffer will be picked up on next receivePackets call
return return
} }
// Submit without waiting (fire and forget, like send-side enqueue)
if err := r.submitAndWaitLocked(1, 0); err != nil { if err := r.submitAndWaitLocked(1, 0); err != nil {
buf.inFlight.Store(false) buf.inFlight.Store(false)
logrus.WithError(err).Warn("io_uring recv: submit failed during recycle")
} }
} }
// submitAndWaitLocked submits pending SQEs and optionally waits for completions // submitAndWaitLocked submits pending SQEs and optionally waits for completions
func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error { func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error {
// With SQPOLL, kernel polls the SQ automatically
if r.sqpollEnabled {
// Check if SQPOLL thread needs wakeup
needsWakeup := atomic.LoadUint32(r.sqFlags)&ioringSqNeedWakeup != 0
if wait > 0 || needsWakeup {
// Need to enter kernel to either wait for completions or wake SQPOLL thread
var flags uintptr
if wait > 0 {
flags = ioringEnterGetevents
}
if needsWakeup {
flags |= ioringEnterSqWakeup
}
for {
ret, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), 0, uintptr(wait), flags, 0, 0)
if errno == 0 {
if wait > 0 && ret > 0 {
logrus.WithFields(logrus.Fields{
"completed": ret,
}).Debug("io_uring recv: operations completed")
}
return nil
}
if errno == unix.EINTR {
continue
}
return errno
}
}
// SQPOLL thread is running, no need to enter kernel
return nil
}
// Standard mode: we must call io_uring_enter to submit
var flags uintptr var flags uintptr
if wait > 0 { if wait > 0 {
flags = ioringEnterGetevents flags = ioringEnterGetevents
@@ -1295,6 +1537,8 @@ func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error {
} }
// fillRecvQueue fills the submission queue with as many receives as possible // fillRecvQueue fills the submission queue with as many receives as possible
// This is similar to the send-side batch accumulation - we want to keep
// the receive ring as full as possible for maximum throughput.
func (r *ioUringRecvState) fillRecvQueue() error { func (r *ioUringRecvState) fillRecvQueue() error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
@@ -1305,10 +1549,11 @@ func (r *ioUringRecvState) fillRecvQueue() error {
submitted := uint32(0) submitted := uint32(0)
for _, buf := range r.bufferPool { for _, buf := range r.bufferPool {
// Only submit buffers that are idle (not in flight and not in use by caller)
if !buf.inFlight.Load() && !buf.inUse.Load() { if !buf.inFlight.Load() && !buf.inUse.Load() {
if err := r.submitRecvLocked(buf); err != nil { if err := r.submitRecvLocked(buf); err != nil {
if submitted > 0 { if submitted > 0 {
break // Queue full, submit what we have break // SQ full, submit what we have
} }
return err return err
} }
@@ -1316,6 +1561,7 @@ func (r *ioUringRecvState) fillRecvQueue() error {
} }
} }
// Submit all at once (batch submission like send-side)
if submitted > 0 { if submitted > 0 {
return r.submitAndWaitLocked(submitted, 0) return r.submitAndWaitLocked(submitted, 0)
} }
@@ -1324,7 +1570,10 @@ func (r *ioUringRecvState) fillRecvQueue() error {
} }
// receivePackets processes all completed receives and returns packets // receivePackets processes all completed receives and returns packets
// Returns a slice of completed packets // This is designed similar to the send-side batching approach:
// 1. Resubmit any available buffers to keep the ring full (like send-side batching)
// 2. Submit and optionally wait for completions
// 3. Drain all available completions from the CQ in one pass
func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
@@ -1333,17 +1582,19 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
return nil, fmt.Errorf("ring closed") return nil, fmt.Errorf("ring closed")
} }
// First submit any pending (to ensure we always have receives queued) // Resubmit any available buffers to keep the ring full
// This is analogous to the send-side accumulating packets before submission
submitted := uint32(0) submitted := uint32(0)
for _, buf := range r.bufferPool { for _, buf := range r.bufferPool {
if !buf.inFlight.Load() { if !buf.inFlight.Load() && !buf.inUse.Load() {
if err := r.submitRecvLocked(buf); err != nil { if err := r.submitRecvLocked(buf); err != nil {
break // Queue might be full break // SQ full - will retry on next call
} }
submitted++ submitted++
} }
} }
// Submit and optionally wait (like send-side submitAndWaitLocked)
waitCount := uint32(0) waitCount := uint32(0)
if wait { if wait {
waitCount = 1 waitCount = 1
@@ -1355,15 +1606,22 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
} }
} }
// Process completed CQEs // Drain all completed CQEs in one pass (like send-side batch completion)
var packets []RecvPacket
head := atomic.LoadUint32(r.cqHead) head := atomic.LoadUint32(r.cqHead)
tail := atomic.LoadUint32(r.cqTail) tail := atomic.LoadUint32(r.cqTail)
mask := *r.cqRingMask mask := *r.cqRingMask
entries := atomic.LoadUint32(r.cqRingEntries)
completions := uint32(0) var packetCap int
errors := 0 if tail >= head {
eagains := 0 packetCap = int(tail - head)
} else {
packetCap = int(entries - (head - tail))
}
if packetCap < 0 {
packetCap = 0
}
packets := make([]RecvPacket, 0, packetCap)
for head != tail { for head != tail {
idx := head & mask idx := head & mask
@@ -1373,9 +1631,9 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
res := cqe.Res res := cqe.Res
flags := cqe.Flags flags := cqe.Flags
// Advance head immediately (proper CQE consumption pattern)
head++ head++
atomic.StoreUint32(r.cqHead, head) atomic.StoreUint32(r.cqHead, head)
completions++
buf, ok := r.bufferMap[userData] buf, ok := r.bufferMap[userData]
if !ok { if !ok {
@@ -1385,13 +1643,10 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
buf.inFlight.Store(false) buf.inFlight.Store(false)
// Handle errors - skip failed receives
if res < 0 { if res < 0 {
errno := syscall.Errno(-res) errno := syscall.Errno(-res)
// EAGAIN is expected for non-blocking - just resubmit if errno != unix.EAGAIN {
if errno == unix.EAGAIN {
eagains++
} else {
errors++
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"userData": userData, "userData": userData,
"errno": errno, "errno": errno,
@@ -1401,11 +1656,10 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
} }
if res == 0 { if res == 0 {
// Connection closed or no data continue // No data
continue
} }
// Successfully received packet // Successfully received packet - prepare for caller
n := int(res) n := int(res)
// Copy address into standalone struct // Copy address into standalone struct
@@ -1421,7 +1675,6 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
} }
buf.inUse.Store(true) buf.inUse.Store(true)
bufferRef := buf
packets = append(packets, RecvPacket{ packets = append(packets, RecvPacket{
Data: buf.payloadBuf[:n], Data: buf.payloadBuf[:n],
@@ -1430,7 +1683,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
Flags: flags, Flags: flags,
Control: controlSlice, Control: controlSlice,
Controllen: controllen, Controllen: controllen,
RecycleFunc: func() { r.recycleBuffer(bufferRef) }, RecycleFunc: buf.recycleFn,
}) })
} }
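On the consuming side the contract is: use Data while the buffer is held, then call RecycleFunc exactly once so the buffer can be resubmitted to the ring. A hypothetical consumer loop (function and parameter names are illustrative, not from the commit):

// Illustrative consumer of receivePackets; not part of this commit.
func drainLoop(r *ioUringRecvState, handle func(payload []byte)) error {
	for {
		pkts, err := r.receivePackets(true) // block until at least one completion
		if err != nil {
			return err
		}
		for i := range pkts {
			p := &pkts[i]
			handle(p.Data) // Data is only valid until RecycleFunc runs
			if p.RecycleFunc != nil {
				p.RecycleFunc() // hand the buffer back so it is re-queued on the ring
			}
		}
	}
}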

View File

@@ -1,6 +1,26 @@
//go:build !android && !e2e_testing //go:build !android && !e2e_testing
// +build !android,!e2e_testing // +build !android,!e2e_testing
// Package udp implements high-performance UDP socket I/O for Nebula
//
// I/O Architecture:
//
// SEND PATH (with io_uring):
// - Multiple send shards accumulate outgoing packets asynchronously
// - Each shard batches packets for ~25µs before submission
// - Batches are submitted to a shared ioUringState via SendmsgBatch
// - Efficient GSO support for coalescing multiple packets into one kernel call
//
// RECEIVE PATH (with io_uring):
// - Dedicated ioUringRecvState with pre-allocated buffer pool
// - Continuously keeps receive operations queued in the io_uring SQ
// - ListenOut directly uses receivePackets() to drain completions in batches
// - Efficient GRO support for receiving coalesced packets
// - ReadSingle/ReadMulti use standard syscalls (not io_uring) for simplicity
//
// This architecture ensures send and receive are similarly optimized with
// batching, pre-allocated buffers, and minimal lock contention.
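For context, the GSO/GRO coalescing mentioned above rides on the UDP_SEGMENT and UDP_GRO socket options; a hedged sketch of the control-message plumbing (background only, not code from the commit, and the segment size is illustrative):

// Sketch: a UDP_SEGMENT cmsg tells the kernel to split one large write into
// segSize-byte datagrams (GSO); enabling UDP_GRO makes receives deliver a
// coalesced payload plus a cmsg carrying the segment size.
func buildGSOControl(segSize uint16) []byte {
	b := make([]byte, unix.CmsgSpace(2))
	h := (*unix.Cmsghdr)(unsafe.Pointer(&b[0]))
	h.Level = unix.SOL_UDP
	h.Type = unix.UDP_SEGMENT
	h.SetLen(unix.CmsgLen(2))
	*(*uint16)(unsafe.Pointer(&b[unix.CmsgLen(0)])) = segSize // native byte order
	return b
}

func enableGRO(fd int) error {
	return unix.SetsockoptInt(fd, unix.SOL_UDP, unix.UDP_GRO, 1)
}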
package udp package udp
import ( import (
@@ -70,6 +90,8 @@ type StdConn struct {
groBatchTick atomic.Int64 groBatchTick atomic.Int64
groSegmentsTick atomic.Int64 groSegmentsTick atomic.Int64
// io_uring state - now per-shard for parallel sending
// ioState is deprecated (kept for compatibility but unused)
ioState atomic.Pointer[ioUringState] ioState atomic.Pointer[ioUringState]
ioRecvState atomic.Pointer[ioUringRecvState] ioRecvState atomic.Pointer[ioUringRecvState]
ioActive atomic.Bool ioActive atomic.Bool
@@ -135,6 +157,9 @@ type sendShard struct {
outQueue chan *sendTask outQueue chan *sendTask
workerDone sync.WaitGroup workerDone sync.WaitGroup
// Per-shard io_uring for parallel sends (no lock contention!)
ioState *ioUringState
} }
func clampIoUringBatchSize(requested int, ringEntries uint32) int { func clampIoUringBatchSize(requested int, ringEntries uint32) int {
@@ -269,6 +294,62 @@ func (u *StdConn) resizeSendShards(count int) {
u.sendShards = newShards u.sendShards = newShards
u.shardCounter.Store(0) u.shardCounter.Store(0)
u.l.WithField("send_shards", count).Debug("Configured UDP send shards") u.l.WithField("send_shards", count).Debug("Configured UDP send shards")
// If io_uring is enabled, create per-shard io_uring instances
if u.ioActive.Load() {
u.initShardIoUring()
}
}
// initShardIoUring creates a dedicated io_uring for each send shard
// This eliminates lock contention and enables true parallel sending
func (u *StdConn) initShardIoUring() {
if !u.ioActive.Load() {
return
}
configured := uint32(u.ioUringMaxBatch.Load())
if configured == 0 {
configured = ioUringDefaultMaxBatch
}
successCount := 0
numCPU := runtime.NumCPU()
for i, shard := range u.sendShards {
ring, err := newIoUringState(configured)
if err != nil {
u.l.WithError(err).WithField("shard", i).Warn("Failed to create io_uring for shard")
continue
}
shard.ioState = ring
successCount++
// Calculate which CPU this SQPOLL thread is pinned to (if any)
cpuInfo := ""
if ring.sqpollEnabled {
// SQPOLL threads spread across cores to avoid competition
cpuCore := i % numCPU
cpuInfo = fmt.Sprintf(" on CPU %d", cpuCore)
}
u.l.WithFields(logrus.Fields{
"shard": i,
"sq_entries": ring.sqEntryCount,
"sqpoll": ring.sqpollEnabled,
}).Debugf("Created per-shard io_uring%s", cpuInfo)
}
if successCount > 0 {
u.l.WithFields(logrus.Fields{
"shards": len(u.sendShards),
"successful": successCount,
}).Info("Per-shard io_uring send enabled (parallel, no lock contention)")
} else {
u.l.Warn("No shards successfully initialized io_uring")
u.ioActive.Store(false)
}
} }
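One caveat on the log above: it reports a nominal CPU, but nothing in this function actually pins the SQPOLL thread; pinning is requested at ring-creation time via IORING_SETUP_SQ_AFF plus sq_thread_cpu. A hedged sketch, assuming an ioringSetupSqAff constant (1 << 2) and an SqThreadCpu field on ioUringParams, neither of which is in this diff:

// Sketch only: how SQPOLL thread pinning would be requested per shard.
const ioringSetupSqAff = 1 << 2 // IORING_SETUP_SQ_AFF (assumed constant)

func sqpollParamsForShard(shard, numCPU int) ioUringParams {
	return ioUringParams{
		Flags:        ioringSetupClamp | ioringSetupSqpoll | ioringSetupSqAff,
		SqThreadCpu:  uint32(shard % numCPU), // spread poll threads across cores
		SqThreadIdle: 2000,                   // ms before the poll thread sleeps (illustrative)
	}
}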
func (u *StdConn) setGroBufferSize(size int) { func (u *StdConn) setGroBufferSize(size int) {
@@ -485,6 +566,12 @@ func (s *sendShard) startSender() {
func (s *sendShard) stopSender() { func (s *sendShard) stopSender() {
s.closeSender() s.closeSender()
s.workerDone.Wait() s.workerDone.Wait()
// Close per-shard io_uring
if s.ioState != nil {
s.ioState.Close()
s.ioState = nil
}
} }
func (s *sendShard) closeSender() { func (s *sendShard) closeSender() {
@@ -535,6 +622,7 @@ func (s *sendShard) submitTask(task *sendTask) error {
} }
} }
// Fallback: if channel is full or closed, process immediately
return s.processTask(task) return s.processTask(task)
} }
@@ -693,7 +781,8 @@ func (s *sendShard) processTasksBatch(tasks []*sendTask) error {
return nil return nil
} }
p := s.parent p := s.parent
state := p.ioState.Load() // Use per-shard io_uring (no lock contention!)
state := s.ioState
var firstErr error var firstErr error
if state != nil { if state != nil {
if err := s.processTasksBatchIOUring(state, tasks); err != nil { if err := s.processTasksBatchIOUring(state, tasks); err != nil {
@@ -901,7 +990,17 @@ func (s *sendShard) write(b []byte, addr netip.AddrPort) error {
p := s.parent p := s.parent
// If no GSO, but we have io_uring, still use the batching path
// to benefit from io_uring holdoff batching
if !p.enableGSO || !addr.IsValid() { if !p.enableGSO || !addr.IsValid() {
if s.ioState != nil {
// Use io_uring batching even without GSO
s.mu.Unlock()
err := s.enqueueImmediate(b, addr)
s.mu.Lock()
return err
}
// No io_uring either - fall back to direct syscall
p.recordGSOSingle(1) p.recordGSOSingle(1)
return p.directWrite(b, addr) return p.directWrite(b, addr)
} }
@@ -920,6 +1019,13 @@ func (s *sendShard) write(b []byte, addr netip.AddrPort) error {
return err return err
} }
p.recordGSOSingle(1) p.recordGSOSingle(1)
// If io_uring is enabled, use batching even without GSO
if s.ioState != nil {
s.mu.Unlock()
err := s.enqueueImmediate(b, addr)
s.mu.Lock()
return err
}
return p.directWrite(b, addr) return p.directWrite(b, addr)
} }
@@ -981,6 +1087,13 @@ func (s *sendShard) flushPendingLocked() error {
s.stopFlushTimerLocked() s.stopFlushTimerLocked()
// Fast path: If GSO is enabled and io_uring exists, GSO has already done
// the batching work. Skip the channel/senderLoop and process directly
// to minimize latency since io_uring batching adds minimal value.
if s.parent != nil && s.parent.enableGSO && s.ioState != nil {
return s.processTask(task)
}
s.mu.Unlock() s.mu.Unlock()
err := s.submitTask(task) err := s.submitTask(task)
s.mu.Lock() s.mu.Lock()
@@ -1708,40 +1821,9 @@ func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
return 0, nil return 0, nil
} }
u.l.Debug("ReadSingle called") // Note: io_uring receive uses the dedicated ioUringRecvState in ListenOut,
// not this path. This function always uses direct syscalls for simplicity.
state := u.ioState.Load() return u.readSingleSyscall(msgs)
if state == nil {
return u.readSingleSyscall(msgs)
}
u.l.Debug("ReadSingle: converting rawMessage to unix.Msghdr")
hdr, iov, err := rawMessageToUnixMsghdr(&msgs[0])
if err != nil {
u.l.WithError(err).Error("ReadSingle: rawMessageToUnixMsghdr failed")
return 0, &net.OpError{Op: "recvmsg", Err: err}
}
u.l.WithFields(logrus.Fields{
"bufLen": iov.Len,
"nameLen": hdr.Namelen,
"ctrlLen": hdr.Controllen,
}).Debug("ReadSingle: calling state.Recvmsg")
n, _, recvErr := state.Recvmsg(u.sysFd, &hdr, 0)
if recvErr != nil {
u.l.WithError(recvErr).Error("ReadSingle: state.Recvmsg failed")
return 0, recvErr
}
u.l.WithFields(logrus.Fields{
"bytesRead": n,
}).Debug("ReadSingle: successfully received")
updateRawMessageFromUnixMsghdr(&msgs[0], &hdr, n)
runtime.KeepAlive(iov)
runtime.KeepAlive(hdr)
return 1, nil
} }
func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) { func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
@@ -1749,64 +1831,9 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
return 0, nil return 0, nil
} }
u.l.WithField("batch_size", len(msgs)).Debug("ReadMulti called") // Note: io_uring receive uses the dedicated ioUringRecvState in ListenOut,
// not this path. This function always uses direct syscalls for simplicity.
state := u.ioState.Load() return u.readMultiSyscall(msgs)
if state == nil {
return u.readMultiSyscall(msgs)
}
count := 0
for i := range msgs {
hdr, iov, err := rawMessageToUnixMsghdr(&msgs[i])
if err != nil {
u.l.WithError(err).WithField("index", i).Error("ReadMulti: rawMessageToUnixMsghdr failed")
if count > 0 {
return count, nil
}
return 0, &net.OpError{Op: "recvmsg", Err: err}
}
flags := uint32(0)
if i > 0 {
flags = unix.MSG_DONTWAIT
}
u.l.WithFields(logrus.Fields{
"index": i,
"flags": flags,
"bufLen": iov.Len,
}).Debug("ReadMulti: calling state.Recvmsg")
n, _, recvErr := state.Recvmsg(u.sysFd, &hdr, flags)
if recvErr != nil {
u.l.WithError(recvErr).WithFields(logrus.Fields{
"index": i,
"count": count,
}).Debug("ReadMulti: state.Recvmsg error")
if isEAgain(recvErr) && count > 0 {
u.l.WithField("count", count).Debug("ReadMulti: EAGAIN with existing packets, returning")
return count, nil
}
if count > 0 {
return count, recvErr
}
return 0, recvErr
}
u.l.WithFields(logrus.Fields{
"index": i,
"bytesRead": n,
}).Debug("ReadMulti: packet received")
updateRawMessageFromUnixMsghdr(&msgs[i], &hdr, n)
runtime.KeepAlive(iov)
runtime.KeepAlive(hdr)
count++
}
u.l.WithField("total_count", count).Debug("ReadMulti: completed")
return count, nil
} }
func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error { func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
@@ -2524,32 +2551,13 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) {
} }
} }
u.ioUringMaxBatch.Store(int64(requestedBatch)) u.ioUringMaxBatch.Store(int64(requestedBatch))
ring, err := newIoUringState(configured)
if err != nil { // Mark io_uring as active - per-shard rings will be created when shards are initialized
u.l.WithError(err).Warn("Failed to enable io_uring; falling back to sendmmsg path") u.ioActive.Store(true)
return u.l.WithFields(logrus.Fields{
} "max_batch": requestedBatch,
u.ioState.Store(ring) "holdoff": u.ioUringHoldoff.Load(),
finalBatch := clampIoUringBatchSize(requestedBatch, ring.sqEntryCount) }).Debug("io_uring send path configured (per-shard rings will be created)")
u.ioUringMaxBatch.Store(int64(finalBatch))
fields := logrus.Fields{
"entries": ring.sqEntryCount,
"max_batch": finalBatch,
}
if finalBatch != requestedBatch {
fields["requested_batch"] = requestedBatch
}
u.l.WithFields(fields).Debug("io_uring ioState pointer initialized")
desired := configured
if desired == 0 {
desired = defaultIoUringEntries
}
if ring.sqEntryCount < desired {
fields["requested_entries"] = desired
u.l.WithFields(fields).Warn("UDP io_uring send path enabled with reduced queue depth (ENOMEM)")
} else {
u.l.WithFields(fields).Debug("UDP io_uring send path enabled")
}
// Initialize dedicated receive ring with retry logic // Initialize dedicated receive ring with retry logic
recvPoolSize := 128 // Number of receive operations to keep queued recvPoolSize := 128 // Number of receive operations to keep queued