diff --git a/interface.go b/interface.go
index 883afebd..c5aae59a 100644
--- a/interface.go
+++ b/interface.go
@@ -309,18 +309,28 @@ func (f *Interface) listenOut(i int) {
 	ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
 	lhh := f.lightHouse.NewRequestHandler()
 
-	plaintext := make([]byte, udp.MTU)
 	h := &header.H{}
 	fwPacket := &firewall.Packet{}
 	nb := make([]byte, 12, 12)
 
+	// plaintexts is a pool of decrypt scratch buffers, one per packet in a
+	// UDP recvmmsg batch. The coalescer borrows payload slices from here and
+	// requires they stay valid until Flush — so we advance per packet and
+	// reset the index only in the batch-end flush callback.
+	var plaintexts [][]byte
+	idx := 0
+
 	coalescer := f.tunCoalescers[i]
 	err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
-		f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
+		if idx >= len(plaintexts) {
+			plaintexts = append(plaintexts, make([]byte, udp.MTU))
+		}
+		f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintexts[idx][:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
+		idx++
 	}, func() {
 		if err := coalescer.Flush(); err != nil {
 			f.l.WithError(err).Error("Failed to flush tun coalescer")
 		}
+		idx = 0
 	})
 
 	if err != nil && !f.closed.Load() {
diff --git a/overlay/device.go b/overlay/device.go
index dc58bcfe..c89bf5e9 100644
--- a/overlay/device.go
+++ b/overlay/device.go
@@ -31,21 +31,25 @@ type Device interface {
 	NewMultiQueueReader() (Queue, error)
 }
 
-// GSOWriter is implemented by Queues that can write a TCP TSO superpacket as
-// a single virtio_net_hdr + payload writev, letting the kernel segment on
-// egress. Callers type-assert on it; backends that don't support GSO return
-// false from Supported and all coalescing logic is skipped.
+// GSOWriter is implemented by Queues that can emit a TCP TSO superpacket
+// assembled from a header prefix plus one or more borrowed payload
+// fragments, in a single vectored write (writev with a leading
+// virtio_net_hdr). This lets the coalescer avoid copying payload bytes
+// between the caller's decrypt buffer and the TUN. Backends without GSO
+// support return false from GSOSupported and coalescing is skipped.
 //
-// pkt must contain the IPv4/IPv6 + TCP header plus the concatenated
-// coalesced payload. hdrLen is the total L3+L4 header length (where the
-// payload starts). csumStart is the byte offset where the TCP header
-// begins (= IP header length). gsoSize is the MSS — every segment except
-// possibly the last must be exactly this many payload bytes. isV6 selects
-// GSO_TCPV4 vs GSO_TCPV6.
+// hdr contains the IPv4/IPv6 + TCP header prefix (mutable — callers will
+// have filled in total length and pseudo-header partial). pays are
+// non-overlapping payload fragments whose concatenation is the full
+// superpacket payload; they are read-only from the writer's perspective
+// and must remain valid until the call returns. gsoSize is the MSS:
+// every segment except possibly the last is exactly that many bytes.
+// csumStart is the byte offset where the TCP header begins within hdr.
+// isV6 selects GSO_TCPV4 vs GSO_TCPV6.
 //
-// pkt's TCP checksum field must already hold the pseudo-header partial
+// hdr's TCP checksum field must already hold the pseudo-header partial
 // sum (single-fold, not inverted), per virtio NEEDS_CSUM semantics.
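// To make the checksum contract concrete, a minimal sketch (illustrative,
// not part of this diff; the helper name is hypothetical and it assumes
// encoding/binary) of the single-fold, non-inverted IPv4 pseudo-header
// partial a caller stores in the TCP checksum field before WriteGSO:

func pseudoPartialV4Sketch(src, dst [4]byte, tcpLen uint16) uint16 {
	sum := uint32(binary.BigEndian.Uint16(src[:2])) +
		uint32(binary.BigEndian.Uint16(src[2:])) +
		uint32(binary.BigEndian.Uint16(dst[:2])) +
		uint32(binary.BigEndian.Uint16(dst[2:])) +
		6 + // IPPROTO_TCP (the pseudo-header's zero byte contributes nothing)
		uint32(tcpLen)
	sum = (sum >> 16) + (sum & 0xffff) // fold the carry back into 16 bits
	sum += sum >> 16                   // absorb any residual carry
	return uint16(sum)                 // deliberately not inverted
}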
type GSOWriter interface { - WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error + WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error GSOSupported() bool } diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index 41a8a1a0..60cf09c3 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -53,9 +53,17 @@ type tunFile struct { // by WriteGSO. Separate from zeroVnetHdr so a concurrent non-GSO Write on // another queue never observes a half-written header. gsoHdrBuf [virtioNetHdrLen]byte - gsoIovs [2]unix.Iovec + // gsoIovs is the writev iovec scratch for WriteGSO. Sized to hold the + // virtio header + IP/TCP header + up to gsoInitialPayIovs payload + // fragments; grown on demand if a coalescer pushes more. + gsoIovs []unix.Iovec } +// gsoInitialPayIovs is the starting capacity (in payload fragments) of +// tunFile.gsoIovs. Sized to cover the default coalesce segment cap without +// any reallocations. +const gsoInitialPayIovs = 66 + // zeroVnetHdr is the 10-byte virtio_net_hdr we prepend to every TUN write when // IFF_VNET_HDR is active. All-zero signals "no GSO, no checksum offload"; the // kernel accepts the packet as-is. @@ -84,6 +92,7 @@ func (r *tunFile) newFriend(fd int) (*tunFile, error) { out.segBuf = make([]byte, tunSegBufCap) out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].SetLen(virtioNetHdrLen) + out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs) out.gsoIovs[0].Base = &out.gsoHdrBuf[0] out.gsoIovs[0].SetLen(virtioNetHdrLen) } @@ -119,6 +128,7 @@ func newTunFd(fd int, vnetHdr bool) (*tunFile, error) { out.segBuf = make([]byte, tunSegBufCap) out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].SetLen(virtioNetHdrLen) + out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs) out.gsoIovs[0].Base = &out.gsoHdrBuf[0] out.gsoIovs[0].SetLen(virtioNetHdrLen) } @@ -346,46 +356,79 @@ func (r *tunFile) Write(buf []byte) (int, error) { // Write calls. func (r *tunFile) GSOSupported() bool { return r.vnetHdr } -// WriteGSO emits pkt as a single TCP TSO superpacket via writev. pkt must -// contain a full IPv4/IPv6 + TCP header prefix followed by the concatenated -// coalesced payload. The TCP checksum field must already hold the -// pseudo-header partial (NEEDS_CSUM semantics). gsoSize is the MSS; every -// segment except the last must be exactly that many payload bytes. -func (r *tunFile) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error { +// WriteGSO emits a TCP TSO superpacket in a single writev. hdr is the +// IPv4/IPv6 + TCP header prefix (already finalized — total length, IP csum, +// and TCP pseudo-header partial set by the caller). pays are payload +// fragments whose concatenation forms the full coalesced payload; each +// slice is read-only and must stay valid until return. gsoSize is the MSS; +// every segment except possibly the last is exactly gsoSize bytes. +// csumStart is the byte offset where the TCP header begins within hdr. +func (r *tunFile) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error { if !r.vnetHdr { return fmt.Errorf("WriteGSO called on tun without IFF_VNET_HDR") } - if len(pkt) == 0 { + if len(hdr) == 0 || len(pays) == 0 { return nil } - hdr := virtioNetHdr{ + + // Build the virtio_net_hdr. When pays total to <= gsoSize the kernel + // would produce a single segment; keep NEEDS_CSUM semantics but skip + // the GSO type so the kernel doesn't spuriously mark this as TSO. 
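// For reference while reading the encode below, the 10-byte virtio_net_hdr
// wire layout (values from the kernel uapi; u16 byte order is the encode
// helper's concern):
//
//	offset 0  flags        u8   VIRTIO_NET_HDR_F_NEEDS_CSUM = 0x1
//	offset 1  gso_type     u8   GSO_NONE=0, GSO_TCPV4=1, GSO_TCPV6=4
//	offset 2  hdr_len      u16  L3+L4 header bytes replicated per segment
//	offset 4  gso_size     u16  MSS; left 0 when gso_type is GSO_NONE
//	offset 6  csum_start   u16  where kernel checksumming begins
//	offset 8  csum_offset  u16  csum field offset from csum_start (16 for TCP)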
+ vhdr := virtioNetHdr{ Flags: unix.VIRTIO_NET_HDR_F_NEEDS_CSUM, - HdrLen: hdrLen, + HdrLen: uint16(len(hdr)), GSOSize: gsoSize, CsumStart: csumStart, CsumOffset: 16, // TCP checksum field lives 16 bytes into the TCP header } - if isV6 { - hdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV6 - } else { - hdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV4 + var totalPay int + for _, p := range pays { + totalPay += len(p) + } + if totalPay > int(gsoSize) { + if isV6 { + vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV6 + } else { + vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV4 + } + } else { + vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_NONE + vhdr.GSOSize = 0 + } + vhdr.encode(r.gsoHdrBuf[:]) + + // Build the iovec array: [virtio_hdr, hdr, pays...]. r.gsoIovs[0] is + // wired to gsoHdrBuf at construction and never changes. + need := 2 + len(pays) + if cap(r.gsoIovs) < need { + grown := make([]unix.Iovec, need) + grown[0] = r.gsoIovs[0] + r.gsoIovs = grown + } else { + r.gsoIovs = r.gsoIovs[:need] + } + r.gsoIovs[1].Base = &hdr[0] + r.gsoIovs[1].SetLen(len(hdr)) + for i, p := range pays { + r.gsoIovs[2+i].Base = &p[0] + r.gsoIovs[2+i].SetLen(len(p)) } - hdr.encode(r.gsoHdrBuf[:]) - r.gsoIovs[1].Base = &pkt[0] - r.gsoIovs[1].SetLen(len(pkt)) iovPtr := uintptr(unsafe.Pointer(&r.gsoIovs[0])) + iovCnt := uintptr(len(r.gsoIovs)) for { - n, _, errno := syscall.RawSyscall(unix.SYS_WRITEV, uintptr(r.fd), iovPtr, 2) + n, _, errno := syscall.RawSyscall(unix.SYS_WRITEV, uintptr(r.fd), iovPtr, iovCnt) if errno == 0 { - runtime.KeepAlive(pkt) + runtime.KeepAlive(hdr) + runtime.KeepAlive(pays) if int(n) < virtioNetHdrLen { return io.ErrShortWrite } return nil } if errno == unix.EAGAIN { - runtime.KeepAlive(pkt) + runtime.KeepAlive(hdr) + runtime.KeepAlive(pays) if err := r.blockOnWrite(); err != nil { return err } @@ -394,7 +437,8 @@ func (r *tunFile) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumSt if errno == unix.EINTR { continue } - runtime.KeepAlive(pkt) + runtime.KeepAlive(hdr) + runtime.KeepAlive(pays) return errno } } diff --git a/tcp_coalesce.go b/tcp_coalesce.go index b16d6f2f..c6ebdde8 100644 --- a/tcp_coalesce.go +++ b/tcp_coalesce.go @@ -7,72 +7,114 @@ import ( "github.com/slackhq/nebula/overlay" ) -// IPPROTO_TCP is the IANA protocol number for TCP. Hardcoded instead of -// reaching for ipProtoTCP because golang.org/x/sys/unix doesn't -// define that constant on Windows, which would break cross-compiles even -// though this file runs unchanged on every platform. +// ipProtoTCP is the IANA protocol number for TCP. Hardcoded instead of +// reaching for golang.org/x/sys/unix — that package doesn't define the +// constant on Windows, which would break cross-compiles even though this +// file runs unchanged on every platform. const ipProtoTCP = 6 -// tcpCoalesceBufSize bounds the largest coalesced superpacket we will buffer. -// Linux caps sk_gso_max_size around 64KiB; 65535 bytes covers IP hdr + TCP -// hdr + up to ~65KB of payload, which is the most the kernel's TSO can -// segment in one shot. +// tcpCoalesceBufSize caps total bytes per superpacket. Mirrors the kernel's +// sk_gso_max_size of ~64KiB; anything beyond this would be rejected anyway. const tcpCoalesceBufSize = 65535 -// tcpCoalesceMaxSegs caps how many segments we are willing to coalesce into -// a single superpacket regardless of byte budget. Kernel allows up to 64 -// for UDP GSO and 128 for many TSO engines; stop well before either limit -// to keep latency bounded. 
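// Back-of-envelope for how the two caps interact (illustrative): with a
// typical ~1400-byte tunnel MSS, 64 segments would carry 64*1400 = 89,600
// payload bytes, so the 65,535-byte tcpCoalesceBufSize cap binds first;
// with small segments (say 512 bytes) the segment cap binds instead, at
// 64*512 = 32,768 bytes.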
+// tcpCoalesceMaxSegs caps how many segments we'll coalesce into a single
+// superpacket. Keeping this well below the kernel's TSO ceiling bounds
+// latency.
 const tcpCoalesceMaxSegs = 64
 
-// tcpCoalescer accumulates adjacent in-flow TCP data segments into a single
-// TSO superpacket and emits them via overlay.GSOWriter in one writev. When
-// a packet fails admission or fails to extend the pending flow, the
-// pending superpacket is flushed and the non-matching packet is written
-// through as-is. Owns no locks — one coalescer per TUN write queue.
+// tcpCoalesceHdrCap is the scratch space we copy a seed's IP+TCP header
+// into. IPv6 (40) + TCP with full options (60) = 100 bytes.
+const tcpCoalesceHdrCap = 100
+
+// initialSlots is the starting capacity of the slot pool. One flow per
+// packet is the worst case, so this matches a typical UDP recvmmsg batch.
+const initialSlots = 64
+
+// flowKey identifies a TCP flow by {src, dst, sport, dport, family}.
+// Comparable, so it can key the openSlots map directly.
+type flowKey struct {
+	src, dst     [16]byte
+	sport, dport uint16
+	isV6         bool
+}
+
+// coalesceSlot is one entry in the coalescer's ordered event queue. When
+// passthrough is true the slot holds a single borrowed packet that must be
+// emitted verbatim (non-TCP, non-admissible TCP, or oversize seed). When
+// passthrough is false the slot is an in-progress coalesced superpacket:
+// hdrBuf is a mutable copy of the seed's IP+TCP header (we patch total
+// length and pseudo-header partial at flush), and payIovs are *borrowed*
+// slices from the caller's plaintext buffers — no payload is ever copied.
+// The caller (listenOut) must keep those buffers alive until Flush.
+type coalesceSlot struct {
+	passthrough bool
+	rawPkt      []byte // borrowed when passthrough
+
+	fk       flowKey
+	hdrBuf   [tcpCoalesceHdrCap]byte
+	hdrLen   int
+	ipHdrLen int
+	isV6     bool
+	gsoSize  int
+	numSeg   int
+	totalPay int
+	nextSeq  uint32
+	// psh closes the chain: set when the last-accepted segment had PSH or
+	// was sub-gsoSize. No further appends after that.
+	psh     bool
+	payIovs [][]byte
+}
+
+// tcpCoalescer accumulates adjacent in-flow TCP data segments across
+// multiple concurrent flows and emits each flow's run as a single TSO
+// superpacket via overlay.GSOWriter. All output — coalesced or not — is
+// deferred until Flush so arrival order is preserved on the wire. Owns
+// no locks; one coalescer per TUN write queue.
 type tcpCoalescer struct {
 	plainW io.Writer
 	gsoW   overlay.GSOWriter // nil when the queue doesn't support TSO
 
-	buf    []byte
-	bufLen int // valid bytes in buf — hdrLen plus accumulated payload
-	active bool // a seed packet is present
-	numSeg int
-	gsoSize int // payload length of each segment (= MSS of the seed)
-	isV6 bool
-	ipHdrLen int
-	hdrLen int // ipHdrLen + tcpHdrLen, the offset where payload starts
-	nextSeq uint32 // expected TCP seq of the next packet to coalesce
-	// psh indicates the last-accepted segment had PSH set. We accept a PSH
-	// packet as the final segment but reject any further Adds after that.
-	psh bool
+	// slots is the ordered event queue. Flush walks it once and emits each
+	// entry as either a WriteGSO (coalesced) or a plainW.Write (passthrough).
+	slots []*coalesceSlot
+	// openSlots maps a flow key to its most recent non-sealed slot, so new
+	// segments can extend an in-progress superpacket in O(1). Slots are
+	// removed from this map when they close (PSH or short-last-segment),
+	// when a non-admissible packet for that flow arrives, or in Flush.
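// Lifecycle sketch for one flow (illustrative):
//
//	seed              slots=[A(open)]        openSlots={fk: A}
//	append xN         A gains borrowed payIovs, nextSeq advances
//	PSH or short seg  A sealed               openSlots={}
//	seed again        slots=[A, B(open)]     openSlots={fk: B}
//	Flush             WriteGSO(A), WriteGSO(B); slots recycled to the pool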
+ openSlots map[flowKey]*coalesceSlot + pool []*coalesceSlot // free list for reuse } func newTCPCoalescer(w io.Writer) *tcpCoalescer { - c := &tcpCoalescer{plainW: w, buf: make([]byte, tcpCoalesceBufSize)} + c := &tcpCoalescer{ + plainW: w, + slots: make([]*coalesceSlot, 0, initialSlots), + openSlots: make(map[flowKey]*coalesceSlot, initialSlots), + pool: make([]*coalesceSlot, 0, initialSlots), + } if gw, ok := w.(overlay.GSOWriter); ok && gw.GSOSupported() { c.gsoW = gw } return c } -// parsedTCP holds the byte offsets / values we extract from one admission -// check so Add and canAppend don't re-parse the same header twice. +// parsedTCP holds the fields extracted from a single parse so later steps +// (admission, slot lookup, canAppend) don't re-walk the header. type parsedTCP struct { - isV6 bool + fk flowKey ipHdrLen int tcpHdrLen int - hdrLen int // ipHdrLen + tcpHdrLen + hdrLen int payLen int seq uint32 flags byte } -// parseCoalesceable decides whether pkt is eligible for TCP coalescing. It -// accepts IPv4 (no options, DF set, no fragmentation) and IPv6 (no -// extension headers) carrying a TCP segment with flags in {ACK, ACK|PSH} -// and a non-empty payload. On success it returns the parsed offsets. -func parseCoalesceable(pkt []byte) (parsedTCP, bool) { +// parseTCPBase extracts the flow key and IP/TCP offsets for any TCP packet, +// regardless of whether it's admissible for coalescing. Returns ok=false +// for non-TCP or malformed input. Accepts IPv4 (no options, no fragmentation) +// and IPv6 (no extension headers). +func parseTCPBase(pkt []byte) (parsedTCP, bool) { var p parsedTCP if len(pkt) < 20 { return p, false @@ -80,42 +122,41 @@ func parseCoalesceable(pkt []byte) (parsedTCP, bool) { v := pkt[0] >> 4 switch v { case 4: - if len(pkt) < 20 { - return p, false - } ihl := int(pkt[0]&0x0f) * 4 if ihl != 20 { - return p, false // reject IP options + return p, false } if pkt[9] != ipProtoTCP { return p, false } - // Fragment check: MF=0 and frag offset=0. Accept DF=1 or DF=0 — - // just reject any actual fragmentation. - fragField := binary.BigEndian.Uint16(pkt[6:8]) - if fragField&0x3fff != 0 { + // Reject actual fragmentation (MF or non-zero frag offset). + if binary.BigEndian.Uint16(pkt[6:8])&0x3fff != 0 { return p, false } totalLen := int(binary.BigEndian.Uint16(pkt[2:4])) if totalLen > len(pkt) || totalLen < ihl { return p, false } - p.isV6 = false - p.ipHdrLen = ihl + p.ipHdrLen = 20 + p.fk.isV6 = false + copy(p.fk.src[:4], pkt[12:16]) + copy(p.fk.dst[:4], pkt[16:20]) pkt = pkt[:totalLen] case 6: if len(pkt) < 40 { return p, false } if pkt[6] != ipProtoTCP { - return p, false // reject ext headers + return p, false } payloadLen := int(binary.BigEndian.Uint16(pkt[4:6])) if 40+payloadLen > len(pkt) { return p, false } - p.isV6 = true p.ipHdrLen = 40 + p.fk.isV6 = true + copy(p.fk.src[:], pkt[8:24]) + copy(p.fk.dst[:], pkt[24:40]) pkt = pkt[:40+payloadLen] default: return p, false @@ -131,146 +172,216 @@ func parseCoalesceable(pkt []byte) (parsedTCP, bool) { if len(pkt) < p.ipHdrLen+tcpOff { return p, false } - flags := pkt[p.ipHdrLen+13] - // Allow only ACK and ACK|PSH. In particular: no SYN/FIN/RST/URG/CWR/ECE. 
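// The removed admission check below moves into coalesceable() further down.
// A standalone sketch of the same mask with worked flag values (hypothetical
// helper, not part of this diff):

func admissibleFlagsSketch(flags byte) bool {
	const ack, psh = 0x10, 0x08
	// Reject any flag bit outside ACK|PSH, and require ACK itself.
	return flags&^(ack|psh) == 0 && flags&ack != 0
}

// admissibleFlagsSketch(0x10) == true  (ACK)
// admissibleFlagsSketch(0x18) == true  (ACK|PSH)
// admissibleFlagsSketch(0x12) == false (SYN|ACK: stray 0x02 bit)
// admissibleFlagsSketch(0x11) == false (FIN|ACK: stray 0x01 bit)
// admissibleFlagsSketch(0x08) == false (PSH without ACK)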
- const ack = 0x10 - const psh = 0x08 - if flags&^(ack|psh) != 0 || flags&ack == 0 { - return p, false - } p.tcpHdrLen = tcpOff p.hdrLen = p.ipHdrLen + tcpOff p.payLen = len(pkt) - p.hdrLen - if p.payLen <= 0 { - return p, false - } p.seq = binary.BigEndian.Uint32(pkt[p.ipHdrLen+4 : p.ipHdrLen+8]) - p.flags = flags + p.flags = pkt[p.ipHdrLen+13] + p.fk.sport = binary.BigEndian.Uint16(pkt[p.ipHdrLen : p.ipHdrLen+2]) + p.fk.dport = binary.BigEndian.Uint16(pkt[p.ipHdrLen+2 : p.ipHdrLen+4]) return p, true } -// Add takes a plaintext inbound packet destined for the tun. If GSO is -// unavailable or the packet isn't coalesceable, Add falls through to a -// plain Write on the underlying queue (flushing any pending superpacket -// first). +// coalesceable reports whether a parsed TCP segment is eligible for +// coalescing. Accepts only ACK or ACK|PSH with a non-empty payload. +func (p parsedTCP) coalesceable() bool { + const ack = 0x10 + const psh = 0x08 + if p.flags&^(ack|psh) != 0 || p.flags&ack == 0 { + return false + } + return p.payLen > 0 +} + +// Add borrows pkt. The caller must keep pkt valid until the next Flush, +// whether or not the packet was coalesced — passthrough (non-admissible) +// packets are queued and written at Flush time, not synchronously. func (c *tcpCoalescer) Add(pkt []byte) error { if c.gsoW == nil { - _, err := c.plainW.Write(pkt) - return err + c.addPassthrough(pkt) + return nil } - info, ok := parseCoalesceable(pkt) + info, ok := parseTCPBase(pkt) if !ok { - if c.active { - if err := c.flushLocked(); err != nil { - return err - } - } - _, err := c.plainW.Write(pkt) - return err + // Non-TCP or malformed — can't possibly collide with an open flow. + c.addPassthrough(pkt) + return nil + } + if !info.coalesceable() { + // TCP but not admissible (SYN/FIN/RST/URG/CWR/ECE or zero-payload). + // Seal this flow's open slot so later in-flow packets don't extend + // it and accidentally reorder past this passthrough. + delete(c.openSlots, info.fk) + c.addPassthrough(pkt) + return nil } - if c.active { - if c.canAppend(pkt, info) { - c.appendPayload(pkt, info) - if info.flags&0x08 != 0 { - c.psh = true + if open := c.openSlots[info.fk]; open != nil { + if c.canAppend(open, pkt, info) { + c.appendPayload(open, pkt, info) + if open.psh { + delete(c.openSlots, info.fk) } return nil } - if err := c.flushLocked(); err != nil { - return err - } + // Can't extend — seal it and fall through to seed a fresh slot. + delete(c.openSlots, info.fk) } - return c.seed(pkt, info) -} - -// Flush emits any pending superpacket. Called by the UDP read loop at -// recvmmsg batch boundaries — "no more packets coming right now". -func (c *tcpCoalescer) Flush() error { - if !c.active { - return nil - } - return c.flushLocked() -} - -func (c *tcpCoalescer) reset() { - c.active = false - c.bufLen = 0 - c.numSeg = 0 - c.gsoSize = 0 - c.hdrLen = 0 - c.ipHdrLen = 0 - c.nextSeq = 0 - c.psh = false -} - -func (c *tcpCoalescer) seed(pkt []byte, info parsedTCP) error { - if info.hdrLen+info.payLen > len(c.buf) { - // Oversize single packet — flush (already done above) and passthrough. 
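// Worked from the constants above, the two halves of the new seed guard
// below behave differently: hdrLen tops out at 40 (IPv6) + 60 (TCP with
// maximal options) = 100 = tcpCoalesceHdrCap, so a well-formed parse can
// never exceed it and that check is a cheap belt-and-braces guard; while
// hdrLen+payLen > 65535 is unreachable for IPv4 (its 16-bit total length
// already includes the header) but reachable for IPv6, whose 16-bit
// payload length excludes the 40-byte header.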
- _, err := c.plainW.Write(pkt) - return err - } - copy(c.buf, pkt[:info.hdrLen+info.payLen]) - c.active = true - c.bufLen = info.hdrLen + info.payLen - c.numSeg = 1 - c.gsoSize = info.payLen - c.isV6 = info.isV6 - c.ipHdrLen = info.ipHdrLen - c.hdrLen = info.hdrLen - c.nextSeq = info.seq + uint32(info.payLen) - c.psh = info.flags&0x08 != 0 + c.seed(pkt, info) return nil } -// canAppend reports whether info's packet extends the current seed: same -// flow, adjacent seq, payload size rule, and no-PSH-mid-chain. -func (c *tcpCoalescer) canAppend(pkt []byte, info parsedTCP) bool { - if c.psh { - return false // we already accepted a PSH — chain is closed +// Flush emits every queued event in arrival order. Coalesced slots go out +// via WriteGSO; passthrough slots go out via plainW.Write. Returns the +// first error observed; keeps draining so one bad packet doesn't hold up +// the rest. After Flush returns, borrowed payload slices may be recycled. +func (c *tcpCoalescer) Flush() error { + var first error + for _, s := range c.slots { + var err error + if s.passthrough { + _, err = c.plainW.Write(s.rawPkt) + } else { + err = c.flushSlot(s) + } + if err != nil && first == nil { + first = err + } + c.release(s) } - if info.isV6 != c.isV6 { + for i := range c.slots { + c.slots[i] = nil + } + c.slots = c.slots[:0] + for k := range c.openSlots { + delete(c.openSlots, k) + } + return first +} + +func (c *tcpCoalescer) addPassthrough(pkt []byte) { + s := c.take() + s.passthrough = true + s.rawPkt = pkt + c.slots = append(c.slots, s) +} + +func (c *tcpCoalescer) seed(pkt []byte, info parsedTCP) { + if info.hdrLen > tcpCoalesceHdrCap || info.hdrLen+info.payLen > tcpCoalesceBufSize { + // Pathological shape — can't fit our scratch, emit as-is. + c.addPassthrough(pkt) + return + } + s := c.take() + s.passthrough = false + s.rawPkt = nil + copy(s.hdrBuf[:], pkt[:info.hdrLen]) + s.hdrLen = info.hdrLen + s.ipHdrLen = info.ipHdrLen + s.isV6 = info.fk.isV6 + s.fk = info.fk + s.gsoSize = info.payLen + s.numSeg = 1 + s.totalPay = info.payLen + s.nextSeq = info.seq + uint32(info.payLen) + s.psh = info.flags&0x08 != 0 + s.payIovs = append(s.payIovs[:0], pkt[info.hdrLen:info.hdrLen+info.payLen]) + c.slots = append(c.slots, s) + if !s.psh { + c.openSlots[info.fk] = s + } +} + +// canAppend reports whether info's packet extends the slot's seed: same +// header shape and stable contents, adjacent seq, not oversized, chain not +// closed. +func (c *tcpCoalescer) canAppend(s *coalesceSlot, pkt []byte, info parsedTCP) bool { + if s.psh { return false } - if info.hdrLen != c.hdrLen { + if info.hdrLen != s.hdrLen { return false } - if info.seq != c.nextSeq { + if info.seq != s.nextSeq { return false } - if c.numSeg >= tcpCoalesceMaxSegs { + if s.numSeg >= tcpCoalesceMaxSegs { return false } - if c.bufLen+info.payLen > len(c.buf) { + if info.payLen > s.gsoSize { return false } - // Every mid-chain segment must be exactly gsoSize. The final segment may - // be shorter, but once a short segment is appended we can't add another. - if info.payLen > c.gsoSize { + if s.hdrLen+s.totalPay+info.payLen > tcpCoalesceBufSize { return false } - if info.payLen < c.gsoSize { - // Will become the last segment — always OK to append, just no more. - } - // Compare the stable parts of the header. 
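// Reading "stable parts" concretely for IPv4, per headersMatch's doc
// comment below: it must ignore IP bytes 2-3 (total length), 4-5 (IP ID)
// and 10-11 (IP checksum), plus TCP bytes 4-7 (seq), 13 (flags) and 16-17
// (checksum), and require byte-for-byte equality everywhere else
// (addresses, ports, TTL, data offset, window, options).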
- if !headersMatch(c.buf[:c.hdrLen], pkt[:info.hdrLen], c.isV6, c.ipHdrLen) { + if !headersMatch(s.hdrBuf[:s.hdrLen], pkt[:info.hdrLen], s.isV6, s.ipHdrLen) { return false } return true } -func (c *tcpCoalescer) appendPayload(pkt []byte, info parsedTCP) { - copy(c.buf[c.bufLen:], pkt[info.hdrLen:info.hdrLen+info.payLen]) - c.bufLen += info.payLen - c.numSeg++ - c.nextSeq = info.seq + uint32(info.payLen) - // If this was a sub-gsoSize last segment, mark chain as closed. - if info.payLen < c.gsoSize { - c.psh = true +func (c *tcpCoalescer) appendPayload(s *coalesceSlot, pkt []byte, info parsedTCP) { + s.payIovs = append(s.payIovs, pkt[info.hdrLen:info.hdrLen+info.payLen]) + s.numSeg++ + s.totalPay += info.payLen + s.nextSeq = info.seq + uint32(info.payLen) + if info.payLen < s.gsoSize || info.flags&0x08 != 0 { + s.psh = true } } +func (c *tcpCoalescer) take() *coalesceSlot { + if n := len(c.pool); n > 0 { + s := c.pool[n-1] + c.pool[n-1] = nil + c.pool = c.pool[:n-1] + return s + } + return &coalesceSlot{} +} + +func (c *tcpCoalescer) release(s *coalesceSlot) { + s.passthrough = false + s.rawPkt = nil + for i := range s.payIovs { + s.payIovs[i] = nil + } + s.payIovs = s.payIovs[:0] + s.numSeg = 0 + s.totalPay = 0 + s.psh = false + c.pool = append(c.pool, s) +} + +// flushSlot patches the header and calls WriteGSO. Does not remove the +// slot from c.slots. +func (c *tcpCoalescer) flushSlot(s *coalesceSlot) error { + total := s.hdrLen + s.totalPay + l4Len := total - s.ipHdrLen + hdr := s.hdrBuf[:s.hdrLen] + + if s.isV6 { + binary.BigEndian.PutUint16(hdr[4:6], uint16(l4Len)) + } else { + binary.BigEndian.PutUint16(hdr[2:4], uint16(total)) + hdr[10] = 0 + hdr[11] = 0 + binary.BigEndian.PutUint16(hdr[10:12], ipv4HdrChecksum(hdr[:s.ipHdrLen])) + } + + var psum uint32 + if s.isV6 { + psum = pseudoSumIPv6(hdr[8:24], hdr[24:40], ipProtoTCP, l4Len) + } else { + psum = pseudoSumIPv4(hdr[12:16], hdr[16:20], ipProtoTCP, l4Len) + } + tcsum := s.ipHdrLen + 16 + binary.BigEndian.PutUint16(hdr[tcsum:tcsum+2], foldOnceNoInvert(psum)) + + return c.gsoW.WriteGSO(hdr, s.payIovs, uint16(s.gsoSize), s.isV6, uint16(s.ipHdrLen)) +} + // headersMatch compares two IP+TCP header prefixes for byte-for-byte // equality on every field that must be identical across coalesced // segments. Size/IPID/IPCsum/seq/flags/tcpCsum are masked out. @@ -330,58 +441,6 @@ func bytesEq(a, b []byte) bool { return true } -func (c *tcpCoalescer) flushLocked() error { - // Guarantee the coalescer is empty on exit regardless of how we leave. - defer c.reset() - - if c.numSeg <= 1 { - _, err := c.plainW.Write(c.buf[:c.bufLen]) - return err - } - - total := c.bufLen - l4Len := total - c.ipHdrLen - - // Fix IP header length field. - if c.isV6 { - if l4Len > 0xffff { - // Shouldn't happen given buffer size, but guard against it. - return c.flushAsPerSegment() - } - binary.BigEndian.PutUint16(c.buf[4:6], uint16(l4Len)) - } else { - if total > 0xffff { - return c.flushAsPerSegment() - } - binary.BigEndian.PutUint16(c.buf[2:4], uint16(total)) - // Recompute IPv4 header checksum. - c.buf[10] = 0 - c.buf[11] = 0 - binary.BigEndian.PutUint16(c.buf[10:12], ipv4HdrChecksum(c.buf[:c.ipHdrLen])) - } - - // Write the virtio NEEDS_CSUM pseudo-header partial into the TCP csum field. 
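// For context, what the kernel does with that partial under NEEDS_CSUM
// (standard virtio semantics): it checksums from csum_start to the end of
// the packet, adds the value already stored at csum_start+csum_offset,
// folds and inverts, and writes the result back into that field. That is
// why the field must hold exactly the non-inverted pseudo partial and
// nothing else.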
- var psum uint32 - if c.isV6 { - psum = pseudoSumIPv6(c.buf[8:24], c.buf[24:40], ipProtoTCP, l4Len) - } else { - psum = pseudoSumIPv4(c.buf[12:16], c.buf[16:20], ipProtoTCP, l4Len) - } - tcsum := c.ipHdrLen + 16 - binary.BigEndian.PutUint16(c.buf[tcsum:tcsum+2], foldOnceNoInvert(psum)) - - return c.gsoW.WriteGSO(c.buf[:total], uint16(c.gsoSize), c.isV6, uint16(c.hdrLen), uint16(c.ipHdrLen)) -} - -// flushAsPerSegment is a defensive fallback used if the coalesced superpacket -// somehow exceeds 16-bit length fields. It writes the packet as-is through -// the plain writer (the kernel will reject it, but that's a visible error -// rather than silent corruption). -func (c *tcpCoalescer) flushAsPerSegment() error { - _, err := c.plainW.Write(c.buf[:c.bufLen]) - return err -} - // ipv4HdrChecksum computes the IPv4 header checksum over hdr (which must // already have its checksum field zeroed) and returns the folded/inverted // 16-bit value to store. diff --git a/tcp_coalesce_test.go b/tcp_coalesce_test.go index c70d28e3..9d7713fc 100644 --- a/tcp_coalesce_test.go +++ b/tcp_coalesce_test.go @@ -5,8 +5,9 @@ import ( "testing" ) -// A minimal stub writer that records each plain Write and each WriteGSO -// call without touching a real TUN fd. +// fakeTunWriter records plain Writes and WriteGSO calls without touching a +// real TUN fd. WriteGSO preserves the split between hdr and borrowed pays +// so tests can inspect each independently. type fakeTunWriter struct { gsoEnabled bool writes [][]byte @@ -14,13 +15,31 @@ type fakeTunWriter struct { } type fakeGSOWrite struct { - pkt []byte + hdr []byte + pays [][]byte gsoSize uint16 isV6 bool - hdrLen uint16 csumStart uint16 } +// total returns hdrLen + sum of pay lens. +func (g fakeGSOWrite) total() int { + n := len(g.hdr) + for _, p := range g.pays { + n += len(p) + } + return n +} + +// payLen sums the pays. +func (g fakeGSOWrite) payLen() int { + var n int + for _, p := range g.pays { + n += len(p) + } + return n +} + func (w *fakeTunWriter) Write(p []byte) (int, error) { buf := make([]byte, len(p)) copy(buf, p) @@ -28,10 +47,22 @@ func (w *fakeTunWriter) Write(p []byte) (int, error) { return len(p), nil } -func (w *fakeTunWriter) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error { - buf := make([]byte, len(pkt)) - copy(buf, pkt) - w.gsoWrites = append(w.gsoWrites, fakeGSOWrite{pkt: buf, gsoSize: gsoSize, isV6: isV6, hdrLen: hdrLen, csumStart: csumStart}) +func (w *fakeTunWriter) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error { + hcopy := make([]byte, len(hdr)) + copy(hcopy, hdr) + paysCopy := make([][]byte, len(pays)) + for i, p := range pays { + pc := make([]byte, len(p)) + copy(pc, p) + paysCopy[i] = pc + } + w.gsoWrites = append(w.gsoWrites, fakeGSOWrite{ + hdr: hcopy, + pays: paysCopy, + gsoSize: gsoSize, + isV6: isV6, + csumStart: csumStart, + }) return nil } @@ -40,33 +71,34 @@ func (w *fakeTunWriter) GSOSupported() bool { return w.gsoEnabled } // buildTCPv4 constructs a minimal IPv4+TCP packet with the given payload, // seq, and flags. Assumes no IP options and a 20-byte TCP header. func buildTCPv4(seq uint32, flags byte, payload []byte) []byte { + return buildTCPv4Ports(1000, 2000, seq, flags, payload) +} + +// buildTCPv4Ports is buildTCPv4 with caller-specified ports so tests can +// build distinct flows. 
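// For example, buildTCPv4Ports(1000, 2000, seq, tcpAck, pay) reproduces
// what buildTCPv4 emits, while any other sport/dport pair yields a
// distinct flowKey and therefore a separate coalescer slot.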
+func buildTCPv4Ports(sport, dport uint16, seq uint32, flags byte, payload []byte) []byte { const ipHdrLen = 20 const tcpHdrLen = 20 total := ipHdrLen + tcpHdrLen + len(payload) pkt := make([]byte, total) - // IPv4 header. - pkt[0] = 0x45 // version 4, IHL 5 - pkt[1] = 0x00 // TOS + pkt[0] = 0x45 + pkt[1] = 0x00 binary.BigEndian.PutUint16(pkt[2:4], uint16(total)) - binary.BigEndian.PutUint16(pkt[4:6], 0) // id - binary.BigEndian.PutUint16(pkt[6:8], 0x4000) // DF - pkt[8] = 64 // TTL + binary.BigEndian.PutUint16(pkt[4:6], 0) + binary.BigEndian.PutUint16(pkt[6:8], 0x4000) + pkt[8] = 64 pkt[9] = ipProtoTCP - // csum left zero — coalescer recomputes on emit. - copy(pkt[12:16], []byte{10, 0, 0, 1}) // src - copy(pkt[16:20], []byte{10, 0, 0, 2}) // dst + copy(pkt[12:16], []byte{10, 0, 0, 1}) + copy(pkt[16:20], []byte{10, 0, 0, 2}) - // TCP header. - binary.BigEndian.PutUint16(pkt[20:22], 1000) // sport - binary.BigEndian.PutUint16(pkt[22:24], 2000) // dport + binary.BigEndian.PutUint16(pkt[20:22], sport) + binary.BigEndian.PutUint16(pkt[22:24], dport) binary.BigEndian.PutUint32(pkt[24:28], seq) - binary.BigEndian.PutUint32(pkt[28:32], 12345) // ack - pkt[32] = 0x50 // data offset = 5 << 4 + binary.BigEndian.PutUint32(pkt[28:32], 12345) + pkt[32] = 0x50 pkt[33] = flags - binary.BigEndian.PutUint16(pkt[34:36], 0xffff) // window - // tcp csum zero - // urgent zero + binary.BigEndian.PutUint16(pkt[34:36], 0xffff) copy(pkt[40:], payload) return pkt @@ -87,6 +119,13 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { if err := c.Add(pkt); err != nil { t.Fatal(err) } + // No sync write — passthrough is deferred to Flush. + if len(w.writes) != 0 || len(w.gsoWrites) != 0 { + t.Fatalf("no Add-time writes: got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) + } + if err := c.Flush(); err != nil { + t.Fatal(err) + } if len(w.writes) != 1 || len(w.gsoWrites) != 0 { t.Fatalf("want single plain write, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) } @@ -95,7 +134,6 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) { func TestCoalescerNonTCPPassthrough(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} c := newTCPCoalescer(w) - // ICMP packet: proto=1. pkt := make([]byte, 28) pkt[0] = 0x45 binary.BigEndian.PutUint16(pkt[2:4], 28) @@ -105,6 +143,9 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) { if err := c.Add(pkt); err != nil { t.Fatal(err) } + if err := c.Flush(); err != nil { + t.Fatal(err) + } if len(w.writes) != 1 || len(w.gsoWrites) != 0 { t.Fatalf("ICMP should pass through unchanged") } @@ -117,17 +158,24 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) { if err := c.Add(pkt); err != nil { t.Fatal(err) } - // No flush yet — still pending. if len(w.writes) != 0 || len(w.gsoWrites) != 0 { t.Fatalf("unexpected output before flush") } if err := c.Flush(); err != nil { t.Fatal(err) } - // Single segment — should use plain write, not gso. - if len(w.writes) != 1 || len(w.gsoWrites) != 0 { + // Single-segment flush now goes through WriteGSO with GSO_NONE + // (virtio NEEDS_CSUM lets the kernel fill in the L4 csum). 
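// (Design note: even a lone segment goes out through the vectored GSO
// path because its payload is a borrowed slice; a plain Write would first
// have to copy header and payload into one contiguous buffer, which is
// exactly the copy this change removes.)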
+ if len(w.gsoWrites) != 1 || len(w.writes) != 0 { t.Fatalf("single-seg flush: writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) } + g := w.gsoWrites[0] + if g.total() != 40+1000 { + t.Errorf("super total=%d want %d", g.total(), 40+1000) + } + if g.payLen() != 1000 { + t.Errorf("payLen=%d want 1000", g.payLen()) + } } func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { @@ -153,18 +201,20 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) { if g.gsoSize != 1200 { t.Errorf("gsoSize=%d want 1200", g.gsoSize) } - if g.hdrLen != 40 { - t.Errorf("hdrLen=%d want 40", g.hdrLen) + if len(g.hdr) != 40 { + t.Errorf("hdrLen=%d want 40", len(g.hdr)) } if g.csumStart != 20 { t.Errorf("csumStart=%d want 20", g.csumStart) } - if len(g.pkt) != 40+3*1200 { - t.Errorf("superpacket len=%d want %d", len(g.pkt), 40+3*1200) + if len(g.pays) != 3 { + t.Errorf("pay count=%d want 3", len(g.pays)) } - // IP total length should reflect superpacket. - if tot := binary.BigEndian.Uint16(g.pkt[2:4]); int(tot) != len(g.pkt) { - t.Errorf("ip total_length=%d want %d", tot, len(g.pkt)) + if g.total() != 40+3*1200 { + t.Errorf("superpacket len=%d want %d", g.total(), 40+3*1200) + } + if tot := binary.BigEndian.Uint16(g.hdr[2:4]); int(tot) != g.total() { + t.Errorf("ip total_length=%d want %d", tot, g.total()) } } @@ -175,17 +225,15 @@ func TestCoalescerRejectsSeqGap(t *testing.T) { if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) } - // seq should be 2200; use 3000 to simulate a gap. if err := c.Add(buildTCPv4(3000, tcpAck, pay)); err != nil { t.Fatal(err) } if err := c.Flush(); err != nil { t.Fatal(err) } - // First packet should have been flushed as a plain write (single seg), - // then second packet seeded and flushed likewise. - if len(w.writes) != 2 || len(w.gsoWrites) != 0 { - t.Fatalf("seq gap: want 2 plain writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) + // Each packet flushes as its own single-segment WriteGSO now. + if len(w.gsoWrites) != 2 || len(w.writes) != 0 { + t.Fatalf("seq gap: want 2 gso writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) } } @@ -196,7 +244,8 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) { if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { t.Fatal(err) } - // SYN flag — not admissible at all. Should flush first packet + plain-write second. + // SYN|ACK is non-admissible. Must flush matching flow's slot (gso) + // and then plain-write the SYN packet itself. syn := buildTCPv4(2200, tcpSyn|tcpAck, pay) if err := c.Add(syn); err != nil { t.Fatal(err) @@ -204,8 +253,8 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) { if err := c.Flush(); err != nil { t.Fatal(err) } - if len(w.writes) != 2 || len(w.gsoWrites) != 0 { - t.Fatalf("flag mismatch: want 2 plain writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) + if len(w.writes) != 1 || len(w.gsoWrites) != 1 { + t.Fatalf("flag mismatch: want 1 plain + 1 gso, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) } } @@ -219,6 +268,7 @@ func TestCoalescerRejectsFIN(t *testing.T) { if err := c.Flush(); err != nil { t.Fatal(err) } + // FIN isn't admissible — passthrough as plain, no slot, no gso. 
 	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
 		t.Fatalf("FIN should be passthrough, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
 }
@@ -235,26 +285,26 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
 	if err := c.Add(buildTCPv4(2200, tcpAck, half)); err != nil {
 		t.Fatal(err)
 	}
-	// Next full-size would have to start at 2700 but chain is closed —
-	// should flush + seed.
+	// Chain now closed; the next packet seeds a fresh slot on the same
+	// flow (the sealed slot stays queued until Flush).
 	if err := c.Add(buildTCPv4(2700, tcpAck, full)); err != nil {
 		t.Fatal(err)
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// Expect: one gso write (first two coalesced) + one plain write (the
-	// third, flushed alone).
-	if len(w.gsoWrites) != 1 {
-		t.Fatalf("want 1 gso write got %d", len(w.gsoWrites))
+	// Expect two gso writes: the first two packets coalesced, then the
+	// third flushed alone (single-seg via GSO_NONE).
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes got %d", len(w.gsoWrites))
 	}
-	if len(w.writes) != 1 {
-		t.Fatalf("want 1 plain write got %d", len(w.writes))
+	if len(w.writes) != 0 {
+		t.Fatalf("want 0 plain writes got %d", len(w.writes))
 	}
 	if w.gsoWrites[0].gsoSize != 1200 {
 		t.Errorf("gsoSize=%d want 1200", w.gsoWrites[0].gsoSize)
 	}
-	if got, want := len(w.gsoWrites[0].pkt), 40+1200+500; got != want {
+	if got, want := w.gsoWrites[0].total(), 40+1200+500; got != want {
 		t.Errorf("super len=%d want %d", got, want)
 	}
 }
@@ -266,22 +316,21 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
 	if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// Last full-size segment with PSH — admitted but chain is now closed.
 	if err := c.Add(buildTCPv4(2200, tcpAckPsh, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// Further full-size would not coalesce.
 	if err := c.Add(buildTCPv4(3400, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	if len(w.gsoWrites) != 1 {
-		t.Fatalf("want 1 gso write got %d", len(w.gsoWrites))
+	// First two coalesce; the third seeds a fresh slot that flushes alone.
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes got %d", len(w.gsoWrites))
 	}
-	if len(w.writes) != 1 {
-		t.Fatalf("want 1 plain write got %d", len(w.writes))
+	if len(w.writes) != 0 {
+		t.Fatalf("want 0 plain writes got %d", len(w.writes))
 	}
 }
@@ -291,7 +340,6 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
 	pay := make([]byte, 1200)
 	p1 := buildTCPv4(1000, tcpAck, pay)
 	p2 := buildTCPv4(2200, tcpAck, pay)
-	// Mutate p2's source port to break flow match.
 	binary.BigEndian.PutUint16(p2[20:22], 9999)
 	if err := c.Add(p1); err != nil {
 		t.Fatal(err)
 	}
@@ -302,9 +350,9 @@
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// Both flushed as plain writes.
-	if len(w.writes) != 2 || len(w.gsoWrites) != 0 {
-		t.Fatalf("diff flow: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
+	// Two independent flows; each flushes as its own single-segment WriteGSO.
+	if len(w.gsoWrites) != 2 || len(w.writes) != 0 {
+		t.Fatalf("diff flow: want 2 gso writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
 }
@@ -322,6 +370,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
+	// Non-admissible parse → passthrough as plain.
if len(w.writes) != 1 || len(w.gsoWrites) != 0 { t.Fatalf("IP options should passthrough, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites)) } @@ -330,7 +379,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) { func TestCoalescerCapBySegments(t *testing.T) { w := &fakeTunWriter{gsoEnabled: true} c := newTCPCoalescer(w) - pay := make([]byte, 512) // small so we can fit many before byte cap + pay := make([]byte, 512) seq := uint32(1000) for i := 0; i < tcpCoalesceMaxSegs+5; i++ { if err := c.Add(buildTCPv4(seq, tcpAck, pay)); err != nil { @@ -341,16 +390,187 @@ func TestCoalescerCapBySegments(t *testing.T) { if err := c.Flush(); err != nil { t.Fatal(err) } - // We expect the first tcpCoalesceMaxSegs to form one gso, then 5 more: - // The 5 follow-ons seed a new super that completes as another gso if >=2, - // or a mix. Just assert we never exceed the cap per super. for _, g := range w.gsoWrites { - segs := (len(g.pkt) - int(g.hdrLen)) / int(g.gsoSize) - if rem := (len(g.pkt) - int(g.hdrLen)) % int(g.gsoSize); rem != 0 { - segs++ - } + segs := len(g.pays) if segs > tcpCoalesceMaxSegs { t.Fatalf("super exceeded seg cap: %d > %d", segs, tcpCoalesceMaxSegs) } } } + +// TestCoalescerMultipleFlowsInSameBatch proves two interleaved bulk TCP +// flows coalesce independently in a single Flush. +func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) { + w := &fakeTunWriter{gsoEnabled: true} + c := newTCPCoalescer(w) + pay := make([]byte, 1200) + + // Flow A: sport 1000. Flow B: sport 3000. + if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Add(buildTCPv4Ports(1000, 2000, 2500, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil { + t.Fatal(err) + } + if err := c.Flush(); err != nil { + t.Fatal(err) + } + + if len(w.gsoWrites) != 2 { + t.Fatalf("want 2 gso writes (one per flow), got %d", len(w.gsoWrites)) + } + if len(w.writes) != 0 { + t.Fatalf("want no plain writes, got %d", len(w.writes)) + } + // Each superpacket should carry 3 segments. + for i, g := range w.gsoWrites { + if len(g.pays) != 3 { + t.Errorf("gso[%d]: segs=%d want 3", i, len(g.pays)) + } + if g.gsoSize != 1200 { + t.Errorf("gso[%d]: gsoSize=%d want 1200", i, g.gsoSize) + } + } + // Verify each superpacket carries the source port it was seeded with. + seenSports := map[uint16]bool{} + for _, g := range w.gsoWrites { + sp := binary.BigEndian.Uint16(g.hdr[20:22]) + seenSports[sp] = true + } + if !seenSports[1000] || !seenSports[3000] { + t.Errorf("expected superpackets for sports 1000 and 3000, got %v", seenSports) + } +} + +// TestCoalescerPreservesArrivalOrder confirms that with passthrough and +// coalesced events both queued, Flush emits them in Add order rather than +// writing passthrough packets synchronously. +func TestCoalescerPreservesArrivalOrder(t *testing.T) { + w := &orderedFakeWriter{gsoEnabled: true} + c := newTCPCoalescer(w) + // Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on + // a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y). 
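// (Granularity note: order is preserved per queued event. A flow's later
// segments are emitted at the position of the slot they joined, i.e. the
// seed's arrival position, which is why passthrough must be deferred too:
// a synchronous ICMP write here would jump ahead of flow X's still-open
// superpacket.)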
+	pay := make([]byte, 1200)
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	icmp := make([]byte, 28)
+	icmp[0] = 0x45
+	binary.BigEndian.PutUint16(icmp[2:4], 28)
+	icmp[9] = 1
+	copy(icmp[12:16], []byte{10, 0, 0, 1})
+	copy(icmp[16:20], []byte{10, 0, 0, 3})
+	if err := c.Add(icmp); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Nothing should have hit the writer synchronously.
+	if len(w.events) != 0 {
+		t.Fatalf("Add emitted events synchronously: %v", w.events)
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	if got, want := w.events, []string{"gso", "plain", "gso"}; !stringSliceEq(got, want) {
+		t.Fatalf("flush order=%v want %v", got, want)
+	}
+}
+
+// orderedFakeWriter records only the sequence of call types so tests can
+// assert arrival order without inspecting bytes.
+type orderedFakeWriter struct {
+	gsoEnabled bool
+	events     []string
+}
+
+func (w *orderedFakeWriter) Write(p []byte) (int, error) {
+	w.events = append(w.events, "plain")
+	return len(p), nil
+}
+
+func (w *orderedFakeWriter) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error {
+	w.events = append(w.events, "gso")
+	return nil
+}
+
+func (w *orderedFakeWriter) GSOSupported() bool { return w.gsoEnabled }
+
+func stringSliceEq(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// TestCoalescerInterleavedFlowsPreserveOrdering checks that a non-admissible
+// packet (SYN) mid-flow seals only its own flow's slot; other flows keep
+// coalescing undisturbed.
+func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
+	w := &fakeTunWriter{gsoEnabled: true}
+	c := newTCPCoalescer(w)
+	pay := make([]byte, 1200)
+
+	// Flow A two segments.
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Flow B two segments.
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Flow A SYN (non-admissible) seals flow A's open slot only.
+	syn := buildTCPv4Ports(1000, 2000, 9999, tcpSyn|tcpAck, pay)
+	if err := c.Add(syn); err != nil {
+		t.Fatal(err)
+	}
+	// Flow B continues — should still be coalesced with its seed.
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Expected:
+	// - 1 gso for flow A (first 2 segments)
+	// - 1 plain for flow A SYN
+	// - 1 gso for flow B (3 segments)
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes, got %d", len(w.gsoWrites))
+	}
+	if len(w.writes) != 1 {
+		t.Fatalf("want 1 plain write (SYN), got %d", len(w.writes))
+	}
+	// One gso should carry 2 segments (flow A), the other 3 (flow B).
+	var segCounts []int
+	for _, g := range w.gsoWrites {
+		segCounts = append(segCounts, len(g.pays))
+	}
+	if !(segCounts[0] == 2 && segCounts[1] == 3) && !(segCounts[0] == 3 && segCounts[1] == 2) {
+		t.Errorf("unexpected segment counts: %v (want 2 and 3)", segCounts)
+	}
+}