diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 5ae5847b..a1ccd8e2 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -430,8 +430,18 @@ func (u *StdConn) listenOutSingle(r EncReader, flush func()) error { } } -func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { +func getFrom(names [][]byte, i int, isV4 bool) netip.AddrPort { var ip netip.Addr + // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic + if isV4 { + ip, _ = netip.AddrFromSlice(names[i][4:8]) + } else { + ip, _ = netip.AddrFromSlice(names[i][8:24]) + } + return netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])) +} + +func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { var n int var operr error @@ -476,15 +486,9 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { // Phase 1: gather every segment from this recvmmsg into rxOrder, // splitting GRO superpackets into their constituent segments. - u.rxOrder.reset() + //todo u.rxOrder.reset() for i := 0; i < n; i++ { - // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic - if u.isV4 { - ip, _ = netip.AddrFromSlice(names[i][4:8]) - } else { - ip, _ = netip.AddrFromSlice(names[i][8:24]) - } - from := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])) + from := getFrom(names, i, u.isV4) payload := buffers[i][:msgs[i].Len] segSize := 0 @@ -492,17 +496,28 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { if cmsgSpace > 0 { segSize, outerECN = parseRecvCmsg(&msgs[i].Hdr, u.groSupported, u.ecnRecvSupported, u.isV4) } - u.rxOrder.addEntry(from, payload, segSize, outerECN) + + if segSize <= 0 || segSize >= len(payload) { + r(from, payload, RxMeta{OuterECN: outerECN}) + } else { + for off := 0; off < len(payload); off += segSize { + end := off + segSize + if end > len(payload) { + end = len(payload) + } + seg := payload[off:end] + r(from, seg, RxMeta{OuterECN: outerECN}) + } + } + + //todo u.rxOrder.addEntry(from, payload, segSize, outerECN) } - // Phase 2 + 3: stable-sort by (src, port, counter), then deliver in - // order. Reorder distance is bounded by len(u.rxOrder.buf), which - // stays well within the receiver's ReplayWindow (currently 8192) so - // older arrivals are not rejected as replays. - u.rxOrder.sortStable() - u.rxOrder.deliver(r) - // End-of-batch: let callers (e.g. TUN write coalescer) flush any - // state they accumulated across this batch. + // stable-sort by (src, port, counter), then deliver in order. + // this is on top of the sort performed before decrypt + //todo u.rxOrder.sortStable() + //todo u.rxOrder.deliver(r) + // let callers (e.g. TUN write coalescer) flush any state they accumulated across this batch. flush() } }