fix compile for 386

Ryan Huber
2025-11-03 10:12:02 +00:00
parent b394112ad9
commit 3dea761530


@@ -18,18 +18,18 @@ import (
)
const (
	ioringOpSendmsg              = 9
	ioringOpRecvmsg              = 10
	ioringEnterGetevents         = 1 << 0
	ioringSetupClamp             = 1 << 4
	ioringSetupCoopTaskrun       = 1 << 8  // Kernel 5.19+: reduce thread creation
	ioringSetupSingleIssuer      = 1 << 12 // Kernel 6.0+: single submitter optimization
	ioringRegisterIowqMaxWorkers = 19      // Register opcode to limit workers
	ioringOffSqRing              = 0
	ioringOffCqRing              = 0x8000000
	ioringOffSqes                = 0x10000000
	defaultIoUringEntries        = 256
	ioUringSqeSize               = 64 // struct io_uring_sqe size defined by kernel ABI
)
type ioSqringOffsets struct {
@@ -170,13 +170,13 @@ type ioUringState struct {
// recvBuffer represents a single receive operation with its associated buffers
type recvBuffer struct {
	payloadBuf []byte       // Buffer for packet data
	nameBuf    []byte       // Buffer for source address
	controlBuf []byte       // Buffer for control messages
	msghdr     *unix.Msghdr // Message header for recvmsg
	iovec      *unix.Iovec  // IO vector pointing to payloadBuf
	userData   uint64       // User data for tracking this operation
	inFlight   atomic.Bool  // Whether this buffer has a pending io_uring operation
}
// ioUringRecvState manages a dedicated io_uring for receiving packets
@@ -200,16 +200,16 @@ type ioUringRecvState struct {
	cqRingMask    *uint32
	cqRingEntries *uint32
	mu            sync.Mutex
	userData      uint64
	bufferPool    []*recvBuffer          // Pool of all receive buffers
	bufferMap     map[uint64]*recvBuffer // Map userData -> buffer
	sqEntryCount  uint32
	cqEntryCount  uint32
	sockFd        int // Socket file descriptor to receive from
	closed        atomic.Bool
}
func alignUint32(v, alignment uint32) uint32 {
@@ -235,12 +235,12 @@ func newIoUringState(entries uint32) (*ioUringState, error) {
	tries := entries
	var params ioUringParams
	// Try flag combinations in order (5.19+ -> baseline)
	// Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded
	flagSets := []uint32{
		ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation
		ioringSetupClamp,                          // All kernels
	}
	flagSetIdx := 0
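The flag list above feeds a setup fallback loop that is not part of this hunk: the ring is first created with the newest flag combination and, if the running kernel rejects it, retried with the older one. A rough sketch of that pattern, assuming the raw io_uring_setup syscall is used, that an unsupported flag surfaces as EINVAL, and that ioUringParams mirrors struct io_uring_params with a Flags field; trySetup and that field name are illustrative, not taken from this codebase:

// Assumes the surrounding file's imports: "unsafe" and "golang.org/x/sys/unix".
func trySetup(entries uint32, flagSets []uint32) (int, ioUringParams, error) {
	var params ioUringParams
	for _, flags := range flagSets {
		params = ioUringParams{Flags: flags} // field name assumed to mirror io_uring_params.flags
		fd, _, errno := unix.Syscall(unix.SYS_IO_URING_SETUP,
			uintptr(entries), uintptr(unsafe.Pointer(&params)), 0)
		if errno == 0 {
			return int(fd), params, nil // kernel accepted this flag set
		}
		if errno == unix.EINVAL {
			continue // flag not supported by this kernel, fall back to the next set
		}
		return -1, params, errno // unrelated failure, stop retrying
	}
	return -1, params, unix.EINVAL
}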
@@ -417,7 +417,7 @@ func (r *ioUringState) submitAndWaitLocked(submit, wait uint32) error {
	if wait > 0 {
		flags = ioringEnterGetevents
	}
	for {
		_, _, errno := unix.Syscall6(unix.SYS_IO_URING_ENTER, uintptr(r.fd), uintptr(submit), uintptr(wait), flags, 0, 0)
		if errno == 0 {
@@ -1024,14 +1024,14 @@ var recvControlDataPool = sync.Pool{
// poolSize determines how many receive operations to keep queued
func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize int) (*ioUringRecvState, error) {
	const minEntries = 8
	if poolSize < 1 {
		poolSize = 64 // Default pool size
	}
	if poolSize > 2048 {
		poolSize = 2048 // Cap pool size
	}
	if entries == 0 {
		entries = uint32(poolSize)
	}
@@ -1044,12 +1044,12 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in
	tries := entries
	var params ioUringParams
	// Try flag combinations in order (5.19+ -> baseline)
	// Note: SINGLE_ISSUER causes EEXIST errors, so it's excluded
	flagSets := []uint32{
		ioringSetupClamp | ioringSetupCoopTaskrun, // Kernel 5.19+: reduce thread creation
		ioringSetupClamp,                          // All kernels
	}
	flagSetIdx := 0
@@ -1105,44 +1105,44 @@ func newIoUringRecvState(sockFd int, entries uint32, poolSize int, bufferSize in
			userData:   ring.userData,
		}
		ring.userData++
		// Initialize iovec to point to payload buffer
		buf.iovec.Base = &buf.payloadBuf[0]
		buf.iovec.SetLen(len(buf.payloadBuf))
		// Initialize msghdr
		buf.msghdr.Name = &buf.nameBuf[0]
		buf.msghdr.Namelen = uint32(len(buf.nameBuf))
		buf.msghdr.Iov = buf.iovec
		buf.msghdr.Iovlen = 1
		buf.msghdr.Control = &buf.controlBuf[0]
-		buf.msghdr.Controllen = uint64(len(buf.controlBuf))
+		buf.msghdr.Controllen = controllen(len(buf.controlBuf))
		ring.bufferPool[i] = buf
		ring.bufferMap[buf.userData] = buf
	}
	logrus.WithFields(logrus.Fields{
		"poolSize":   poolSize,
		"entries":    ring.sqEntryCount,
		"bufferSize": bufferSize,
	}).Info("io_uring receive ring created")
	// Limit kernel worker threads to prevent thousands being spawned
	// [0] = bounded workers, [1] = unbounded workers
	maxWorkers := [2]uint32{4, 4} // Limit to 4 workers of each type
	_, _, errno = unix.Syscall6(
		unix.SYS_IO_URING_REGISTER,
		uintptr(fd),
		uintptr(ioringRegisterIowqMaxWorkers),
		uintptr(unsafe.Pointer(&maxWorkers[0])),
		2, // array length
		0, 0,
	)
	// Ignore errors - older kernels don't support this
	return ring, nil
}
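The 386-relevant change in this hunk is the Controllen assignment: unix.Msghdr.Controllen is uint64 on 64-bit Linux targets but uint32 on linux/386 (and other 32-bit GOARCH values), so the previous hard-coded uint64(...) cast failed to compile for 386. The commit routes the length through controllen(...) instead. That helper (or arch-specific conversion) is not shown in this diff; a minimal sketch of one way to provide it with build-tagged files, where the file names, package name, and tag lists are assumptions rather than anything taken from this commit:

// controllen_64.go (hypothetical) - other 64-bit GOARCH values would join this tag.
//go:build linux && amd64

package udp // package name assumed from the surrounding io_uring code

// controllen widens a length to the uint64 Controllen field used on 64-bit Linux.
func controllen(n int) uint64 { return uint64(n) }

// controllen_32.go (hypothetical) - other 32-bit GOARCH values would join this tag.
//go:build linux && 386

package udp

// On 32-bit Linux, unix.Msghdr.Controllen is uint32.
func controllen(n int) uint32 { return uint32(n) }

golang.org/x/sys/unix also exposes an arch-aware (*Msghdr).SetControllen(int) method, which would be another way to keep this assignment portable.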
func (r *ioUringRecvState) mapRings(params *ioUringParams) error {
@@ -1215,7 +1215,7 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error {
	// Reset buffer state for reuse
	buf.msghdr.Namelen = uint32(len(buf.nameBuf))
-	buf.msghdr.Controllen = uint64(len(buf.controlBuf))
+	buf.msghdr.Controllen = controllen(len(buf.controlBuf))
	buf.msghdr.Flags = 0
	buf.iovec.SetLen(len(buf.payloadBuf))
@@ -1241,9 +1241,9 @@ func (r *ioUringRecvState) submitRecvLocked(buf *recvBuffer) error {
	r.sqArray[idx] = uint32(idx)
	atomic.StoreUint32(r.sqTail, tail+1)
	buf.inFlight.Store(true)
	return nil
}
@@ -1297,7 +1297,7 @@ func (r *ioUringRecvState) fillRecvQueue() error {
	if submitted > 0 {
		return r.submitAndWaitLocked(submitted, 0)
	}
	return nil
}
@@ -1321,12 +1321,12 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
			submitted++
		}
	}
	waitCount := uint32(0)
	if wait {
		waitCount = 1
	}
	if submitted > 0 || wait {
		if err := r.submitAndWaitLocked(submitted, waitCount); err != nil {
			return nil, err
@@ -1338,7 +1338,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
	head := atomic.LoadUint32(r.cqHead)
	tail := atomic.LoadUint32(r.cqTail)
	mask := *r.cqRingMask
	completions := uint32(0)
	errors := 0
	eagains := 0
@@ -1346,23 +1346,23 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
	for head != tail {
		idx := head & mask
		cqe := &r.cqCqes[idx]
		userData := cqe.UserData
		res := cqe.Res
		flags := cqe.Flags
		head++
		atomic.StoreUint32(r.cqHead, head)
		completions++
		buf, ok := r.bufferMap[userData]
		if !ok {
			logrus.WithField("userData", userData).Warn("io_uring recv: unknown userData in completion")
			continue
		}
		buf.inFlight.Store(false)
		if res < 0 {
			errno := syscall.Errno(-res)
			// EAGAIN is expected for non-blocking - just resubmit
@@ -1377,21 +1377,21 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
			}
			continue
		}
		if res == 0 {
			// Connection closed or no data
			continue
		}
		// Successfully received packet
		n := int(res)
		// Copy address
		var from unix.RawSockaddrInet6
		if buf.msghdr.Namelen > 0 && buf.msghdr.Namelen <= uint32(len(buf.nameBuf)) {
			copy((*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&from)))[:], buf.nameBuf[:buf.msghdr.Namelen])
		}
		// Get buffer from pool and copy data
		dataBufPtr := recvPacketDataPool.Get().(*[]byte)
		dataBuf := *dataBufPtr
@@ -1402,7 +1402,7 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
			dataBuf = dataBuf[:n]
		}
		copy(dataBuf, buf.payloadBuf[:n])
		// Copy control messages if present
		var controlBuf []byte
		var controlBufPtr *[]byte
@@ -1412,14 +1412,14 @@ func (r *ioUringRecvState) receivePackets(wait bool) ([]RecvPacket, error) {
			controlBuf = (*controlBufPtr)[:controllen]
			copy(controlBuf, buf.controlBuf[:controllen])
		}
		packets = append(packets, RecvPacket{
			Data:       dataBuf,
			N:          n,
			From:       &from,
			Flags:      flags,
			Control:    controlBuf,
			Controllen: controllen,
			RecycleFunc: func() {
				// Return buffers to pool
				recvPacketDataPool.Put(dataBufPtr)
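Each completed receive is copied into buffers taken from sync.Pools and surfaced as a RecvPacket whose RecycleFunc hands those buffers back. A sketch of how a caller in the same package might drain the ring; drainRing and handle are illustrative names, not part of this diff:

// drainRing blocks for completions, processes each packet, then recycles its pooled buffers.
func drainRing(r *ioUringRecvState, handle func(payload, control []byte, from *unix.RawSockaddrInet6)) error {
	for {
		pkts, err := r.receivePackets(true) // wait for at least one completion
		if err != nil {
			return err
		}
		for _, p := range pkts {
			handle(p.Data[:p.N], p.Control, p.From)
			p.RecycleFunc() // return the pooled data/control buffers once they are no longer referenced
		}
	}
}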
@@ -1438,9 +1438,9 @@ func (r *ioUringRecvState) Close() error {
	if r == nil {
		return nil
	}
	r.closed.Store(true)
	r.mu.Lock()
	defer r.mu.Unlock()