mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
haha yep faster
This commit is contained in:
176
udp/udp_linux.go
176
udp/udp_linux.go
@@ -32,6 +32,17 @@ type StdConn struct {
|
||||
writeIovs []iovec
|
||||
writeNames [][]byte
|
||||
|
||||
// Per-entry UDP_SEGMENT cmsg scratch. writeCmsg is one contiguous slab
|
||||
// of MaxWriteBatch * writeCmsgSpace bytes; each entry's cmsg header is
|
||||
// pre-filled once in prepareWriteMessages. WriteBatch only rewrites the
|
||||
// 2-byte gso_size payload (and toggles Hdr.Control on/off) per call.
|
||||
writeCmsg []byte
|
||||
writeCmsgSpace int
|
||||
|
||||
// writeEntryEnd[e] is the bufs index *after* the last packet packed
|
||||
// into mmsghdr entry e. Used to rewind `i` on partial sendmmsg success.
|
||||
writeEntryEnd []int
|
||||
|
||||
// Preallocated closure + in/out slots for sendmmsg, so the hot path
|
||||
// does not heap-allocate a fresh closure per call.
|
||||
writeChunk int
|
||||
@@ -104,6 +115,34 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// prepareWriteMessages allocates one mmsghdr/iovec/sockaddr/cmsg scratch
|
||||
// slot per sendmmsg entry. The iovec slab is sized to the same n so a
|
||||
// single entry can fan out to up to n iovecs (needed for UDP_SEGMENT runs
|
||||
// that coalesce consecutive bufs into one entry). Hdr.Iov / Hdr.Iovlen /
|
||||
// Hdr.Control / Hdr.Controllen are wired per call since each entry can
|
||||
// span a variable number of iovecs and may or may not carry a cmsg.
|
||||
func (u *StdConn) prepareWriteMessages(n int) {
|
||||
u.writeMsgs = make([]rawMessage, n)
|
||||
u.writeIovs = make([]iovec, n)
|
||||
u.writeNames = make([][]byte, n)
|
||||
u.writeEntryEnd = make([]int, n)
|
||||
|
||||
u.writeCmsgSpace = unix.CmsgSpace(2)
|
||||
u.writeCmsg = make([]byte, n*u.writeCmsgSpace)
|
||||
for k := 0; k < n; k++ {
|
||||
off := k * u.writeCmsgSpace
|
||||
h := (*unix.Cmsghdr)(unsafe.Pointer(&u.writeCmsg[off]))
|
||||
h.Level = unix.SOL_UDP
|
||||
h.Type = unix.UDP_SEGMENT
|
||||
setCmsgLen(h, unix.CmsgLen(2))
|
||||
}
|
||||
|
||||
for i := range u.writeMsgs {
|
||||
u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6)
|
||||
u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0]
|
||||
}
|
||||
}
|
||||
|
||||
// maxGSOSegments caps the per-sendmsg GSO fan-out. Linux kernels have
|
||||
// historically capped UDP_MAX_SEGMENTS at 64; newer kernels raise it to 128
|
||||
// but we stay conservative so the same code works everywhere.
|
||||
@@ -318,62 +357,143 @@ func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
|
||||
}
|
||||
|
||||
// WriteBatch sends bufs via sendmmsg(2) using the preallocated scratch on
|
||||
// StdConn. Chunks larger than the scratch are processed in multiple syscalls.
|
||||
// If sendmmsg returns a fatal error mid-chunk we fall back to single WriteTo
|
||||
// calls for the remainder so the caller still gets best-effort delivery.
|
||||
// StdConn. Consecutive packets to the same destination with matching segment
|
||||
// sizes (all but possibly the last) are coalesced into a single mmsghdr entry
|
||||
// carrying a UDP_SEGMENT cmsg, so one syscall can mix runs of GSO superpackets
|
||||
// with plain one-off datagrams. Without GSO support every packet is its own
|
||||
// entry, matching the prior behaviour.
|
||||
//
|
||||
// Chunks larger than the scratch are processed across multiple syscalls. If
|
||||
// sendmmsg returns a fatal error before any entry is sent we fall back to
|
||||
// per-packet WriteTo for that chunk so the caller still gets best-effort
|
||||
// delivery.
|
||||
func (u *StdConn) WriteBatch(bufs [][]byte, addrs []netip.AddrPort) error {
|
||||
if len(bufs) != len(addrs) {
|
||||
return fmt.Errorf("WriteBatch: len(bufs)=%d != len(addrs)=%d", len(bufs), len(addrs))
|
||||
}
|
||||
//u.l.WithField("bufs", len(bufs)).Info("WriteBatch")
|
||||
|
||||
i := 0
|
||||
for i < len(bufs) {
|
||||
chunk := len(bufs) - i
|
||||
if chunk > len(u.writeMsgs) {
|
||||
chunk = len(u.writeMsgs)
|
||||
baseI := i
|
||||
entry := 0
|
||||
iovIdx := 0
|
||||
|
||||
for entry < len(u.writeMsgs) && i < len(bufs) {
|
||||
iovBudget := len(u.writeIovs) - iovIdx
|
||||
if iovBudget < 1 {
|
||||
break
|
||||
}
|
||||
runLen, segSize := u.planRun(bufs, addrs, i, iovBudget)
|
||||
if runLen == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for k := 0; k < chunk; k++ {
|
||||
for k := 0; k < runLen; k++ {
|
||||
b := bufs[i+k]
|
||||
if len(b) == 0 {
|
||||
// sendmmsg with an empty iovec is legal but pointless; fall
|
||||
// through after filling the slot so Base is still valid.
|
||||
u.writeIovs[k].Base = nil
|
||||
setIovLen(&u.writeIovs[k], 0)
|
||||
u.writeIovs[iovIdx+k].Base = nil
|
||||
setIovLen(&u.writeIovs[iovIdx+k], 0)
|
||||
} else {
|
||||
u.writeIovs[k].Base = &b[0]
|
||||
setIovLen(&u.writeIovs[k], len(b))
|
||||
u.writeIovs[iovIdx+k].Base = &b[0]
|
||||
setIovLen(&u.writeIovs[iovIdx+k], len(b))
|
||||
}
|
||||
nlen, err := writeSockaddr(u.writeNames[k], addrs[i+k], u.isV4)
|
||||
}
|
||||
|
||||
nlen, err := writeSockaddr(u.writeNames[entry], addrs[i], u.isV4)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
u.writeMsgs[k].Hdr.Namelen = uint32(nlen)
|
||||
|
||||
hdr := &u.writeMsgs[entry].Hdr
|
||||
hdr.Iov = &u.writeIovs[iovIdx]
|
||||
setMsgIovlen(hdr, runLen)
|
||||
hdr.Namelen = uint32(nlen)
|
||||
|
||||
if runLen >= 2 {
|
||||
off := entry * u.writeCmsgSpace
|
||||
dataOff := off + unix.CmsgLen(0)
|
||||
binary.NativeEndian.PutUint16(u.writeCmsg[dataOff:dataOff+2], uint16(segSize))
|
||||
hdr.Control = &u.writeCmsg[off]
|
||||
setMsgControllen(hdr, u.writeCmsgSpace)
|
||||
} else {
|
||||
hdr.Control = nil
|
||||
setMsgControllen(hdr, 0)
|
||||
}
|
||||
|
||||
sent, serr := u.sendmmsg(chunk)
|
||||
if serr != nil {
|
||||
if sent <= 0 {
|
||||
// nothing went out; fall back to WriteTo for this chunk.
|
||||
for k := 0; k < chunk; k++ {
|
||||
if err := u.WriteTo(bufs[i+k], addrs[i+k]); err != nil {
|
||||
return err
|
||||
i += runLen
|
||||
iovIdx += runLen
|
||||
u.writeEntryEnd[entry] = i
|
||||
entry++
|
||||
}
|
||||
|
||||
if entry == 0 {
|
||||
return fmt.Errorf("sendmmsg: no progress")
|
||||
}
|
||||
|
||||
sent, serr := u.sendmmsg(entry)
|
||||
if serr != nil && sent <= 0 {
|
||||
// Nothing went out for this chunk; fall back to WriteTo for each
|
||||
// packet that was queued this iteration.
|
||||
for k := baseI; k < i; k++ {
|
||||
if werr := u.WriteTo(bufs[k], addrs[k]); werr != nil {
|
||||
return werr
|
||||
}
|
||||
}
|
||||
i += chunk
|
||||
continue
|
||||
}
|
||||
// partial: treat as success for the sent packets and retry the
|
||||
// remainder on the next outer-loop iteration.
|
||||
}
|
||||
if sent == 0 {
|
||||
return fmt.Errorf("sendmmsg made no progress")
|
||||
}
|
||||
i += sent
|
||||
// Rewind i to the end of the last successfully sent entry. For a
|
||||
// full-success send this leaves i unchanged; for a partial send it
|
||||
// replays the remainder on the next outer-loop iteration.
|
||||
i = u.writeEntryEnd[sent-1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// planRun groups consecutive packets starting at `start` that can be sent as
|
||||
// a single UDP GSO superpacket (one sendmmsg entry with UDP_SEGMENT cmsg).
|
||||
// A run of length 1 means the entry carries no cmsg and the kernel treats
|
||||
// it as a plain datagram. Returns the run length and the per-segment size
|
||||
// (which equals len(bufs[start])). Without GSO support every call returns
|
||||
// runLen=1.
|
||||
func (u *StdConn) planRun(bufs [][]byte, addrs []netip.AddrPort, start, iovBudget int) (int, int) {
|
||||
if start >= len(bufs) || iovBudget < 1 {
|
||||
return 0, 0
|
||||
}
|
||||
segSize := len(bufs[start])
|
||||
if !u.gsoSupported || segSize == 0 || segSize > maxGSOBytes {
|
||||
return 1, segSize
|
||||
}
|
||||
dst := addrs[start]
|
||||
maxLen := maxGSOSegments
|
||||
if iovBudget < maxLen {
|
||||
maxLen = iovBudget
|
||||
}
|
||||
runLen := 1
|
||||
total := segSize
|
||||
for runLen < maxLen && start+runLen < len(bufs) {
|
||||
nextLen := len(bufs[start+runLen])
|
||||
if nextLen == 0 || nextLen > segSize {
|
||||
break
|
||||
}
|
||||
if addrs[start+runLen] != dst {
|
||||
break
|
||||
}
|
||||
if total+nextLen > maxGSOBytes {
|
||||
break
|
||||
}
|
||||
total += nextLen
|
||||
runLen++
|
||||
if nextLen < segSize {
|
||||
// A short packet must be the last in the run.
|
||||
break
|
||||
}
|
||||
}
|
||||
return runLen, segSize
|
||||
}
|
||||
|
||||
// sendmmsgRawWrite is the preallocated callback passed to rawConn.Write. It
|
||||
// reads its input (u.writeChunk) and writes its outputs (u.writeSent,
|
||||
// u.writeErrno) through StdConn fields so the closure itself does not
|
||||
|
||||
@@ -53,22 +53,6 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
|
||||
return msgs, buffers, names
|
||||
}
|
||||
|
||||
// prepareWriteMessages allocates one Mmsghdr/iovec/sockaddr scratch per slot,
|
||||
// wired up so each writeMsgs[i] already points at writeIovs[i] and
|
||||
// writeNames[i]. Callers fill in the iovec Base/Len, the sockaddr bytes, and
|
||||
// Namelen before each sendmmsg.
|
||||
func (u *StdConn) prepareWriteMessages(n int) {
|
||||
u.writeMsgs = make([]rawMessage, n)
|
||||
u.writeIovs = make([]iovec, n)
|
||||
u.writeNames = make([][]byte, n)
|
||||
for i := range u.writeMsgs {
|
||||
u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6)
|
||||
u.writeMsgs[i].Hdr.Iov = &u.writeIovs[i]
|
||||
u.writeMsgs[i].Hdr.Iovlen = 1
|
||||
u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0]
|
||||
}
|
||||
}
|
||||
|
||||
func setIovLen(v *iovec, n int) {
|
||||
v.Len = uint32(n)
|
||||
}
|
||||
|
||||
@@ -56,22 +56,6 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
|
||||
return msgs, buffers, names
|
||||
}
|
||||
|
||||
// prepareWriteMessages allocates one Mmsghdr/iovec/sockaddr scratch per slot,
|
||||
// wired up so each writeMsgs[i] already points at writeIovs[i] and
|
||||
// writeNames[i]. Callers fill in the iovec Base/Len, the sockaddr bytes, and
|
||||
// Namelen before each sendmmsg.
|
||||
func (u *StdConn) prepareWriteMessages(n int) {
|
||||
u.writeMsgs = make([]rawMessage, n)
|
||||
u.writeIovs = make([]iovec, n)
|
||||
u.writeNames = make([][]byte, n)
|
||||
for i := range u.writeMsgs {
|
||||
u.writeNames[i] = make([]byte, unix.SizeofSockaddrInet6)
|
||||
u.writeMsgs[i].Hdr.Iov = &u.writeIovs[i]
|
||||
u.writeMsgs[i].Hdr.Iovlen = 1
|
||||
u.writeMsgs[i].Hdr.Name = &u.writeNames[i][0]
|
||||
}
|
||||
}
|
||||
|
||||
func setIovLen(v *iovec, n int) {
|
||||
v.Len = uint64(n)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user