diff --git a/outside.go b/outside.go index 7d2b6661..45a405b3 100644 --- a/outside.go +++ b/outside.go @@ -230,7 +230,7 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, switch targetRelay.Type { case ForwardingType: // Forward this packet through the relay tunnel - // Find the target HostInfo + // Find the target HostInfo //todo it would potentially be nice to batch these f.SendVia(targetHI, targetRelay, signedPayload, nb, out, false) case TerminalType: hostinfo.logger(f.l).Error("Unexpected Relay Type of Terminal") diff --git a/udp/rx_reorder_linux.go b/udp/rx_reorder_linux.go deleted file mode 100644 index 7887b9f9..00000000 --- a/udp/rx_reorder_linux.go +++ /dev/null @@ -1,86 +0,0 @@ -//go:build !android && !e2e_testing -// +build !android,!e2e_testing - -package udp - -import ( - "cmp" - "net/netip" - "slices" -) - -// rxSegment is one nebula packet pulled out of a recvmmsg entry — either a -// lone datagram or one segment of a GRO superpacket. cnt is the big-endian -// uint64 message counter at bytes [8:16] of the nebula header; 0 if the -// segment is too short to contain a header. ecn is the 2-bit IP-level ECN -// codepoint stamped on the carrier (one value per slot, since GRO requires -// equal ECN across coalesced datagrams). -type rxSegment struct { - src netip.AddrPort - cnt uint64 - buf []byte - ecn byte -} - -// rxReorderBuffer accumulates one recvmmsg batch worth of segments, -// splits any GRO superpackets at gso_size boundaries, stable-sorts by -// (src, port, counter), then delivers in order. The reorder distance is -// bounded by len(buf), which the caller sizes to stay well within the -// receiver's ReplayWindow so older arrivals are not rejected as replays. -type rxReorderBuffer struct { - buf []rxSegment -} - -func newRxReorderBuffer(initialCap int) *rxReorderBuffer { - return &rxReorderBuffer{buf: make([]rxSegment, 0, initialCap)} -} - -// reset prepares the buffer for the next recvmmsg batch. -func (r *rxReorderBuffer) reset() { r.buf = r.buf[:0] } - -// addEntry expands one recvmmsg slot into rxSegments. When segSize <= 0 or -// segSize >= len(payload) the payload is appended as a single segment; -// otherwise the kernel-coalesced GRO superpacket is split at segSize -// boundaries (the kernel guarantees every segment is exactly segSize bytes -// except for the final one, which may be short). ecn applies uniformly to -// every produced segment because GRO requires equal ECN across coalesced -// datagrams. -func (r *rxReorderBuffer) addEntry(from netip.AddrPort, payload []byte, segSize int, ecn byte) { - if segSize <= 0 || segSize >= len(payload) { - r.buf = append(r.buf, rxSegment{from, headerCounter(payload), payload, ecn}) - return - } - for off := 0; off < len(payload); off += segSize { - end := off + segSize - if end > len(payload) { - end = len(payload) - } - seg := payload[off:end] - r.buf = append(r.buf, rxSegment{from, headerCounter(seg), seg, ecn}) - } -} - -// sortStable orders the accumulated segments by (src addr, src port, -// counter). Same-source segments are reordered into counter order; -// cross-source relative order is determined by a stable address compare so -// the sort is total and predictable. -func (r *rxReorderBuffer) sortStable() { - slices.SortStableFunc(r.buf, func(a, b rxSegment) int { - if c := a.src.Addr().Compare(b.src.Addr()); c != 0 { - return c - } - if c := cmp.Compare(a.src.Port(), b.src.Port()); c != 0 { - return c - } - return cmp.Compare(a.cnt, b.cnt) - }) -} - -// deliver invokes fn once per segment in sorted order, then nils the -// per-entry buf reference so the next batch's append doesn't alias it. -func (r *rxReorderBuffer) deliver(fn EncReader) { - for k := range r.buf { - fn(r.buf[k].src, r.buf[k].buf, RxMeta{OuterECN: r.buf[k].ecn}) - r.buf[k].buf = nil - } -} diff --git a/udp/rx_reorder_linux_test.go b/udp/rx_reorder_linux_test.go deleted file mode 100644 index 180e2e4a..00000000 --- a/udp/rx_reorder_linux_test.go +++ /dev/null @@ -1,203 +0,0 @@ -//go:build !android && !e2e_testing -// +build !android,!e2e_testing - -package udp - -import ( - "encoding/binary" - "net/netip" - "testing" -) - -// makeNebulaPkt returns a buffer whose [8:16] bytes encode the given -// counter big-endian, the rest left zero. Anything shorter than 16 bytes -// would yield counter 0; tests use this to simulate well-formed nebula -// headers (the rxReorderBuffer doesn't care about anything else). -func makeNebulaPkt(cnt uint64, payLen int) []byte { - if payLen < 16 { - payLen = 16 - } - b := make([]byte, payLen) - binary.BigEndian.PutUint64(b[8:16], cnt) - return b -} - -func srcOf(addr string, port uint16) netip.AddrPort { - return netip.AddrPortFrom(netip.MustParseAddr(addr), port) -} - -func TestRxReorderBuffer_LonePassesThrough(t *testing.T) { - r := newRxReorderBuffer(8) - pkt := makeNebulaPkt(42, 100) - r.addEntry(srcOf("1.1.1.1", 4242), pkt, 0, 0x02) - - if got := len(r.buf); got != 1 { - t.Fatalf("want 1 entry, got %d", got) - } - if r.buf[0].cnt != 42 { - t.Errorf("counter=%d want 42", r.buf[0].cnt) - } - if r.buf[0].ecn != 0x02 { - t.Errorf("ecn=%#x want 0x02", r.buf[0].ecn) - } - if len(r.buf[0].buf) != 100 { - t.Errorf("buf len=%d want 100", len(r.buf[0].buf)) - } -} - -func TestRxReorderBuffer_SegSizeGEPayloadIsLone(t *testing.T) { - // segSize >= len(payload) means the kernel did not coalesce this slot. - r := newRxReorderBuffer(8) - pkt := makeNebulaPkt(7, 50) - r.addEntry(srcOf("1.1.1.1", 1), pkt, 50, 0) - if got := len(r.buf); got != 1 { - t.Fatalf("segSize==len: want 1 entry, got %d", got) - } - r.reset() - r.addEntry(srcOf("1.1.1.1", 1), pkt, 60, 0) - if got := len(r.buf); got != 1 { - t.Fatalf("segSize>len: want 1 entry, got %d", got) - } -} - -func TestRxReorderBuffer_GROSplitExactMultiple(t *testing.T) { - // 3 segments of 80 bytes each, packed into one 240-byte GRO superpacket. - const segSize = 80 - const numSeg = 3 - pkt := make([]byte, segSize*numSeg) - for i := range numSeg { - off := i * segSize - binary.BigEndian.PutUint64(pkt[off+8:off+16], uint64(100+i)) - } - - r := newRxReorderBuffer(8) - r.addEntry(srcOf("2.2.2.2", 5555), pkt, segSize, 0x03) - if got := len(r.buf); got != numSeg { - t.Fatalf("want %d segments, got %d", numSeg, got) - } - for i, seg := range r.buf { - if seg.cnt != uint64(100+i) { - t.Errorf("seg %d: cnt=%d want %d", i, seg.cnt, 100+i) - } - if len(seg.buf) != segSize { - t.Errorf("seg %d: buf len=%d want %d", i, len(seg.buf), segSize) - } - if seg.ecn != 0x03 { - t.Errorf("seg %d: ecn=%#x want 0x03 (uniform across GRO)", i, seg.ecn) - } - } -} - -func TestRxReorderBuffer_GROSplitShortFinal(t *testing.T) { - // 200-byte payload, segSize=80 → segments of 80, 80, 40. - const segSize = 80 - pkt := make([]byte, 200) - binary.BigEndian.PutUint64(pkt[8:16], 1) - binary.BigEndian.PutUint64(pkt[80+8:80+16], 2) - binary.BigEndian.PutUint64(pkt[160+8:160+16], 3) - - r := newRxReorderBuffer(8) - r.addEntry(srcOf("3.3.3.3", 1), pkt, segSize, 0) - if got := len(r.buf); got != 3 { - t.Fatalf("want 3 segments, got %d", got) - } - wantLens := []int{80, 80, 40} - for i, seg := range r.buf { - if len(seg.buf) != wantLens[i] { - t.Errorf("seg %d: len=%d want %d", i, len(seg.buf), wantLens[i]) - } - } -} - -func TestRxReorderBuffer_SortGroupsBySrcThenCounter(t *testing.T) { - r := newRxReorderBuffer(8) - a := srcOf("1.1.1.1", 1) - b := srcOf("2.2.2.2", 1) - // Insert deliberately scrambled. - r.addEntry(a, makeNebulaPkt(3, 16), 0, 0) - r.addEntry(b, makeNebulaPkt(1, 16), 0, 0) - r.addEntry(a, makeNebulaPkt(1, 16), 0, 0) - r.addEntry(b, makeNebulaPkt(2, 16), 0, 0) - r.addEntry(a, makeNebulaPkt(2, 16), 0, 0) - - r.sortStable() - - want := []struct { - src netip.AddrPort - cnt uint64 - }{ - {a, 1}, {a, 2}, {a, 3}, {b, 1}, {b, 2}, - } - if got := len(r.buf); got != len(want) { - t.Fatalf("len=%d want %d", got, len(want)) - } - for i, w := range want { - if r.buf[i].src != w.src || r.buf[i].cnt != w.cnt { - t.Errorf("idx %d: got %v/%d want %v/%d", - i, r.buf[i].src, r.buf[i].cnt, w.src, w.cnt) - } - } -} - -func TestRxReorderBuffer_SortStableAcrossPorts(t *testing.T) { - // Same source addr but different ports — must group by port. - r := newRxReorderBuffer(8) - addr := netip.MustParseAddr("4.4.4.4") - p1 := netip.AddrPortFrom(addr, 1) - p2 := netip.AddrPortFrom(addr, 2) - r.addEntry(p2, makeNebulaPkt(10, 16), 0, 0) - r.addEntry(p1, makeNebulaPkt(20, 16), 0, 0) - r.addEntry(p2, makeNebulaPkt(5, 16), 0, 0) - - r.sortStable() - - // Expect: p1/20 then p2/5 then p2/10. - if r.buf[0].src.Port() != 1 || r.buf[1].src.Port() != 2 || r.buf[2].src.Port() != 2 { - t.Fatalf("port order broken: %v %v %v", - r.buf[0].src.Port(), r.buf[1].src.Port(), r.buf[2].src.Port()) - } - if r.buf[1].cnt != 5 || r.buf[2].cnt != 10 { - t.Errorf("counter order in p2: %d %d (want 5 10)", r.buf[1].cnt, r.buf[2].cnt) - } -} - -func TestRxReorderBuffer_DeliverInOrderAndNilsRefs(t *testing.T) { - r := newRxReorderBuffer(4) - a := srcOf("5.5.5.5", 1) - r.addEntry(a, makeNebulaPkt(2, 32), 0, 0x01) - r.addEntry(a, makeNebulaPkt(1, 32), 0, 0x01) - r.sortStable() - - var seenCnts []uint64 - var seenECN []byte - r.deliver(func(src netip.AddrPort, buf []byte, meta RxMeta) { - seenCnts = append(seenCnts, binary.BigEndian.Uint64(buf[8:16])) - seenECN = append(seenECN, meta.OuterECN) - }) - - if len(seenCnts) != 2 || seenCnts[0] != 1 || seenCnts[1] != 2 { - t.Errorf("delivery order broken: %v", seenCnts) - } - if seenECN[0] != 0x01 || seenECN[1] != 0x01 { - t.Errorf("ecn passed wrong: %v", seenECN) - } - for i := range r.buf { - if r.buf[i].buf != nil { - t.Errorf("buf[%d].buf not nil after deliver", i) - } - } -} - -func TestRxReorderBuffer_ResetIsReusable(t *testing.T) { - r := newRxReorderBuffer(2) - r.addEntry(srcOf("6.6.6.6", 1), makeNebulaPkt(1, 16), 0, 0) - r.addEntry(srcOf("6.6.6.6", 1), makeNebulaPkt(2, 16), 0, 0) - r.reset() - if got := len(r.buf); got != 0 { - t.Fatalf("after reset len=%d want 0", got) - } - r.addEntry(srcOf("6.6.6.6", 1), makeNebulaPkt(7, 16), 0, 0) - if r.buf[0].cnt != 7 { - t.Errorf("after reset+add: cnt=%d want 7", r.buf[0].cnt) - } -} diff --git a/udp/udp_linux.go b/udp/udp_linux.go index a1ccd8e2..b36e64ba 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -69,13 +69,6 @@ type StdConn struct { // each arriving datagram as a per-slot cmsg, and listenOutBatch passes // the parsed value to the EncReader callback for RFC 6040 combine. ecnRecvSupported bool - - // rxOrder is the per-batch scratch listenOutBatch uses to gather every - // segment in a recvmmsg call (after splitting GRO superpackets) and - // stable-sort by (source, message-counter) before delivery. Reordering - // fits within the receiver's replay window so briefly out-of-order - // arrivals do not get rejected as replays. - rxOrder *rxReorderBuffer } func setReusePort(network, address string, c syscall.RawConn) error { @@ -459,10 +452,6 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { } msgs, buffers, names, _ := u.PrepareRawMessages(u.batch, bufSize, cmsgSpace) - if u.rxOrder == nil { - u.rxOrder = newRxReorderBuffer(u.batch * 64) - } - //reader needs to capture variables from this function, since it's used as a lambda with rawConn.Read //defining it outside the loop so it gets re-used reader := func(fd uintptr) (done bool) { @@ -484,9 +473,6 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { return operr } - // Phase 1: gather every segment from this recvmmsg into rxOrder, - // splitting GRO superpackets into their constituent segments. - //todo u.rxOrder.reset() for i := 0; i < n; i++ { from := getFrom(names, i, u.isV4) payload := buffers[i][:msgs[i].Len] @@ -509,15 +495,8 @@ func (u *StdConn) listenOutBatch(r EncReader, flush func()) error { r(from, seg, RxMeta{OuterECN: outerECN}) } } - - //todo u.rxOrder.addEntry(from, payload, segSize, outerECN) } - // stable-sort by (src, port, counter), then deliver in order. - // this is on top of the sort performed before decrypt - //todo u.rxOrder.sortStable() - //todo u.rxOrder.deliver(r) - // let callers (e.g. TUN write coalescer) flush any state they accumulated across this batch. flush() } }