From 3dea7615305a8260f7717d2cc69999491679ab5d Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Mon, 3 Nov 2025 10:12:02 +0000 Subject: [PATCH] fix compile for 386 --- udp/io_uring_linux.go | 176 +++++++++++++++++++++--------------------- 1 file changed, 88 insertions(+), 88 deletions(-) diff --git a/udp/io_uring_linux.go b/udp/io_uring_linux.go index c1580df..122ab33 100644 --- a/udp/io_uring_linux.go +++ b/udp/io_uring_linux.go @@ -18,18 +18,18 @@ import ( ) const ( - ioringOpSendmsg = 9 - ioringOpRecvmsg = 10 - ioringEnterGetevents = 1 << 0 - ioringSetupClamp = 1 << 4 - ioringSetupCoopTaskrun = 1 << 8 // Kernel 5.19+: reduce thread creation - ioringSetupSingleIssuer = 1 << 12 // Kernel 6.0+: single submitter optimization - ioringRegisterIowqMaxWorkers = 19 // Register opcode to limit workers - ioringOffSqRing = 0 - ioringOffCqRing = 0x8000000 - ioringOffSqes = 0x10000000 - defaultIoUringEntries = 256 - ioUringSqeSize = 64 // struct io_uring_sqe size defined by kernel ABI + ioringOpSendmsg = 9 + ioringOpRecvmsg = 10 + ioringEnterGetevents = 1 << 0 + ioringSetupClamp = 1 << 4 + ioringSetupCoopTaskrun = 1 << 8 // Kernel 5.19+: reduce thread creation + ioringSetupSingleIssuer = 1 << 12 // Kernel 6.0+: single submitter optimization + ioringRegisterIowqMaxWorkers = 19 // Register opcode to limit workers + ioringOffSqRing = 0 + ioringOffCqRing = 0x8000000 + ioringOffSqes = 0x10000000 + defaultIoUringEntries = 256 + ioUringSqeSize = 64 // struct io_uring_sqe size defined by kernel ABI ) type ioSqringOffsets struct { @@ -170,13 +170,13 @@ type ioUringState struct { // recvBuffer represents a single receive operation with its associated buffers type recvBuffer struct { - payloadBuf []byte // Buffer for packet data - nameBuf []byte // Buffer for source address - controlBuf []byte // Buffer for control messages - msghdr *unix.Msghdr // Message header for recvmsg - iovec *unix.Iovec // IO vector pointing to payloadBuf - userData uint64 // User data for tracking this operation - inFlight atomic.Bool // Whether this buffer has a pending io_uring operation + payloadBuf []byte // Buffer for packet data + nameBuf []byte // Buffer for source address + controlBuf []byte // Buffer for control messages + msghdr *unix.Msghdr // Message header for recvmsg + iovec *unix.Iovec // IO vector pointing to payloadBuf + userData uint64 // User data for tracking this operation + inFlight atomic.Bool // Whether this buffer has a pending io_uring operation } // ioUringRecvState manages a dedicated io_uring for receiving packets @@ -200,16 +200,16 @@ type ioUringRecvState struct { cqRingMask *uint32 cqRingEntries *uint32 - mu sync.Mutex - userData uint64 - bufferPool []*recvBuffer // Pool of all receive buffers - bufferMap map[uint64]*recvBuffer // Map userData -> buffer - + mu sync.Mutex + userData uint64 + bufferPool []*recvBuffer // Pool of all receive buffers + bufferMap map[uint64]*recvBuffer // Map userData -> buffer + sqEntryCount uint32 cqEntryCount uint32 - - sockFd int // Socket file descriptor to receive from - closed atomic.Bool + + sockFd int // Socket file descriptor to receive from + closed atomic.Bool } func alignUint32(v, alignment uint32) uint32 { @@ -235,12 +235,12 @@ func newIoUringState(entries uint32) (*ioUringState, error) { tries := entries var params ioUringParams - + // Try flag combinations in order (5.19+ -> baseline) // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded flagSets := []uint32{ ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation - ioringSetupClamp, // All kernels + ioringSetupClamp, // All kernels } flagSetIdx := 0 @@ -417,7 +417,7 @@ func (r *ioUringState) submitAndWaitLocked(submit, wait uint32) error { if wait > 0 { flags = ioringEnterGetevents } - + for { _, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), uintptr(submit), uintptr(wait), flags, 0, 0) if errno == 0 { @@ -1024,14 +1024,14 @@ var recvControlDataPool = sync.Pool{ // poolSize determines how many receive operations to keep queued func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize int) (*ioUringRecvState, error) { const minEntries = 8 - + if poolSize < 1 { poolSize = 64 // Default pool size } if poolSize > 2048 { poolSize = 2048 // Cap pool size } - + if entries == 0 { entries = uint32(poolSize) } @@ -1044,12 +1044,12 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in tries := entries var params ioUringParams - + // Try flag combinations in order (5.19+ -> baseline) // Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded flagSets := []uint32{ ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation - ioringSetupClamp, // All kernels + ioringSetupClamp, // All kernels } flagSetIdx := 0 @@ -1105,44 +1105,44 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in userData: ring.userData, } ring.userData++ - + // Initialize iovec to point to payload buffer buf.iovec.Base = &buf.payloadBuf[0] buf.iovec.SetLen(len(buf.payloadBuf)) - + // Initialize msghdr buf.msghdr.Name = &buf.nameBuf[0] buf.msghdr.Namelen = uint32(len(buf.nameBuf)) buf.msghdr.Iov = buf.iovec buf.msghdr.Iovlen = 1 buf.msghdr.Control = &buf.controlBuf[0] - buf.msghdr.Controllen = uint64(len(buf.controlBuf)) - - ring.bufferPool[i] = buf - ring.bufferMap[buf.userData] = buf + buf.msghdr.Controllen = controllen(len(buf.controlBuf)) + + ring.bufferPool[i] = buf + ring.bufferMap[buf.userData] = buf + } + + logrus.WithFields(logrus.Fields{ + "poolSize": poolSize, + "entries": ring.sqEntryCount, + "bufferSize": bufferSize, + }).Info("io_uring receive ring created") + + // Limit kernel worker threads to prevent thousands being spawned + // [0] = bounded workers, [1] = unbounded workers + maxWorkers := [2]uint32{4, 4} // Limit to 4 workers of each type + _, _, errno = unix.Syscall6( + unix.SYS_IO_URING_REGISTER, + uintptr(fd), + uintptr(ioringRegisterIowqMaxWorkers), + uintptr(unsafe.Pointer(&maxWorkers[0])), + 2, // array length + 0, 0, + ) + // Ignore errors - older kernels don't support this + + return ring, nil } - - logrus.WithFields(logrus.Fields{ - "poolSize": poolSize, - "entries": ring.sqEntryCount, - "bufferSize": bufferSize, - }).Info("io_uring receive ring created") - - // Limit kernel worker threads to prevent thousands being spawned - // [0] = bounded workers, [1] = unbounded workers - maxWorkers := [2]uint32{4, 4} // Limit to 4 workers of each type - _, _, errno = unix.Syscall6( - unix.SYS_IO_URING_REGISTER, - uintptr(fd), - uintptr(ioringRegisterIowqMaxWorkers), - uintptr(unsafe.Pointer(&maxWorkers[0])), - 2, // array length - 0, 0, - ) - // Ignore errors - older kernels don't support this - - return ring, nil -} } func (r *ioUringRecvState) mapRings(params *ioUringParams) error { @@ -1215,7 +1215,7 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error { // Reset buffer state for reuse buf.msghdr.Namelen = uint32(len(buf.nameBuf)) - buf.msghdr.Controllen = uint64(len(buf.controlBuf)) + buf.msghdr.Controllen = controllen(len(buf.controlBuf)) buf.msghdr.Flags = 0 buf.iovec.SetLen(len(buf.payloadBuf)) @@ -1241,9 +1241,9 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error { r.sqArray[idx] = uint32(idx) atomic.StoreUint32(r.sqTail, tail+1) - + buf.inFlight.Store(true) - + return nil } @@ -1297,7 +1297,7 @@ func (r *ioUringRecvState) fillRecvQueue() error { if submitted > 0 { return r.submitAndWaitLocked(submitted, 0) } - + return nil } @@ -1321,12 +1321,12 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { submitted++ } } - + waitCount := uint32(0) if wait { waitCount = 1 } - + if submitted > 0 || wait { if err := r.submitAndWaitLocked(submitted, waitCount); err != nil { return nil, err @@ -1338,7 +1338,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { head := atomic.LoadUint32(r.cqHead) tail := atomic.LoadUint32(r.cqTail) mask := *r.cqRingMask - + completions := uint32(0) errors := 0 eagains := 0 @@ -1346,23 +1346,23 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { for head != tail { idx := head & mask cqe := &r.cqCqes[idx] - + userData := cqe.UserData res := cqe.Res flags := cqe.Flags - + head++ atomic.StoreUint32(r.cqHead, head) completions++ - + buf, ok := r.bufferMap[userData] if !ok { logrus.WithField("userData", userData).Warn("io_uring recv: unknown userData in completion") continue } - + buf.inFlight.Store(false) - + if res < 0 { errno := syscall.Errno(-res) // EAGAIN is expected for non-blocking - just resubmit @@ -1377,21 +1377,21 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { } continue } - + if res == 0 { // Connection closed or no data continue } - + // Successfully received packet n := int(res) - + // Copy address var from unix.RawSockaddrInet6 if buf.msghdr.Namelen > 0 && buf.msghdr.Namelen <= uint32(len(buf.nameBuf)) { copy((*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&from)))[:], buf.nameBuf[:buf.msghdr.Namelen]) } - + // Get buffer from pool and copy data dataBufPtr := recvPacketDataPool.Get().(*[]byte) dataBuf := *dataBufPtr @@ -1402,7 +1402,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { dataBuf = dataBuf[:n] } copy(dataBuf, buf.payloadBuf[:n]) - + // Copy control messages if present var controlBuf []byte var controlBufPtr *[]byte @@ -1412,14 +1412,14 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) { controlBuf = (*controlBufPtr)[:controllen] copy(controlBuf, buf.controlBuf[:controllen]) } - + packets = append(packets, RecvPacket{ - Data: dataBuf, - N: n, - From: &from, - Flags: flags, - Control: controlBuf, - Controllen: controllen, + Data: dataBuf, + N: n, + From: &from, + Flags: flags, + Control: controlBuf, + Controllen: controllen, RecycleFunc: func() { // Return buffers to pool recvPacketDataPool.Put(dataBufPtr) @@ -1438,9 +1438,9 @@ func (r *ioUringRecvState) Close() error { if r == nil { return nil } - + r.closed.Store(true) - + r.mu.Lock() defer r.mu.Unlock()