From c73b2dfbc71cd3de241808733df47f7f599b3271 Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Mon, 3 Nov 2025 10:45:30 +0000 Subject: [PATCH] fixed fallback for non io_uring packet send/recv --- cert/pem.go | 9 +++-- udp/udp_linux.go | 102 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 85 insertions(+), 26 deletions(-) diff --git a/cert/pem.go b/cert/pem.go index c04e63b..484c1e9 100644 --- a/cert/pem.go +++ b/cert/pem.go @@ -1,6 +1,7 @@ package cert import ( + "encoding/hex" "encoding/pem" "fmt" "time" @@ -249,9 +250,11 @@ func UnmarshalNebulaCertificateFromPEM(b []byte) (*NebulaCertificate, []byte, er // Handle issuer if c.Issuer() != "" { - // Convert hex string fingerprint back to bytes (this is an approximation) - // The old API used raw bytes, new API uses hex string - nc.Details.Issuer = []byte(c.Issuer()) + issuerBytes, err := hex.DecodeString(c.Issuer()) + if err != nil { + return nil, rest, fmt.Errorf("failed to decode issuer fingerprint: %w", err) + } + nc.Details.Issuer = issuerBytes } return nc, rest, nil diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 7dd04f7..0e56efc 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -1406,21 +1406,21 @@ func (u *StdConn) ListenOut(r EncReader) { useIoUringRecv := recvRing != nil && u.ioRecvActive.Load() u.l.WithFields(logrus.Fields{ - "batch": u.batch, - "io_uring_send": u.ioState.Load() != nil, - "io_uring_recv": useIoUringRecv, + "batch": u.batch, + "io_uring_send": u.ioState.Load() != nil, + "io_uring_recv": useIoUringRecv, }).Info("ListenOut starting") if useIoUringRecv { // Use dedicated io_uring receive ring u.l.Info("ListenOut: using io_uring receive path") - + // Pre-fill the receive queue now that we're ready to receive if err := recvRing.fillRecvQueue(); err != nil { u.l.WithError(err).Error("Failed to fill receive queue") return } - + for { // Receive packets from io_uring (wait=true blocks until at least one packet arrives) packets, err := recvRing.receivePackets(true) @@ -1459,7 +1459,7 @@ func (u *StdConn) ListenOut(r EncReader) { u.l.WithField("family", pkt.From.Family).Warn("Received packet with unexpected address family") continue } - + ip, _ = netip.AddrFromSlice(pkt.From.Addr[:]) addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16((*[2]byte)(unsafe.Pointer(&pkt.From.Port))[:])) payload := pkt.Data[:pkt.N] @@ -1471,7 +1471,7 @@ func (u *StdConn) ListenOut(r EncReader) { release() } } - + // Check for GRO segments handled := false if pkt.Controllen > 0 && len(pkt.Control) > 0 { @@ -1498,7 +1498,7 @@ func (u *StdConn) ListenOut(r EncReader) { } } } - + if !handled { r(addr, payload, releaseOnce) } @@ -1628,6 +1628,57 @@ func isEAgain(err error) bool { return false } +func (u *StdConn) readSingleSyscall(msgs []rawMessage) (int, error) { + if len(msgs) == 0 { + return 0, nil + } + for { + n, _, errno := unix.Syscall6( + unix.SYS_RECVMSG, + uintptr(u.sysFd), + uintptr(unsafe.Pointer(&msgs[0].Hdr)), + 0, + 0, + 0, + 0, + ) + if errno != 0 { + err := syscall.Errno(errno) + if err == unix.EINTR { + continue + } + return 0, &net.OpError{Op: "recvmsg", Err: err} + } + msgs[0].Len = uint32(n) + return 1, nil + } +} + +func (u *StdConn) readMultiSyscall(msgs []rawMessage) (int, error) { + if len(msgs) == 0 { + return 0, nil + } + for { + n, _, errno := unix.Syscall6( + unix.SYS_RECVMMSG, + uintptr(u.sysFd), + uintptr(unsafe.Pointer(&msgs[0])), + uintptr(len(msgs)), + unix.MSG_WAITFORONE, + 0, + 0, + ) + if errno != 0 { + err := syscall.Errno(errno) + if err == unix.EINTR { + continue + } + return 0, &net.OpError{Op: "recvmmsg", Err: err} + } + return int(n), nil + } +} + func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) { if len(msgs) == 0 { return 0, nil @@ -1637,8 +1688,7 @@ func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) { state := u.ioState.Load() if state == nil { - u.l.Error("ReadSingle: io_uring not initialized") - return 0, &net.OpError{Op: "recvmsg", Err: errors.New("io_uring not initialized")} + return u.readSingleSyscall(msgs) } u.l.Debug("ReadSingle: converting rawMessage to unix.Msghdr") @@ -1679,8 +1729,7 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) { state := u.ioState.Load() if state == nil { - u.l.Error("ReadMulti: io_uring not initialized") - return 0, &net.OpError{Op: "recvmsg", Err: errors.New("io_uring not initialized")} + return u.readMultiSyscall(msgs) } count := 0 @@ -2176,7 +2225,14 @@ func (u *StdConn) directWrite(b []byte, addr netip.AddrPort) error { "remote_is_v6": addr.Addr().Is6(), }).Debug("io_uring directWrite invoked") if state == nil { - return errors.New("io_uring state unavailable") + written, err := u.sendMsgSync(addr, b, nil, 0) + if err != nil { + return err + } + if written != len(b) { + return fmt.Errorf("sendmsg short write: wrote %d expected %d", written, len(b)) + } + return nil } n, err := u.sendMsgIOUring(state, addr, b, nil, 0) if err != nil { @@ -2401,11 +2457,11 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) { if u.ioState.Load() != nil { return } - + // Serialize io_uring initialization globally to avoid kernel resource races ioUringInitMu.Lock() defer ioUringInitMu.Unlock() - + var configured uint32 requestedBatch := ioUringDefaultMaxBatch if c != nil { @@ -2463,25 +2519,25 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) { } else { u.l.WithFields(fields).Debug("UDP io_uring send path enabled") } - + // Initialize dedicated receive ring with retry logic recvPoolSize := 128 // Number of receive operations to keep queued recvBufferSize := defaultGROReadBufferSize if recvBufferSize < MTU { recvBufferSize = MTU } - + var recvRing *ioUringRecvState maxRetries := 10 retryDelay := 10 * time.Millisecond - + for attempt := 0; attempt < maxRetries; attempt++ { var err error recvRing, err = newIoUringRecvState(u.sysFd, configured, recvPoolSize, recvBufferSize) if err == nil { break } - + if attempt < maxRetries-1 { u.l.WithFields(logrus.Fields{ "attempt": attempt + 1, @@ -2494,7 +2550,7 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) { u.l.WithError(err).Error("Failed to create io_uring receive ring after retries; will use standard recvmsg") } } - + if recvRing != nil { u.ioRecvState.Store(recvRing) u.ioRecvActive.Store(true) @@ -2505,7 +2561,7 @@ func (u *StdConn) configureIOUring(enable bool, c *config.C) { }).Info("UDP io_uring receive path enabled") // Note: receive queue will be filled on first receivePackets() call } - + return } @@ -2604,7 +2660,7 @@ func (u *StdConn) configureGSO(enable bool, c *config.C) { if c != nil { desiredShards = c.GetInt("listen.send_shards", 0) } - + // io_uring requires 1 shard due to shared ring mutex contention if u.ioState.Load() != nil { if desiredShards > 1 { @@ -2617,7 +2673,7 @@ func (u *StdConn) configureGSO(enable bool, c *config.C) { } desiredShards = 1 } - + // Only resize if actually changing shard count if len(u.sendShards) != desiredShards { u.resizeSendShards(desiredShards)