From 382b15ac5294b6cdeb2dab0f93bfbfd03c250d81 Mon Sep 17 00:00:00 2001 From: JackDoan Date: Tue, 21 Apr 2026 17:19:32 -0500 Subject: [PATCH] haha yep faster --- udp/udp_linux.go | 186 ++++++++++++++++++++++++++++++++++++-------- udp/udp_linux_32.go | 16 ---- udp/udp_linux_64.go | 16 ---- 3 files changed, 153 insertions(+), 65 deletions(-) diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 40fd0463..e3c28d5d 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -32,6 +32,17 @@ type StdConn struct { writeIovs []iovec writeNames [][]byte + // Per-entry UDP_SEGMENT cmsg scratch. writeCmsg is one contiguous slab + // of MaxWriteBatch * writeCmsgSpace bytes; each entry's cmsg header is + // pre-filled once in prepareWriteMessages. WriteBatch only rewrites the + // 2-byte gso_size payload (and toggles Hdr.Control on/off) per call. + writeCmsg []byte + writeCmsgSpace int + + // writeEntryEnd[e] is the bufs index *after* the last packet packed + // into mmsghdr entry e. Used to rewind `i` on partial sendmmsg success. + writeEntryEnd []int + // Preallocated closure + in/out slots for sendmmsg, so the hot path // does not heap-allocate a fresh closure per call. writeChunk int @@ -104,6 +115,34 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in return out, nil } +// prepareWriteMessages allocates one mmsghdr/iovec/sockaddr/cmsg scratch +// slot per sendmmsg entry. The iovec slab is sized to the same n so a +// single entry can fan out to up to n iovecs (needed for UDP_SEGMENT runs +// that coalesce consecutive bufs into one entry). Hdr.Iov / Hdr.Iovlen / +// Hdr.Control / Hdr.Controllen are wired per call since each entry can +// span a variable number of iovecs and may or may not carry a cmsg. +func (u *StdConn) prepareWriteMessages(n int) { + u.writeMsgs = make([]rawMessage, n) + u.writeIovs = make([]iovec, n) + u.writeNames = make([][]byte, n) + u.writeEntryEnd = make([]int, n) + + u.writeCmsgSpace = unix.CmsgSpace(2) + u.writeCmsg = make([]byte, n*u.writeCmsgSpace) + for k := 0; k < n; k++ { + off := k * u.writeCmsgSpace + h := (*unix.Cmsghdr)(unsafe.Pointer(&u.writeCmsg[off])) + h.Level = unix.SOL_UDP + h.Type = unix.UDP_SEGMENT + setCmsgLen(h, unix.CmsgLen(2)) + } + + for i := range u.writeMsgs { + u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6) + u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0] + } +} + // maxGSOSegments caps the per-sendmsg GSO fan-out. Linux kernels have // historically capped UDP_MAX_SEGMENTS at 64; newer kernels raise it to 128 // but we stay conservative so the same code works everywhere. @@ -318,62 +357,143 @@ func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error { } // WriteBatch sends bufs via sendmmsg(2) using the preallocated scratch on -// StdConn. Chunks larger than the scratch are processed in multiple syscalls. -// If sendmmsg returns a fatal error mid-chunk we fall back to single WriteTo -// calls for the remainder so the caller still gets best-effort delivery. +// StdConn. Consecutive packets to the same destination with matching segment +// sizes (all but possibly the last) are coalesced into a single mmsghdr entry +// carrying a UDP_SEGMENT cmsg, so one syscall can mix runs of GSO superpackets +// with plain one-off datagrams. Without GSO support every packet is its own +// entry, matching the prior behaviour. +// +// Chunks larger than the scratch are processed across multiple syscalls. If +// sendmmsg returns a fatal error before any entry is sent we fall back to +// per-packet WriteTo for that chunk so the caller still gets best-effort +// delivery. func (u *StdConn) WriteBatch(bufs [][]byte, addrs []netip.AddrPort) error { if len(bufs) != len(addrs) { return fmt.Errorf("WriteBatch: len(bufs)=%d != len(addrs)=%d", len(bufs), len(addrs)) } - //u.l.WithField("bufs", len(bufs)).Info("WriteBatch") + i := 0 for i < len(bufs) { - chunk := len(bufs) - i - if chunk > len(u.writeMsgs) { - chunk = len(u.writeMsgs) - } + baseI := i + entry := 0 + iovIdx := 0 - for k := 0; k < chunk; k++ { - b := bufs[i+k] - if len(b) == 0 { - // sendmmsg with an empty iovec is legal but pointless; fall - // through after filling the slot so Base is still valid. - u.writeIovs[k].Base = nil - setIovLen(&u.writeIovs[k], 0) - } else { - u.writeIovs[k].Base = &b[0] - setIovLen(&u.writeIovs[k], len(b)) + for entry < len(u.writeMsgs) && i < len(bufs) { + iovBudget := len(u.writeIovs) - iovIdx + if iovBudget < 1 { + break } - nlen, err := writeSockaddr(u.writeNames[k], addrs[i+k], u.isV4) + runLen, segSize := u.planRun(bufs, addrs, i, iovBudget) + if runLen == 0 { + break + } + + for k := 0; k < runLen; k++ { + b := bufs[i+k] + if len(b) == 0 { + u.writeIovs[iovIdx+k].Base = nil + setIovLen(&u.writeIovs[iovIdx+k], 0) + } else { + u.writeIovs[iovIdx+k].Base = &b[0] + setIovLen(&u.writeIovs[iovIdx+k], len(b)) + } + } + + nlen, err := writeSockaddr(u.writeNames[entry], addrs[i], u.isV4) if err != nil { return err } - u.writeMsgs[k].Hdr.Namelen = uint32(nlen) + + hdr := &u.writeMsgs[entry].Hdr + hdr.Iov = &u.writeIovs[iovIdx] + setMsgIovlen(hdr, runLen) + hdr.Namelen = uint32(nlen) + + if runLen >= 2 { + off := entry * u.writeCmsgSpace + dataOff := off + unix.CmsgLen(0) + binary.NativeEndian.PutUint16(u.writeCmsg[dataOff:dataOff+2], uint16(segSize)) + hdr.Control = &u.writeCmsg[off] + setMsgControllen(hdr, u.writeCmsgSpace) + } else { + hdr.Control = nil + setMsgControllen(hdr, 0) + } + + i += runLen + iovIdx += runLen + u.writeEntryEnd[entry] = i + entry++ } - sent, serr := u.sendmmsg(chunk) - if serr != nil { - if sent <= 0 { - // nothing went out; fall back to WriteTo for this chunk. - for k := 0; k < chunk; k++ { - if err := u.WriteTo(bufs[i+k], addrs[i+k]); err != nil { - return err - } + if entry == 0 { + return fmt.Errorf("sendmmsg: no progress") + } + + sent, serr := u.sendmmsg(entry) + if serr != nil && sent <= 0 { + // Nothing went out for this chunk; fall back to WriteTo for each + // packet that was queued this iteration. + for k := baseI; k < i; k++ { + if werr := u.WriteTo(bufs[k], addrs[k]); werr != nil { + return werr } - i += chunk - continue } - // partial: treat as success for the sent packets and retry the - // remainder on the next outer-loop iteration. + continue } if sent == 0 { return fmt.Errorf("sendmmsg made no progress") } - i += sent + // Rewind i to the end of the last successfully sent entry. For a + // full-success send this leaves i unchanged; for a partial send it + // replays the remainder on the next outer-loop iteration. + i = u.writeEntryEnd[sent-1] } return nil } +// planRun groups consecutive packets starting at `start` that can be sent as +// a single UDP GSO superpacket (one sendmmsg entry with UDP_SEGMENT cmsg). +// A run of length 1 means the entry carries no cmsg and the kernel treats +// it as a plain datagram. Returns the run length and the per-segment size +// (which equals len(bufs[start])). Without GSO support every call returns +// runLen=1. +func (u *StdConn) planRun(bufs [][]byte, addrs []netip.AddrPort, start, iovBudget int) (int, int) { + if start >= len(bufs) || iovBudget < 1 { + return 0, 0 + } + segSize := len(bufs[start]) + if !u.gsoSupported || segSize == 0 || segSize > maxGSOBytes { + return 1, segSize + } + dst := addrs[start] + maxLen := maxGSOSegments + if iovBudget < maxLen { + maxLen = iovBudget + } + runLen := 1 + total := segSize + for runLen < maxLen && start+runLen < len(bufs) { + nextLen := len(bufs[start+runLen]) + if nextLen == 0 || nextLen > segSize { + break + } + if addrs[start+runLen] != dst { + break + } + if total+nextLen > maxGSOBytes { + break + } + total += nextLen + runLen++ + if nextLen < segSize { + // A short packet must be the last in the run. + break + } + } + return runLen, segSize +} + // sendmmsgRawWrite is the preallocated callback passed to rawConn.Write. It // reads its input (u.writeChunk) and writes its outputs (u.writeSent, // u.writeErrno) through StdConn fields so the closure itself does not diff --git a/udp/udp_linux_32.go b/udp/udp_linux_32.go index 1fd469f3..1d4c7fbb 100644 --- a/udp/udp_linux_32.go +++ b/udp/udp_linux_32.go @@ -53,22 +53,6 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { return msgs, buffers, names } -// prepareWriteMessages allocates one Mmsghdr/iovec/sockaddr scratch per slot, -// wired up so each writeMsgs[i] already points at writeIovs[i] and -// writeNames[i]. Callers fill in the iovec Base/Len, the sockaddr bytes, and -// Namelen before each sendmmsg. -func (u *StdConn) prepareWriteMessages(n int) { - u.writeMsgs = make([]rawMessage, n) - u.writeIovs = make([]iovec, n) - u.writeNames = make([][]byte, n) - for i := range u.writeMsgs { - u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6) - u.writeMsgs[i].Hdr.Iov = &u.writeIovs[i] - u.writeMsgs[i].Hdr.Iovlen = 1 - u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0] - } -} - func setIovLen(v *iovec, n int) { v.Len = uint32(n) } diff --git a/udp/udp_linux_64.go b/udp/udp_linux_64.go index 6f5008c0..0293b1a4 100644 --- a/udp/udp_linux_64.go +++ b/udp/udp_linux_64.go @@ -56,22 +56,6 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) { return msgs, buffers, names } -// prepareWriteMessages allocates one Mmsghdr/iovec/sockaddr scratch per slot, -// wired up so each writeMsgs[i] already points at writeIovs[i] and -// writeNames[i]. Callers fill in the iovec Base/Len, the sockaddr bytes, and -// Namelen before each sendmmsg. -func (u *StdConn) prepareWriteMessages(n int) { - u.writeMsgs = make([]rawMessage, n) - u.writeIovs = make([]iovec, n) - u.writeNames = make([][]byte, n) - for i := range u.writeMsgs { - u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6) - u.writeMsgs[i].Hdr.Iov = &u.writeIovs[i] - u.writeMsgs[i].Hdr.Iovlen = 1 - u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0] - } -} - func setIovLen(v *iovec, n int) { v.Len = uint64(n) }