disable sort-on-RX, CPU pinning seems to work for now

This commit is contained in:
JackDoan
2026-05-04 11:57:33 -05:00
parent 924268cc1f
commit e3e8622b98

View File

@@ -430,8 +430,18 @@ func (u *StdConn) listenOutSingle(r EncReader, flush func()) error {
} }
} }
func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { func getFrom(names [][]byte, i int, isV4 bool) netip.AddrPort {
var ip netip.Addr var ip netip.Addr
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic
if isV4 {
ip, _ = netip.AddrFromSlice(names[i][4:8])
} else {
ip, _ = netip.AddrFromSlice(names[i][8:24])
}
return netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
}
func (u *StdConn) listenOutBatch(r EncReader, flush func()) error {
var n int var n int
var operr error var operr error
@@ -476,15 +486,9 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error {
// Phase 1: gather every segment from this recvmmsg into rxOrder, // Phase 1: gather every segment from this recvmmsg into rxOrder,
// splitting GRO superpackets into their constituent segments. // splitting GRO superpackets into their constituent segments.
u.rxOrder.reset() //todo u.rxOrder.reset()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic from := getFrom(names, i, u.isV4)
if u.isV4 {
ip, _ = netip.AddrFromSlice(names[i][4:8])
} else {
ip, _ = netip.AddrFromSlice(names[i][8:24])
}
from := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
payload := buffers[i][:msgs[i].Len] payload := buffers[i][:msgs[i].Len]
segSize := 0 segSize := 0
@@ -492,17 +496,28 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error {
if cmsgSpace > 0 { if cmsgSpace > 0 {
segSize, outerECN = parseRecvCmsg(&msgs[i].Hdr, u.groSupported, u.ecnRecvSupported, u.isV4) segSize, outerECN = parseRecvCmsg(&msgs[i].Hdr, u.groSupported, u.ecnRecvSupported, u.isV4)
} }
u.rxOrder.addEntry(from, payload, segSize, outerECN)
if segSize <= 0 || segSize >= len(payload) {
r(from, payload, RxMeta{OuterECN: outerECN})
} else {
for off := 0; off < len(payload); off += segSize {
end := off + segSize
if end > len(payload) {
end = len(payload)
}
seg := payload[off:end]
r(from, seg, RxMeta{OuterECN: outerECN})
}
} }
// Phase 2 + 3: stable-sort by (src, port, counter), then deliver in //todo u.rxOrder.addEntry(from, payload, segSize, outerECN)
// order. Reorder distance is bounded by len(u.rxOrder.buf), which }
// stays well within the receiver's ReplayWindow (currently 8192) so
// older arrivals are not rejected as replays. // stable-sort by (src, port, counter), then deliver in order.
u.rxOrder.sortStable() // this is on top of the sort performed before decrypt
u.rxOrder.deliver(r) //todo u.rxOrder.sortStable()
// End-of-batch: let callers (e.g. TUN write coalescer) flush any //todo u.rxOrder.deliver(r)
// state they accumulated across this batch. // let callers (e.g. TUN write coalescer) flush any state they accumulated across this batch.
flush() flush()
} }
} }