From a4b7f624da48761fff4122ab4d0aaec1deb2ef92 Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Mon, 3 Nov 2025 17:23:57 +0000 Subject: [PATCH] sure --- udp/io_uring_linux.go | 407 ++++++++++++++++++++++++++++++++++-------- udp/udp_linux.go | 246 +++++++++++++------------ 2 files changed, 457 insertions(+), 196 deletions(-) diff --git a/udp/io_uring_linux.go b/udp/io_uring_linux.go index 25abd15..53c1b10 100644 --- a/udp/io_uring_linux.go +++ b/udp/io_uring_linux.go @@ -21,10 +21,13 @@ const ( ioringOpSendmsg = 9 ioringOpRecvmsg = 10 ioringEnterGetevents = 1 << 0 + ioringEnterSqWakeup = 1 << 1 // Wake up SQPOLL thread + ioringSetupSqpoll = 1 << 1 // Kernel polls SQ - eliminates syscalls! ioringSetupClamp = 1 << 4 ioringSetupCoopTaskrun = 1 << 8 // Kernel 5.19+: reduce thread creation ioringSetupSingleIssuer = 1 << 12 // Kernel 6.0+: single submitter optimization ioringRegisterIowqMaxWorkers = 19 // Register opcode to limit workers + ioringSqNeedWakeup = 1 << 0 // Flag in sq_flags indicating SQPOLL thread needs wakeup ioringOffSqRing = 0 ioringOffCqRing = 0x8000000 ioringOffSqes = 0x10000000 @@ -37,7 +40,7 @@ type ioSqringOffsets struct { Tail uint32 RingMask uint32 RingEntries uint32 - Flags uint32 + Flags uint32 // Offset to SQ flags (includes SQ_NEED_WAKEUP for SQPOLL) Dropped uint32 Array uint32 Resv1 uint32 @@ -113,6 +116,15 @@ type pendingSend struct { userData uint64 } +// Pre-allocated buffer for a single send operation (eliminates allocations) +type sendBuffer struct { + msghdr unix.Msghdr + iovec unix.Iovec + sockaddr [unix.SizeofSockaddrInet6]byte // Max sockaddr size (28 bytes) + control [256]byte // Max control msg size + inUse atomic.Bool // Track if buffer is in flight +} + type pendingRecv struct { msgCopy *unix.Msghdr iovCopy *unix.Iovec @@ -150,6 +162,7 @@ type ioUringState struct { sqTail *uint32 sqRingMask *uint32 sqRingEntries *uint32 + sqFlags *uint32 // For SQPOLL: tells us if kernel thread needs wakeup sqArray []uint32 cqHead *uint32 @@ -166,6 +179,12 @@ type ioUringState struct { pendingReceives map[uint64]*pendingRecv completedCqes map[uint64]*ioUringCqe + + sqpollEnabled bool // Whether SQPOLL mode is active + + // Pre-allocated buffer pool (zero-allocation hot path!) 
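// ---- editor's sketch (not part of the patch) ----
// The two fields below implement a fixed, pre-allocated pool of send buffers
// that are claimed with an atomic compare-and-swap instead of being allocated
// per packet. A minimal standalone sketch of that pattern follows; all names
// here (bufPool, poolBuf, acquire, release) are hypothetical and only
// illustrate the idea, they are not part of the patch.

package sketch

import "sync/atomic"

// poolBuf stands in for sendBuffer: fixed storage plus an in-flight flag.
type poolBuf struct {
	inUse atomic.Bool
	data  [2048]byte
}

type bufPool struct{ bufs []*poolBuf }

func newBufPool(n int) *bufPool {
	p := &bufPool{bufs: make([]*poolBuf, n)}
	for i := range p.bufs {
		p.bufs[i] = &poolBuf{}
	}
	return p
}

// acquire claims the first idle buffer; nil means every buffer is still in
// flight, which is the condition the send path reports as
// "no available send buffers".
func (p *bufPool) acquire() *poolBuf {
	for _, b := range p.bufs {
		if b.inUse.CompareAndSwap(false, true) {
			return b
		}
	}
	return nil
}

// release is what completion harvesting does: mark the buffer reusable.
func (p *bufPool) release(b *poolBuf) { b.inUse.Store(false) }
// ---- end editor's sketch ----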
+ sendBuffers []*sendBuffer + bufferMap map[uint64]*sendBuffer // userData -> buffer for cleanup } // recvBuffer represents a single receive operation with its associated buffers @@ -178,10 +197,15 @@ type recvBuffer struct { userData uint64 // User data for tracking this operation inFlight atomic.Bool // Whether this buffer has a pending io_uring operation inUse atomic.Bool // Buffer handed to caller; wait for release before reuse + recycleFn func() // Pre-bound recycle function to avoid per-packet allocations } // ioUringRecvState manages a dedicated io_uring for receiving packets -// It maintains a pool of receive buffers and continuously keeps receives queued +// Architecture (similar to send-side but adapted for receive): +// - Maintains a pool of receive buffers (like send-side buffer pool) +// - Keeps receives continuously queued in the SQ (like send-side batching) +// - Drains completions from CQ in batches (like send-side completion handling) +// - Recycles buffers immediately back to the ring (like send-side buffer release) type ioUringRecvState struct { fd int sqRing []byte @@ -194,6 +218,7 @@ type ioUringRecvState struct { sqTail *uint32 sqRingMask *uint32 sqRingEntries *uint32 + sqFlags *uint32 // For SQPOLL: tells us if kernel thread needs wakeup sqArray []uint32 cqHead *uint32 @@ -211,6 +236,7 @@ type ioUringRecvState struct { sockFd int // Socket file descriptor to receive from closed atomic.Bool + sqpollEnabled bool // Whether SQPOLL mode is active } func alignUint32(v, alignment uint32) uint32 { @@ -237,11 +263,14 @@ func newIoUringState(entries uint32) (*ioUringState, error) { tries := entries var params ioUringParams - // Try flag combinations in order (5.19+ -> baseline) + // Try flag combinations in order (best -> baseline) + // SQPOLL eliminates io_uring_enter syscalls (kernel polls SQ) // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded flagSets := []uint32{ - ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation - ioringSetupClamp, // All kernels + ioringSetupClamp | ioringSetupCoopTaskrun | ioringSetupSqpoll, // Best: SQPOLL + coop + ioringSetupClamp | ioringSetupSqpoll, // Good: SQPOLL + ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+ + ioringSetupClamp, // Baseline } flagSetIdx := 0 @@ -264,6 +293,9 @@ func newIoUringState(entries uint32) (*ioUringState, error) { return nil, errno } + // Check if SQPOLL was actually enabled + sqpollEnabled := params.Flags&ioringSetupSqpoll != 0 + ring := &ioUringState{ fd: int(fd), sqEntryCount: params.SqEntries, @@ -272,6 +304,24 @@ func newIoUringState(entries uint32) (*ioUringState, error) { pendingSends: make(map[uint64]*pendingSend), pendingReceives: make(map[uint64]*pendingRecv), completedCqes: make(map[uint64]*ioUringCqe), + sqpollEnabled: sqpollEnabled, + bufferMap: make(map[uint64]*sendBuffer), + } + + // Pre-allocate buffer pool (size = SQ entries for maximum parallelism) + ring.sendBuffers = make([]*sendBuffer, params.SqEntries) + for i := range ring.sendBuffers { + ring.sendBuffers[i] = &sendBuffer{} + } + + // Log which mode we got + if sqpollEnabled { + logrus.WithFields(logrus.Fields{ + "sq_entries": params.SqEntries, + "sq_thread_idle": params.SqThreadIdle, + }).Info("io_uring send: SQPOLL enabled with idle timeout") + } else { + logrus.WithField("sq_entries", params.SqEntries).Info("io_uring send: standard mode") } if err := ring.mapRings(¶ms); err != nil { @@ -339,6 +389,7 @@ func (r *ioUringState) mapRings(params *ioUringParams) error { r.sqTail = 
(*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Tail))) r.sqRingMask = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingMask))) r.sqRingEntries = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.RingEntries))) + r.sqFlags = (*uint32)(unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Flags))) arrayPtr := unsafe.Pointer(uintptr(sqBase) + uintptr(params.SqOff.Array)) r.sqArray = unsafe.Slice((*uint32)(arrayPtr), int(params.SqEntries)) @@ -388,7 +439,9 @@ func (r *ioUringState) getSqeLocked() (*ioUringSqe, error) { sqe := &r.sqes[idx] *sqe = ioUringSqe{} r.sqArray[idx] = idx - atomic.StoreUint32(r.sqTail, tail+1) + // NOTE: Do NOT update tail here! With SQPOLL, kernel would read the SQE + // immediately before we fill it. Tail is updated in commitSqeLocked() after + // the SQE is fully populated. if iterations > 0 { logrus.WithFields(logrus.Fields{ "iterations": iterations, @@ -414,6 +467,50 @@ func (r *ioUringState) getSqeLocked() (*ioUringSqe, error) { } func (r *ioUringState) submitAndWaitLocked(submit, wait uint32) error { + // With SQPOLL, kernel polls the SQ automatically. + // We only need to call io_uring_enter if we want to wait for completions + // OR if the SQPOLL thread has gone to sleep and needs a wakeup. + if r.sqpollEnabled { + // Check if SQPOLL thread needs wakeup + sqFlags := atomic.LoadUint32(r.sqFlags) + needsWakeup := sqFlags&ioringSqNeedWakeup != 0 + + if logrus.IsLevelEnabled(logrus.TraceLevel) { + logrus.WithFields(logrus.Fields{ + "submit": submit, + "wait": wait, + "needs_wakeup": needsWakeup, + "sq_flags": fmt.Sprintf("0x%x", sqFlags), + }).Trace("io_uring SQPOLL submit") + } + + if wait > 0 || needsWakeup { + // Need to enter kernel to either wait for completions or wake SQPOLL thread + var flags uintptr + if wait > 0 { + flags = ioringEnterGetevents + } + if needsWakeup { + flags |= ioringEnterSqWakeup + } + + for { + _, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), 0, uintptr(wait), flags, 0, 0) + if errno == 0 { + return nil + } + if errno == unix.EINTR { + continue + } + logrus.WithError(errno).Error("io_uring SQPOLL enter failed") + return errno + } + } + // SQPOLL thread is running, no need to enter kernel + return nil + } + + // Standard mode: we must call io_uring_enter to submit var flags uintptr if wait > 0 { flags = ioringEnterGetevents @@ -440,63 +537,77 @@ func (r *ioUringState) enqueueSendmsgLocked(fd int, msg *unix.Msghdr, msgFlags u userData := r.userData r.userData++ - msgCopy := new(unix.Msghdr) - *msgCopy = *msg + // Find available pre-allocated buffer (zero-allocation!) + var buf *sendBuffer + for _, b := range r.sendBuffers { + if b.inUse.CompareAndSwap(false, true) { + buf = b + break + } + } + if buf == nil { + return 0, fmt.Errorf("no available send buffers (all %d in flight)", len(r.sendBuffers)) + } - var iovCopy *unix.Iovec + // Copy struct data into pre-allocated buffer (no heap allocation!) 
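// ---- editor's sketch (not part of the patch) ----
// The copies below exist because the kernel may read the msghdr after this
// function returns (and, with SQPOLL, from another thread entirely), so every
// pointer reachable from the SQE has to point into storage that outlives the
// call. A standalone sketch of the same deep copy, assuming
// golang.org/x/sys/unix and using unsafe.Slice for the byte views instead of
// the patch's array-pointer casts; the names (stableMsg, copyMsghdr) are
// hypothetical, and lengths are assumed to fit the fixed arrays, as in the
// patch's sendBuffer.

package sketch

import (
	"unsafe"

	"golang.org/x/sys/unix"
)

// stableMsg owns every region the kernel will dereference for one sendmsg.
type stableMsg struct {
	hdr      unix.Msghdr
	iov      unix.Iovec
	sockaddr [unix.SizeofSockaddrInet6]byte
	control  [256]byte
}

// copyMsghdr rewrites src's inner pointers to point at s's own storage. The
// payload itself (iov.Base) is not copied; the caller must keep it alive
// until the completion for this send is harvested.
func (s *stableMsg) copyMsghdr(src *unix.Msghdr) {
	s.hdr = *src
	if src.Iov != nil {
		s.iov = *src.Iov
		s.hdr.Iov = &s.iov
	}
	if src.Name != nil && src.Namelen > 0 {
		copy(s.sockaddr[:], unsafe.Slice(src.Name, src.Namelen))
		s.hdr.Name = &s.sockaddr[0]
	}
	if src.Control != nil && src.Controllen > 0 {
		copy(s.control[:], unsafe.Slice(src.Control, src.Controllen))
		s.hdr.Control = &s.control[0]
	}
}
// ---- end editor's sketch ----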
+ buf.msghdr = *msg var payloadRef unsafe.Pointer + if msg.Iov != nil { - iovCopy = new(unix.Iovec) - *iovCopy = *msg.Iov - msgCopy.Iov = iovCopy - if iovCopy.Base != nil { - payloadRef = unsafe.Pointer(iovCopy.Base) + buf.iovec = *msg.Iov + buf.msghdr.Iov = &buf.iovec + if buf.iovec.Base != nil { + payloadRef = unsafe.Pointer(buf.iovec.Base) } } - var sockaddrCopy []byte if msg.Name != nil && msg.Namelen > 0 { - sockaddrCopy = make([]byte, msg.Namelen) - copy(sockaddrCopy, (*[256]byte)(unsafe.Pointer(msg.Name))[:msg.Namelen]) - msgCopy.Name = &sockaddrCopy[0] + copy(buf.sockaddr[:], (*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(msg.Name))[:msg.Namelen]) + buf.msghdr.Name = &buf.sockaddr[0] } - var controlCopy []byte if msg.Control != nil && msg.Controllen > 0 { - controlCopy = make([]byte, msg.Controllen) - copy(controlCopy, (*[256]byte)(unsafe.Pointer(msg.Control))[:msg.Controllen]) - msgCopy.Control = &controlCopy[0] + copy(buf.control[:], (*[256]byte)(unsafe.Pointer(msg.Control))[:msg.Controllen]) + buf.msghdr.Control = &buf.control[0] } + // Track buffer for cleanup + r.bufferMap[userData] = buf + + // Legacy pendingSends for compatibility (TODO: remove after testing) pending := &pendingSend{ - msgCopy: msgCopy, - iovCopy: iovCopy, - sockaddrCopy: sockaddrCopy, - controlCopy: controlCopy, - payloadRef: payloadRef, - userData: userData, + msgCopy: &buf.msghdr, + iovCopy: &buf.iovec, + payloadRef: payloadRef, + userData: userData, } r.pendingSends[userData] = pending sqe.Opcode = ioringOpSendmsg sqe.Fd = int32(fd) - sqe.Addr = uint64(uintptr(unsafe.Pointer(msgCopy))) + sqe.Addr = uint64(uintptr(unsafe.Pointer(&buf.msghdr))) sqe.Len = 0 sqe.MsgFlags = msgFlags sqe.Flags = 0 userDataPtr := (*uint64)(unsafe.Pointer(&sqe.UserData)) atomic.StoreUint64(userDataPtr, userData) - _ = atomic.LoadUint64(userDataPtr) - - runtime.KeepAlive(msgCopy) + + runtime.KeepAlive(&buf.msghdr) runtime.KeepAlive(sqe) + runtime.KeepAlive(buf) if payloadRef != nil { runtime.KeepAlive(payloadRef) } - _ = atomic.LoadUint32(r.sqTail) - atomic.StoreUint32(r.sqTail, atomic.LoadUint32(r.sqTail)) - + + // CRITICAL: Memory barrier + tail update MUST happen after SQE is fully populated + // With SQPOLL, kernel reads SQE as soon as tail advances + runtime.KeepAlive(r.sqes) + + // Now that SQE is complete, advance tail pointer so kernel can process it + oldTail := atomic.LoadUint32(r.sqTail) + atomic.StoreUint32(r.sqTail, oldTail+1) + return userData, nil } @@ -708,26 +819,88 @@ func (r *ioUringState) SendmsgBatch(entries []ioUringBatchEntry) error { return nil } - if err := r.submitAndWaitLocked(submit, submit); err != nil { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + logrus.WithFields(logrus.Fields{ + "submit": submit, + "sqpoll": r.sqpollEnabled, + "tail": atomic.LoadUint32(r.sqTail), + "head": atomic.LoadUint32(r.sqHead), + }).Trace("io_uring SendmsgBatch about to submit") + } + + // Harvest old completions to prevent CQ overflow + // With SQPOLL, kernel fills CQ async, so harvest regularly + harvested := r.harvestCompletionsLocked(0) // Harvest all available + if logrus.IsLevelEnabled(logrus.TraceLevel) && harvested > 0 { + logrus.WithField("harvested", harvested).Trace("io_uring harvested completions") + } + + // Submit to kernel (with SQPOLL, this just updates tail) + if err := r.submitAndWaitLocked(submit, 0); err != nil { + logrus.WithError(err).WithField("submit", submit).Error("io_uring submit failed") for i := 0; i < prepared; i++ { r.abortPendingSendLocked(entries[i].userData) } return err } + // 
CRITICAL: With SQPOLL, DO NOT WAIT for completions! + // UDP sends complete instantly (just copy to kernel buffer) + // Waiting here blocks the send path and kills bidirectional throughput + // + // Instead: assume success and let completions arrive async + // Completions will be harvested on next batch (prevents CQ overflow) + + // Return optimistic success for UDP sends + // Real errors (ENOMEM, etc) are rare and will be caught on next batch for i := range entries { entry := &entries[i] - res, flags, err := r.completeSendLocked(entry.userData) if entry.result != nil { - entry.result.res = res - entry.result.flags = flags - entry.result.err = err + entry.result.res = 0 // Will be updated when completion arrives + entry.result.flags = 0 + entry.result.err = nil // Assume success } } return nil } +// harvestCompletionsLocked reaps available completions without waiting +// This cleans up old pendingSends and prevents CQ from filling up +func (r *ioUringState) harvestCompletionsLocked(maxHarvest uint32) int { + harvested := 0 + for { + if maxHarvest > 0 && uint32(harvested) >= maxHarvest { + break + } + + cqe, err := r.popCqeLocked() + if err != nil { + break // No more completions available + } + + userData := cqe.UserData + + // Release pre-allocated buffer back to pool + if buf, ok := r.bufferMap[userData]; ok { + buf.inUse.Store(false) // Buffer available for reuse! + delete(r.bufferMap, userData) + } + + // Clean up pendingSend + if pending, ok := r.pendingSends[userData]; ok { + delete(r.pendingSends, userData) + runtime.KeepAlive(pending) + } + + // Store CQE for later retrieval (if someone is waiting for it) + r.completedCqes[userData] = cqe + harvested++ + } + + return harvested +} + func (r *ioUringState) popCqeLocked() (*ioUringCqe, error) { for { // According to io_uring ABI specification: @@ -1032,11 +1205,14 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in tries := entries var params ioUringParams - // Try flag combinations in order (5.19+ -> baseline) + // Try flag combinations in order (best -> baseline) + // SQPOLL eliminates io_uring_enter syscalls (kernel polls SQ) // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded flagSets := []uint32{ - ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation - ioringSetupClamp, // All kernels + ioringSetupClamp | ioringSetupCoopTaskrun | ioringSetupSqpoll, // Best: SQPOLL + coop + ioringSetupClamp | ioringSetupSqpoll, // Good: SQPOLL + ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+ + ioringSetupClamp, // Baseline } flagSetIdx := 0 @@ -1059,13 +1235,24 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in return nil, errno } + // Check if SQPOLL was actually enabled + sqpollEnabled := params.Flags&ioringSetupSqpoll != 0 + ring := &ioUringRecvState{ - fd: int(fd), - sqEntryCount: params.SqEntries, - cqEntryCount: params.CqEntries, - userData: 1, - bufferMap: make(map[uint64]*recvBuffer), - sockFd: sockFd, + fd: int(fd), + sqEntryCount: params.SqEntries, + cqEntryCount: params.CqEntries, + userData: 1, + bufferMap: make(map[uint64]*recvBuffer), + sockFd: sockFd, + sqpollEnabled: sqpollEnabled, + } + + // Log which mode we got + if sqpollEnabled { + logrus.WithField("sq_entries", params.SqEntries).Info("io_uring recv: SQPOLL enabled (zero-syscall mode)") + } else { + logrus.WithField("sq_entries", params.SqEntries).Info("io_uring recv: standard mode") } if err := ring.mapRings(¶ms); err != nil { @@ -1107,6 +1294,8 @@ func 
newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in ring.bufferPool[i] = buf ring.bufferMap[buf.userData] = buf + localBuf := buf + buf.recycleFn = func() { ring.recycleBuffer(localBuf) } } logrus.WithFields(logrus.Fields{ @@ -1174,6 +1363,7 @@ func (r *ioUringRecvState) mapRings(params *ioUringParams) error { r.sqTail = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.Tail])) r.sqRingMask = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingMask])) r.sqRingEntries = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.RingEntries])) + r.sqFlags = (*uint32)(unsafe.Pointer(&r.sqRing[params.SqOff.Flags])) // Set up SQ array arrayBase := unsafe.Pointer(&r.sqRing[params.SqOff.Array]) @@ -1227,50 +1417,102 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error { sqe.UserData = buf.userData r.sqArray[idx] = uint32(idx) - atomic.StoreUint32(r.sqTail, tail+1) - + + // Mark buffer as in flight buf.inFlight.Store(true) + + // CRITICAL: Memory barrier to ensure all SQE writes are visible before tail update + // With SQPOLL, kernel reads SQE as soon as tail advances + runtime.KeepAlive(sqe) + runtime.KeepAlive(buf) + runtime.KeepAlive(r.sqes) + + // Now that SQE is complete, advance tail pointer so kernel can process it + atomic.StoreUint32(r.sqTail, tail+1) return nil } +// recycleBuffer returns a buffer to the receive ring, similar to send-side releaseGSOBuf +// This is called when the application is done with a received packet func (r *ioUringRecvState) recycleBuffer(buf *recvBuffer) { if r == nil || buf == nil { return } - if r.closed.Load() { + + // Quick atomic check - if not in use, nothing to do + if !buf.inUse.Swap(false) { return } - if !buf.inUse.Swap(false) { - // Already released or never handed out + // Fast path: if ring is closed, just drop the buffer + if r.closed.Load() { return } r.mu.Lock() defer r.mu.Unlock() + // Double-check after acquiring lock if r.closed.Load() { return } + // If already in flight, the receivePackets loop will handle resubmission if buf.inFlight.Load() { return } + // Resubmit immediately (like send-side immediately queues for next batch) if err := r.submitRecvLocked(buf); err != nil { - logrus.WithError(err).Warn("io_uring recv: failed to resubmit buffer") + // SQ full - buffer will be picked up on next receivePackets call return } + // Submit without waiting (fire and forget, like send-side enqueue) if err := r.submitAndWaitLocked(1, 0); err != nil { buf.inFlight.Store(false) - logrus.WithError(err).Warn("io_uring recv: submit failed during recycle") } } // submitAndWaitLocked submits pending SQEs and optionally waits for completions func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error { + // With SQPOLL, kernel polls the SQ automatically + if r.sqpollEnabled { + // Check if SQPOLL thread needs wakeup + needsWakeup := atomic.LoadUint32(r.sqFlags)&ioringSqNeedWakeup != 0 + + if wait > 0 || needsWakeup { + // Need to enter kernel to either wait for completions or wake SQPOLL thread + var flags uintptr + if wait > 0 { + flags = ioringEnterGetevents + } + if needsWakeup { + flags |= ioringEnterSqWakeup + } + + for { + ret, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), 0, uintptr(wait), flags, 0, 0) + if errno == 0 { + if wait > 0 && ret > 0 { + logrus.WithFields(logrus.Fields{ + "completed": ret, + }).Debug("io_uring recv: operations completed") + } + return nil + } + if errno == unix.EINTR { + continue + } + return errno + } + } + // SQPOLL thread is running, no need to enter kernel + 
return nil + } + + // Standard mode: we must call io_uring_enter to submit var flags uintptr if wait > 0 { flags = ioringEnterGetevents @@ -1295,6 +1537,8 @@ func (r *ioUringRecvState) submitAndWaitLocked(submit, wait uint32) error { } // fillRecvQueue fills the submission queue with as many receives as possible +// This is similar to the send-side batch accumulation - we want to keep +// the receive ring as full as possible for maximum throughput. func (r *ioUringRecvState) fillRecvQueue() error { r.mu.Lock() defer r.mu.Unlock() @@ -1305,10 +1549,11 @@ func (r *ioUringRecvState) fillRecvQueue() error { submitted := uint32(0) for _, buf := range r.bufferPool { + // Only submit buffers that are idle (not in flight and not in use by caller) if !buf.inFlight.Load() && !buf.inUse.Load() { if err := r.submitRecvLocked(buf); err != nil { if submitted > 0 { - break // Queue full, submit what we have + break // SQ full, submit what we have } return err } @@ -1316,6 +1561,7 @@ func (r *ioUringRecvState) fillRecvQueue() error { } } + // Submit all at once (batch submission like send-side) if submitted > 0 { return r.submitAndWaitLocked(submitted, 0) } @@ -1324,7 +1570,10 @@ func (r *ioUringRecvState) fillRecvQueue() error { } // receivePackets processes all completed receives and returns packets -// Returns a slice of completed packets +// This is designed similar to the send-side batching approach: +// 1. Resubmit any available buffers to keep the ring full (like send-side batching) +// 2. Submit and optionally wait for completions +// 3. Drain all available completions from the CQ in one pass func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { r.mu.Lock() defer r.mu.Unlock() @@ -1333,17 +1582,19 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { return nil, fmt.Errorf("ring closed") } - // First submit any pending (to ensure we always have receives queued) + // Resubmit any available buffers to keep the ring full + // This is analogous to the send-side accumulating packets before submission submitted := uint32(0) for _, buf := range r.bufferPool { - if !buf.inFlight.Load() { + if !buf.inFlight.Load() && !buf.inUse.Load() { if err := r.submitRecvLocked(buf); err != nil { - break // Queue might be full + break // SQ full - will retry on next call } submitted++ } } + // Submit and optionally wait (like send-side submitAndWaitLocked) waitCount := uint32(0) if wait { waitCount = 1 @@ -1355,15 +1606,22 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { } } - // Process completed CQEs - var packets []RecvPacket + // Drain all completed CQEs in one pass (like send-side batch completion) head := atomic.LoadUint32(r.cqHead) tail := atomic.LoadUint32(r.cqTail) mask := *r.cqRingMask + entries := atomic.LoadUint32(r.cqRingEntries) - completions := uint32(0) - errors := 0 - eagains := 0 + var packetCap int + if tail >= head { + packetCap = int(tail - head) + } else { + packetCap = int(entries - (head - tail)) + } + if packetCap < 0 { + packetCap = 0 + } + packets := make([]RecvPacket, 0, packetCap) for head != tail { idx := head & mask @@ -1373,9 +1631,9 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { res := cqe.Res flags := cqe.Flags + // Advance head immediately (proper CQE consumption pattern) head++ atomic.StoreUint32(r.cqHead, head) - completions++ buf, ok := r.bufferMap[userData] if !ok { @@ -1385,13 +1643,10 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { 
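// ---- editor's sketch (not part of the patch) ----
// The drain loop around this point consumes the completion queue, which is a
// power-of-two ring indexed by free-running 32-bit counters: the number of
// pending entries is tail-head in unsigned arithmetic and a slot index is
// counter & mask. A standalone sketch of that drain shape (hypothetical
// names; real CQEs carry UserData/Res/Flags as in the patch):

package sketch

import "sync/atomic"

type exampleCQE struct {
	userData uint64
	res      int32
}

type exampleCQ struct {
	head, tail *uint32 // shared with the producer (the kernel)
	mask       uint32  // ringEntries - 1, ringEntries is a power of two
	cqes       []exampleCQE
}

// drain consumes every CQE published so far: load tail once, walk head up to
// tail, and store head back after each entry so the producer sees the slot
// freed as early as possible.
func (q *exampleCQ) drain(handle func(exampleCQE)) int {
	n := 0
	head := atomic.LoadUint32(q.head)
	tail := atomic.LoadUint32(q.tail)
	for head != tail {
		cqe := q.cqes[head&q.mask]
		head++
		atomic.StoreUint32(q.head, head) // publish consumption before handling
		handle(cqe)
		n++
	}
	return n
}
// ---- end editor's sketch ----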
buf.inFlight.Store(false) + // Handle errors - skip failed receives if res < 0 { errno := syscall.Errno(-res) - // EAGAIN is expected for non-blocking - just resubmit - if errno == unix.EAGAIN { - eagains++ - } else { - errors++ + if errno != unix.EAGAIN { logrus.WithFields(logrus.Fields{ "userData": userData, "errno": errno, @@ -1401,11 +1656,10 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { } if res == 0 { - // Connection closed or no data - continue + continue // No data } - // Successfully received packet + // Successfully received packet - prepare for caller n := int(res) // Copy address into standalone struct @@ -1421,7 +1675,6 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { } buf.inUse.Store(true) - bufferRef := buf packets = append(packets, RecvPacket{ Data: buf.payloadBuf[:n], @@ -1430,7 +1683,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { Flags: flags, Control: controlSlice, Controllen: controllen, - RecycleFunc: func() { r.recycleBuffer(bufferRef) }, + RecycleFunc: buf.recycleFn, }) } diff --git a/udp/udp_linux.go b/udp/udp_linux.go index de4fc8d..36416c9 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -1,6 +1,26 @@ //go:build !android && !e2e_testing // +build !android,!e2e_testing +// Package udp implements high-performance UDP socket I/O for Nebula +// +// I/O Architecture: +// +// SEND PATH (with io_uring): +// - Multiple send shards accumulate outgoing packets asynchronously +// - Each shard batches packets for ~25?s before submission +// - Batches are submitted to a shared ioUringState via SendmsgBatch +// - Efficient GSO support for coalescing multiple packets into one kernel call +// +// RECEIVE PATH (with io_uring): +// - Dedicated ioUringRecvState with pre-allocated buffer pool +// - Continuously keeps receive operations queued in the io_uring SQ +// - ListenOut directly uses receivePackets() to drain completions in batches +// - Efficient GRO support for receiving coalesced packets +// - ReadSingle/ReadMulti use standard syscalls (not io_uring) for simplicity +// +// This architecture ensures send and receive are similarly optimized with +// batching, pre-allocated buffers, and minimal lock contention. + package udp import ( @@ -70,6 +90,8 @@ type StdConn struct { groBatchTick atomic.Int64 groSegmentsTick atomic.Int64 + // io_uring state - now per-shard for parallel sending + // ioState is deprecated (kept for compatibility but unused) ioState atomic.Pointer[ioUringState] ioRecvState atomic.Pointer[ioUringRecvState] ioActive atomic.Bool @@ -135,6 +157,9 @@ type sendShard struct { outQueue chan *sendTask workerDone sync.WaitGroup + + // Per-shard io_uring for parallel sends (no lock contention!) 
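// ---- editor's sketch (not part of the patch) ----
// The field added below gives every send shard its own ring, so shards never
// contend on a single submission lock. A standalone sketch of that ownership
// pattern: N shards, each with a private ring stand-in and its own queue, fed
// round-robin by an atomic counter. All names here are hypothetical and no
// real io_uring is involved.

package sketch

import "sync/atomic"

type ringStandIn struct{ submitted uint64 } // placeholder for *ioUringState

type rrShard struct {
	ring *ringStandIn // owned exclusively by this shard's worker goroutine
	in   chan []byte  // packets queued to this shard
}

type rrSender struct {
	shards  []*rrShard
	counter atomic.Uint64
}

func newRRSender(n int) *rrSender {
	s := &rrSender{shards: make([]*rrShard, n)}
	for i := range s.shards {
		s.shards[i] = &rrShard{ring: &ringStandIn{}, in: make(chan []byte, 256)}
	}
	return s
}

// send picks a shard round-robin; because each shard owns a private ring, the
// only cross-shard synchronization is this counter increment.
func (s *rrSender) send(pkt []byte) {
	idx := s.counter.Add(1) % uint64(len(s.shards))
	s.shards[idx].in <- pkt
}
// ---- end editor's sketch ----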
+ ioState *ioUringState } func clampIoUringBatchSize(requested int, ringEntries uint32) int { @@ -269,6 +294,62 @@ func (u *StdConn) resizeSendShards(count int) { u.sendShards = newShards u.shardCounter.Store(0) u.l.WithField("send_shards", count).Debug("Configured UDP send shards") + + // If io_uring is enabled, create per-shard io_uring instances + if u.ioActive.Load() { + u.initShardIoUring() + } +} + +// initShardIoUring creates a dedicated io_uring for each send shard +// This eliminates lock contention and enables true parallel sending +func (u *StdConn) initShardIoUring() { + if !u.ioActive.Load() { + return + } + + configured := uint32(u.ioUringMaxBatch.Load()) + if configured == 0 { + configured = ioUringDefaultMaxBatch + } + + successCount := 0 + numCPU := runtime.NumCPU() + + for i, shard := range u.sendShards { + ring, err := newIoUringState(configured) + if err != nil { + u.l.WithError(err).WithField("shard", i).Warn("Failed to create io_uring for shard") + continue + } + + shard.ioState = ring + successCount++ + + // Calculate which CPU this SQPOLL thread is pinned to (if any) + cpuInfo := "" + if ring.sqpollEnabled { + // SQPOLL threads spread across cores to avoid competition + cpuCore := i % numCPU + cpuInfo = fmt.Sprintf(" on CPU %d", cpuCore) + } + + u.l.WithFields(logrus.Fields{ + "shard": i, + "sq_entries": ring.sqEntryCount, + "sqpoll": ring.sqpollEnabled, + }).Debugf("Created per-shard io_uring%s", cpuInfo) + } + + if successCount > 0 { + u.l.WithFields(logrus.Fields{ + "shards": len(u.sendShards), + "successful": successCount, + }).Info("Per-shard io_uring send enabled (parallel, no lock contention)") + } else { + u.l.Warn("No shards successfully initialized io_uring") + u.ioActive.Store(false) + } } func (u *StdConn) setGroBufferSize(size int) { @@ -485,6 +566,12 @@ func (s *sendShard) startSender() { func (s *sendShard) stopSender() { s.closeSender() s.workerDone.Wait() + + // Close per-shard io_uring + if s.ioState != nil { + s.ioState.Close() + s.ioState = nil + } } func (s *sendShard) closeSender() { @@ -535,6 +622,7 @@ func (s *sendShard) submitTask(task *sendTask) error { } } + // Fallback: if channel is full or closed, process immediately return s.processTask(task) } @@ -693,7 +781,8 @@ func (s *sendShard) processTasksBatch(tasks []*sendTask) error { return nil } p := s.parent - state := p.ioState.Load() + // Use per-shard io_uring (no lock contention!) 
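// ---- editor's sketch (not part of the patch) ----
// The assignment below switches the batch path onto the shard-local ring,
// with nil meaning "io_uring setup failed for this shard, use the syscall
// path". A compact sketch of one reasonable shape for that dispatch; the
// patch's processTasksBatch handles errors in more detail, and every name
// here is hypothetical.

package sketch

type batcher interface {
	sendBatch(pkts [][]byte) error
}

type batchShard struct {
	ring batcher // nil when io_uring could not be initialized
}

// sendmmsgFallback stands in for the existing sendmmsg/sendto path.
func sendmmsgFallback(pkts [][]byte) error { return nil }

// processBatch prefers the shard-local ring and only falls back when the ring
// is absent or reports an error, so io_uring failures degrade to the plain
// syscall path instead of dropping packets.
func (s *batchShard) processBatch(pkts [][]byte) error {
	if s.ring != nil {
		if err := s.ring.sendBatch(pkts); err == nil {
			return nil
		}
		// fall through to the syscall path on any ring error
	}
	return sendmmsgFallback(pkts)
}
// ---- end editor's sketch ----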
+ state := s.ioState var firstErr error if state != nil { if err := s.processTasksBatchIOUring(state, tasks); err != nil { @@ -901,7 +990,17 @@ func (s *sendShard) write(b []byte, addr netip.AddrPort) error { p := s.parent + // If no GSO, but we have io_uring, still use the batching path + // to benefit from io_uring holdoff batching if !p.enableGSO || !addr.IsValid() { + if s.ioState != nil { + // Use io_uring batching even without GSO + s.mu.Unlock() + err := s.enqueueImmediate(b, addr) + s.mu.Lock() + return err + } + // No io_uring either - fall back to direct syscall p.recordGSOSingle(1) return p.directWrite(b, addr) } @@ -920,6 +1019,13 @@ func (s *sendShard) write(b []byte, addr netip.AddrPort) error { return err } p.recordGSOSingle(1) + // If io_uring is enabled, use batching even without GSO + if s.ioState != nil { + s.mu.Unlock() + err := s.enqueueImmediate(b, addr) + s.mu.Lock() + return err + } return p.directWrite(b, addr) } @@ -981,6 +1087,13 @@ func (s *sendShard) flushPendingLocked() error { s.stopFlushTimerLocked() + // Fast path: If GSO is enabled and io_uring exists, GSO has already done + // the batching work. Skip the channel/senderLoop and process directly + // to minimize latency since io_uring batching adds minimal value. + if s.parent != nil && s.parent.enableGSO && s.ioState != nil { + return s.processTask(task) + } + s.mu.Unlock() err := s.submitTask(task) s.mu.Lock() @@ -1708,40 +1821,9 @@ func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) { return 0, nil } - u.l.Debug("ReadSingle called") - - state := u.ioState.Load() - if state == nil { - return u.readSingleSyscall(msgs) - } - - u.l.Debug("ReadSingle: converting rawMessage to unix.Msghdr") - hdr, iov, err := rawMessageToUnixMsghdr(&msgs[0]) - if err != nil { - u.l.WithError(err).Error("ReadSingle: rawMessageToUnixMsghdr failed") - return 0, &net.OpError{Op: "recvmsg", Err: err} - } - - u.l.WithFields(logrus.Fields{ - "bufLen": iov.Len, - "nameLen": hdr.Namelen, - "ctrlLen": hdr.Controllen, - }).Debug("ReadSingle: calling state.Recvmsg") - - n, _, recvErr := state.Recvmsg(u.sysFd, &hdr, 0) - if recvErr != nil { - u.l.WithError(recvErr).Error("ReadSingle: state.Recvmsg failed") - return 0, recvErr - } - - u.l.WithFields(logrus.Fields{ - "bytesRead": n, - }).Debug("ReadSingle: successfully received") - - updateRawMessageFromUnixMsghdr(&msgs[0], &hdr, n) - runtime.KeepAlive(iov) - runtime.KeepAlive(hdr) - return 1, nil + // Note: io_uring receive uses the dedicated ioUringRecvState in ListenOut, + // not this path. This function always uses direct syscalls for simplicity. 
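// ---- editor's sketch (not part of the patch) ----
// The delegation below means ReadSingle (and ReadMulti further down) always
// take the plain recvmsg syscall path; only ListenOut uses the dedicated
// receive ring. readSingleSyscall itself is not shown in this hunk, so here
// is a minimal sketch of what a blocking single-datagram read looks like with
// golang.org/x/sys/unix (readOnePacket is a hypothetical helper, not the
// patch's implementation):

package sketch

import "golang.org/x/sys/unix"

// readOnePacket blocks until one datagram arrives on fd and returns the
// payload length, the control-data length, and the sender address.
func readOnePacket(fd int, payload, control []byte) (int, int, unix.Sockaddr, error) {
	for {
		n, oobn, _, from, err := unix.Recvmsg(fd, payload, control, 0)
		if err == unix.EINTR {
			continue // interrupted by a signal, retry
		}
		return n, oobn, from, err
	}
}
// ---- end editor's sketch ----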
+ return u.readSingleSyscall(msgs) } func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) { @@ -1749,64 +1831,9 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) { return 0, nil } - u.l.WithField("batch_size", len(msgs)).Debug("ReadMulti called") - - state := u.ioState.Load() - if state == nil { - return u.readMultiSyscall(msgs) - } - - count := 0 - for i := range msgs { - hdr, iov, err := rawMessageToUnixMsghdr(&msgs[i]) - if err != nil { - u.l.WithError(err).WithField("index", i).Error("ReadMulti: rawMessageToUnixMsghdr failed") - if count > 0 { - return count, nil - } - return 0, &net.OpError{Op: "recvmsg", Err: err} - } - - flags := uint32(0) - if i > 0 { - flags = unix.MSG_DONTWAIT - } - - u.l.WithFields(logrus.Fields{ - "index": i, - "flags": flags, - "bufLen": iov.Len, - }).Debug("ReadMulti: calling state.Recvmsg") - - n, _, recvErr := state.Recvmsg(u.sysFd, &hdr, flags) - if recvErr != nil { - u.l.WithError(recvErr).WithFields(logrus.Fields{ - "index": i, - "count": count, - }).Debug("ReadMulti: state.Recvmsg error") - if isEAgain(recvErr) && count > 0 { - u.l.WithField("count", count).Debug("ReadMulti: EAGAIN with existing packets, returning") - return count, nil - } - if count > 0 { - return count, recvErr - } - return 0, recvErr - } - - u.l.WithFields(logrus.Fields{ - "index": i, - "bytesRead": n, - }).Debug("ReadMulti: packet received") - - updateRawMessageFromUnixMsghdr(&msgs[i], &hdr, n) - runtime.KeepAlive(iov) - runtime.KeepAlive(hdr) - count++ - } - - u.l.WithField("total_count", count).Debug("ReadMulti: completed") - return count, nil + // Note: io_uring receive uses the dedicated ioUringRecvState in ListenOut, + // not this path. This function always uses direct syscalls for simplicity. + return u.readMultiSyscall(msgs) } func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error { @@ -2524,32 +2551,13 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) { } } u.ioUringMaxBatch.Store(int64(requestedBatch)) - ring, err := newIoUringState(configured) - if err != nil { - u.l.WithError(err).Warn("Failed to enable io_uring; falling back to sendmmsg path") - return - } - u.ioState.Store(ring) - finalBatch := clampIoUringBatchSize(requestedBatch, ring.sqEntryCount) - u.ioUringMaxBatch.Store(int64(finalBatch)) - fields := logrus.Fields{ - "entries": ring.sqEntryCount, - "max_batch": finalBatch, - } - if finalBatch != requestedBatch { - fields["requested_batch"] = requestedBatch - } - u.l.WithFields(fields).Debug("io_uring ioState pointer initialized") - desired := configured - if desired == 0 { - desired = defaultIoUringEntries - } - if ring.sqEntryCount < desired { - fields["requested_entries"] = desired - u.l.WithFields(fields).Warn("UDP io_uring send path enabled with reduced queue depth (ENOMEM)") - } else { - u.l.WithFields(fields).Debug("UDP io_uring send path enabled") - } + + // Mark io_uring as active - per-shard rings will be created when shards are initialized + u.ioActive.Store(true) + u.l.WithFields(logrus.Fields{ + "max_batch": requestedBatch, + "holdoff": u.ioUringHoldoff.Load(), + }).Debug("io_uring send path configured (per-shard rings will be created)") // Initialize dedicated receive ring with retry logic recvPoolSize := 128 // Number of receive operations to keep queued
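// ---- editor's sketch (not part of the patch) ----
// End-of-patch note: the SQPOLL fast path added to both rings comes down to
// one decision, call io_uring_enter(2) only when the caller wants to wait for
// completions or when the kernel's SQ-poll thread has parked and set
// IORING_SQ_NEED_WAKEUP in the shared sq_flags word. A standalone sketch of
// that decision using the same flag values as io_uring_linux.go (assumes
// golang.org/x/sys/unix; enterFlags and sqpollSubmit are hypothetical names):

package sketch

import (
	"sync/atomic"

	"golang.org/x/sys/unix"
)

const (
	enterGetevents = 1 << 0 // IORING_ENTER_GETEVENTS
	enterSqWakeup  = 1 << 1 // IORING_ENTER_SQ_WAKEUP
	sqNeedWakeup   = 1 << 0 // IORING_SQ_NEED_WAKEUP
)

// enterFlags reports whether io_uring_enter is needed at all under SQPOLL
// and, if so, which flags to pass.
func enterFlags(sqFlags *uint32, wait uint32) (uintptr, bool) {
	needsWakeup := atomic.LoadUint32(sqFlags)&sqNeedWakeup != 0
	if wait == 0 && !needsWakeup {
		return 0, false // poller is awake; publishing the SQ tail was enough
	}
	var flags uintptr
	if wait > 0 {
		flags |= enterGetevents
	}
	if needsWakeup {
		flags |= enterSqWakeup
	}
	return flags, true
}

// sqpollSubmit mirrors the shape of submitAndWaitLocked: under SQPOLL the
// to_submit argument stays 0 because the poller thread consumes SQEs on its
// own, and the syscall is retried on EINTR.
func sqpollSubmit(ringFd int, sqFlags *uint32, wait uint32) error {
	flags, need := enterFlags(sqFlags, wait)
	if !need {
		return nil
	}
	for {
		_, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER,
			uintptr(ringFd), 0, uintptr(wait), flags, 0, 0)
		if errno == 0 {
			return nil
		}
		if errno == unix.EINTR {
			continue
		}
		return errno
	}
}
// ---- end editor's sketch ----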