holy crap 2x

JackDoan
2026-04-17 15:33:46 -05:00
parent 1fd24a19c7
commit 60e556866a
5 changed files with 643 additions and 307 deletions

View File

@@ -309,18 +309,28 @@ func (f *Interface) listenOut(i int) {
 	ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
 	lhh := f.lightHouse.NewRequestHandler()
-	plaintext := make([]byte, udp.MTU)
 	h := &header.H{}
 	fwPacket := &firewall.Packet{}
 	nb := make([]byte, 12, 12)
+	// plaintexts is a ring of decrypt scratches, one per packet in a UDP
+	// recvmmsg batch. The coalescer borrows payload slices from here and
+	// requires they stay valid until Flush — so we rotate each packet and
+	// reset only in the batch-end flush callback.
+	var plaintexts [][]byte
+	idx := 0
 	coalescer := f.tunCoalescers[i]
 	err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
-		f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
+		if idx >= len(plaintexts) {
+			plaintexts = append(plaintexts, make([]byte, udp.MTU))
+		}
+		f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintexts[idx][:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
+		idx++
 	}, func() {
 		if err := coalescer.Flush(); err != nil {
 			f.l.WithError(err).Error("Failed to flush tun coalescer")
 		}
+		idx = 0
 	})
 	if err != nil && !f.closed.Load() {
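A note on the borrow contract above: the coalescer keeps references into these scratch buffers until the batch-end Flush, so the read loop must never hand it the same buffer twice within a batch. A minimal standalone sketch of the pattern follows, with hypothetical stand-ins (batchSink, processBatch) rather than Nebula code:

package main

import "fmt"

// batchSink stands in for the coalescer: it holds references to every
// slice passed to Add until Flush drops them.
type batchSink struct{ held [][]byte }

func (s *batchSink) Add(p []byte) { s.held = append(s.held, p) }

func (s *batchSink) Flush() {
	fmt.Printf("flushing %d borrowed payloads\n", len(s.held))
	s.held = s.held[:0]
}

func main() {
	const mtu = 1500
	var scratches [][]byte // grows to the largest batch seen, then stabilizes
	idx := 0
	sink := &batchSink{}

	processBatch := func(pkts [][]byte) {
		for _, pkt := range pkts {
			// One scratch per packet: reusing a single buffer here would
			// overwrite payloads the sink still holds.
			if idx >= len(scratches) {
				scratches = append(scratches, make([]byte, mtu))
			}
			sink.Add(append(scratches[idx][:0], pkt...))
			idx++
		}
		sink.Flush() // batch boundary: sink releases its borrows
		idx = 0      // now safe to rotate back to scratch 0
	}

	processBatch([][]byte{{1}, {2}, {3}})
	processBatch([][]byte{{4}, {5}})
}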

View File

@@ -31,21 +31,24 @@ type Device interface {
 	NewMultiQueueReader() (Queue, error)
 }
 
-// GSOWriter is implemented by Queues that can write a TCP TSO superpacket as
-// a single virtio_net_hdr + payload writev, letting the kernel segment on
-// egress. Callers type-assert on it; backends that don't support GSO return
-// false from Supported and all coalescing logic is skipped.
+// GSOWriter is implemented by Queues that can emit a TCP TSO superpacket
+// assembled from a header prefix plus one or more borrowed payload
+// fragments, in a single vectored write (writev with a leading
+// virtio_net_hdr). This lets the coalescer avoid copying payload bytes
+// between the caller's decrypt buffer and the TUN. Backends without GSO
+// support return false from GSOSupported and coalescing is skipped.
 //
-// pkt must contain the IPv4/IPv6 + TCP header plus the concatenated
-// coalesced payload. hdrLen is the total L3+L4 header length (where the
-// payload starts). csumStart is the byte offset where the TCP header
-// begins (= IP header length). gsoSize is the MSS — every segment except
-// possibly the last must be exactly this many payload bytes. isV6 selects
-// GSO_TCPV4 vs GSO_TCPV6.
+// hdr contains the IPv4/IPv6 + TCP header prefix (mutable — callers will
+// have filled in total length and pseudo-header partial). pays are
+// non-overlapping payload fragments whose concatenation is the full
+// superpacket payload; they are read-only from the writer's perspective
+// and must remain valid until the call returns. gsoSize is the MSS:
+// every segment except possibly the last is exactly that many bytes.
+// csumStart is the byte offset where the TCP header begins within hdr.
 //
-// pkt's TCP checksum field must already hold the pseudo-header partial
+// hdr's TCP checksum field must already hold the pseudo-header partial
 // sum (single-fold, not inverted), per virtio NEEDS_CSUM semantics.
 type GSOWriter interface {
-	WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error
+	WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error
 	GSOSupported() bool
 }
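For callers, the discovery pattern described above is a type assertion plus a GSOSupported check. The following is an illustrative sketch, not package code; writeOut is a hypothetical helper and the interface is restated locally so the snippet compiles on its own:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// GSOWriter restates the interface above so this sketch is self-contained.
type GSOWriter interface {
	WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error
	GSOSupported() bool
}

// writeOut prefers the vectored GSO path when the queue supports it and
// otherwise flattens hdr+pays into one contiguous plain Write.
func writeOut(w io.Writer, hdr []byte, pays [][]byte, gsoSize, csumStart uint16, isV6 bool) error {
	if gw, ok := w.(GSOWriter); ok && gw.GSOSupported() {
		return gw.WriteGSO(hdr, pays, gsoSize, isV6, csumStart)
	}
	pkt := append([]byte(nil), hdr...) // copy: the fallback owns its buffer
	for _, p := range pays {
		pkt = append(pkt, p...)
	}
	_, err := w.Write(pkt)
	return err
}

func main() {
	var sink bytes.Buffer // plain io.Writer: takes the copy fallback path
	err := writeOut(&sink, []byte{0x45, 0x00}, [][]byte{{1, 2}, {3}}, 1200, 20, false)
	fmt.Println(sink.Len(), err) // 5 <nil>
}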

View File

@@ -53,9 +53,17 @@ type tunFile struct {
 	// by WriteGSO. Separate from zeroVnetHdr so a concurrent non-GSO Write on
 	// another queue never observes a half-written header.
 	gsoHdrBuf [virtioNetHdrLen]byte
-	gsoIovs   [2]unix.Iovec
+	// gsoIovs is the writev iovec scratch for WriteGSO. Sized to hold the
+	// virtio header + IP/TCP header + up to gsoInitialPayIovs payload
+	// fragments; grown on demand if a coalescer pushes more.
+	gsoIovs []unix.Iovec
 }
 
+// gsoInitialPayIovs is the starting capacity (in payload fragments) of
+// tunFile.gsoIovs. Sized to cover the default coalesce segment cap without
+// any reallocations.
+const gsoInitialPayIovs = 66
+
 // zeroVnetHdr is the 10-byte virtio_net_hdr we prepend to every TUN write when
 // IFF_VNET_HDR is active. All-zero signals "no GSO, no checksum offload"; the
 // kernel accepts the packet as-is.
@@ -84,6 +92,7 @@ func (r *tunFile) newFriend(fd int) (*tunFile, error) {
 		out.segBuf = make([]byte, tunSegBufCap)
 		out.writeIovs[0].Base = &zeroVnetHdr[0]
 		out.writeIovs[0].SetLen(virtioNetHdrLen)
+		out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs)
 		out.gsoIovs[0].Base = &out.gsoHdrBuf[0]
 		out.gsoIovs[0].SetLen(virtioNetHdrLen)
 	}
@@ -119,6 +128,7 @@ func newTunFd(fd int, vnetHdr bool) (*tunFile, error) {
 		out.segBuf = make([]byte, tunSegBufCap)
 		out.writeIovs[0].Base = &zeroVnetHdr[0]
 		out.writeIovs[0].SetLen(virtioNetHdrLen)
+		out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs)
 		out.gsoIovs[0].Base = &out.gsoHdrBuf[0]
 		out.gsoIovs[0].SetLen(virtioNetHdrLen)
 	}
@@ -346,46 +356,79 @@ func (r *tunFile) Write(buf []byte) (int, error) {
 // Write calls.
 func (r *tunFile) GSOSupported() bool { return r.vnetHdr }
 
-// WriteGSO emits pkt as a single TCP TSO superpacket via writev. pkt must
-// contain a full IPv4/IPv6 + TCP header prefix followed by the concatenated
-// coalesced payload. The TCP checksum field must already hold the
-// pseudo-header partial (NEEDS_CSUM semantics). gsoSize is the MSS; every
-// segment except the last must be exactly that many payload bytes.
-func (r *tunFile) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error {
+// WriteGSO emits a TCP TSO superpacket in a single writev. hdr is the
+// IPv4/IPv6 + TCP header prefix (already finalized — total length, IP csum,
+// and TCP pseudo-header partial set by the caller). pays are payload
+// fragments whose concatenation forms the full coalesced payload; each
+// slice is read-only and must stay valid until return. gsoSize is the MSS;
+// every segment except possibly the last is exactly gsoSize bytes.
+// csumStart is the byte offset where the TCP header begins within hdr.
+func (r *tunFile) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error {
 	if !r.vnetHdr {
 		return fmt.Errorf("WriteGSO called on tun without IFF_VNET_HDR")
 	}
-	if len(pkt) == 0 {
+	if len(hdr) == 0 || len(pays) == 0 {
 		return nil
 	}
-	hdr := virtioNetHdr{
+
+	// Build the virtio_net_hdr. When pays total to <= gsoSize the kernel
+	// would produce a single segment; keep NEEDS_CSUM semantics but skip
+	// the GSO type so the kernel doesn't spuriously mark this as TSO.
+	vhdr := virtioNetHdr{
 		Flags:      unix.VIRTIO_NET_HDR_F_NEEDS_CSUM,
-		HdrLen:     hdrLen,
+		HdrLen:     uint16(len(hdr)),
 		GSOSize:    gsoSize,
 		CsumStart:  csumStart,
 		CsumOffset: 16, // TCP checksum field lives 16 bytes into the TCP header
 	}
-	if isV6 {
-		hdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV6
-	} else {
-		hdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV4
-	}
-	hdr.encode(r.gsoHdrBuf[:])
-	r.gsoIovs[1].Base = &pkt[0]
-	r.gsoIovs[1].SetLen(len(pkt))
+	var totalPay int
+	for _, p := range pays {
+		totalPay += len(p)
+	}
+	if totalPay > int(gsoSize) {
+		if isV6 {
+			vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV6
+		} else {
+			vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV4
+		}
+	} else {
+		vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_NONE
+		vhdr.GSOSize = 0
+	}
+	vhdr.encode(r.gsoHdrBuf[:])
+
+	// Build the iovec array: [virtio_hdr, hdr, pays...]. r.gsoIovs[0] is
+	// wired to gsoHdrBuf at construction and never changes.
+	need := 2 + len(pays)
+	if cap(r.gsoIovs) < need {
+		grown := make([]unix.Iovec, need)
+		grown[0] = r.gsoIovs[0]
+		r.gsoIovs = grown
+	} else {
+		r.gsoIovs = r.gsoIovs[:need]
+	}
+	r.gsoIovs[1].Base = &hdr[0]
+	r.gsoIovs[1].SetLen(len(hdr))
+	for i, p := range pays {
+		r.gsoIovs[2+i].Base = &p[0]
+		r.gsoIovs[2+i].SetLen(len(p))
+	}
 	iovPtr := uintptr(unsafe.Pointer(&r.gsoIovs[0]))
+	iovCnt := uintptr(len(r.gsoIovs))
 	for {
-		n, _, errno := syscall.RawSyscall(unix.SYS_WRITEV, uintptr(r.fd), iovPtr, 2)
+		n, _, errno := syscall.RawSyscall(unix.SYS_WRITEV, uintptr(r.fd), iovPtr, iovCnt)
 		if errno == 0 {
-			runtime.KeepAlive(pkt)
+			runtime.KeepAlive(hdr)
+			runtime.KeepAlive(pays)
 			if int(n) < virtioNetHdrLen {
 				return io.ErrShortWrite
 			}
 			return nil
 		}
 		if errno == unix.EAGAIN {
-			runtime.KeepAlive(pkt)
+			runtime.KeepAlive(hdr)
+			runtime.KeepAlive(pays)
 			if err := r.blockOnWrite(); err != nil {
 				return err
 			}
@@ -394,7 +437,8 @@ func (r *tunFile) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumSt
 		if errno == unix.EINTR {
 			continue
 		}
-		runtime.KeepAlive(pkt)
+		runtime.KeepAlive(hdr)
+		runtime.KeepAlive(pays)
 		return errno
 	}
 }
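The vhdr.encode call above is outside this diff. The 10-byte layout it must produce is fixed by the virtio spec: two flag bytes followed by four little-endian uint16 fields. A plausible sketch of such an encoder, reusing the field names from the hunk (the real method may differ):

package main

import (
	"encoding/binary"
	"fmt"
)

const virtioNetHdrLen = 10

// virtioNetHdr mirrors the kernel's struct virtio_net_hdr.
type virtioNetHdr struct {
	Flags      uint8
	GSOType    uint8
	HdrLen     uint16 // length of the L3+L4 header prefix
	GSOSize    uint16 // MSS: payload bytes per segment
	CsumStart  uint16 // offset where checksumming begins (TCP header)
	CsumOffset uint16 // offset of the checksum field within that header
}

// encode lays the struct out little-endian, as the TUN driver expects at
// the front of every IFF_VNET_HDR write.
func (h virtioNetHdr) encode(buf []byte) {
	_ = buf[virtioNetHdrLen-1] // bounds hint
	buf[0] = h.Flags
	buf[1] = h.GSOType
	binary.LittleEndian.PutUint16(buf[2:4], h.HdrLen)
	binary.LittleEndian.PutUint16(buf[4:6], h.GSOSize)
	binary.LittleEndian.PutUint16(buf[6:8], h.CsumStart)
	binary.LittleEndian.PutUint16(buf[8:10], h.CsumOffset)
}

func main() {
	var buf [virtioNetHdrLen]byte
	virtioNetHdr{Flags: 1, GSOType: 1, HdrLen: 40, GSOSize: 1200, CsumStart: 20, CsumOffset: 16}.encode(buf[:])
	fmt.Printf("% x\n", buf)
}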

View File

@@ -7,72 +7,114 @@ import (
 	"github.com/slackhq/nebula/overlay"
 )
 
-// IPPROTO_TCP is the IANA protocol number for TCP. Hardcoded instead of
-// reaching for ipProtoTCP because golang.org/x/sys/unix doesn't
-// define that constant on Windows, which would break cross-compiles even
-// though this file runs unchanged on every platform.
+// ipProtoTCP is the IANA protocol number for TCP. Hardcoded instead of
+// reaching for golang.org/x/sys/unix — that package doesn't define the
+// constant on Windows, which would break cross-compiles even though this
+// file runs unchanged on every platform.
 const ipProtoTCP = 6
 
-// tcpCoalesceBufSize bounds the largest coalesced superpacket we will buffer.
-// Linux caps sk_gso_max_size around 64KiB; 65535 bytes covers IP hdr + TCP
-// hdr + up to ~65KB of payload, which is the most the kernel's TSO can
-// segment in one shot.
+// tcpCoalesceBufSize caps total bytes per superpacket. Mirrors the kernel's
+// sk_gso_max_size of ~64KiB; anything beyond this would be rejected anyway.
 const tcpCoalesceBufSize = 65535
 
-// tcpCoalesceMaxSegs caps how many segments we are willing to coalesce into
-// a single superpacket regardless of byte budget. Kernel allows up to 64
-// for UDP GSO and 128 for many TSO engines; stop well before either limit
-// to keep latency bounded.
+// tcpCoalesceMaxSegs caps how many segments we'll coalesce into a single
+// superpacket. Keeping this well below the kernel's TSO ceiling bounds
+// latency.
 const tcpCoalesceMaxSegs = 64
 
-// tcpCoalescer accumulates adjacent in-flow TCP data segments into a single
-// TSO superpacket and emits them via overlay.GSOWriter in one writev. When
-// a packet fails admission or fails to extend the pending flow, the
-// pending superpacket is flushed and the non-matching packet is written
-// through as-is. Owns no locks — one coalescer per TUN write queue.
+// tcpCoalesceHdrCap is the scratch space we copy a seed's IP+TCP header
+// into. IPv6 (40) + TCP with full options (60) = 100 bytes.
+const tcpCoalesceHdrCap = 100
+
+// initialSlots is the starting capacity of the slot pool. One flow per
+// packet is the worst case so this matches a typical UDP recvmmsg batch.
+const initialSlots = 64
+
+// flowKey identifies a TCP flow by {src, dst, sport, dport, family}.
+// Comparable, so linear scans over the slot list stay tight.
+type flowKey struct {
+	src, dst     [16]byte
+	sport, dport uint16
+	isV6         bool
+}
+
+// coalesceSlot is one entry in the coalescer's ordered event queue. When
+// passthrough is true the slot holds a single borrowed packet that must be
+// emitted verbatim (non-TCP, non-admissible TCP, or oversize seed). When
+// passthrough is false the slot is an in-progress coalesced superpacket:
+// hdrBuf is a mutable copy of the seed's IP+TCP header (we patch total
+// length and pseudo-header partial at flush), and payIovs are *borrowed*
+// slices from the caller's plaintext buffers — no payload is ever copied.
+// The caller (listenOut) must keep those buffers alive until Flush.
+type coalesceSlot struct {
+	passthrough bool
+	rawPkt      []byte // borrowed when passthrough
+	fk          flowKey
+	hdrBuf      [tcpCoalesceHdrCap]byte
+	hdrLen      int
+	ipHdrLen    int
+	isV6        bool
+	gsoSize     int
+	numSeg      int
+	totalPay    int
+	nextSeq     uint32
+	// psh closes the chain: set when the last-accepted segment had PSH or
+	// was sub-gsoSize. No further appends after that.
+	psh     bool
+	payIovs [][]byte
+}
+
+// tcpCoalescer accumulates adjacent in-flow TCP data segments across
+// multiple concurrent flows and emits each flow's run as a single TSO
+// superpacket via overlay.GSOWriter. All output — coalesced or not — is
+// deferred until Flush so arrival order is preserved on the wire. Owns
+// no locks; one coalescer per TUN write queue.
 type tcpCoalescer struct {
 	plainW io.Writer
 	gsoW   overlay.GSOWriter // nil when the queue doesn't support TSO
-	buf      []byte
-	bufLen   int  // valid bytes in buf — hdrLen plus accumulated payload
-	active   bool // a seed packet is present
-	numSeg   int
-	gsoSize  int // payload length of each segment (= MSS of the seed)
-	isV6     bool
-	ipHdrLen int
-	hdrLen   int    // ipHdrLen + tcpHdrLen, the offset where payload starts
-	nextSeq  uint32 // expected TCP seq of the next packet to coalesce
-	// psh indicates the last-accepted segment had PSH set. We accept a PSH
-	// packet as the final segment but reject any further Adds after that.
-	psh bool
+	// slots is the ordered event queue. Flush walks it once and emits each
+	// entry as either a WriteGSO (coalesced) or a plainW.Write (passthrough).
+	slots []*coalesceSlot
+	// openSlots maps a flow key to its most recent non-sealed slot, so new
+	// segments can extend an in-progress superpacket in O(1). Slots are
+	// removed from this map when they close (PSH or short-last-segment),
+	// when a non-admissible packet for that flow arrives, or in Flush.
+	openSlots map[flowKey]*coalesceSlot
+	pool      []*coalesceSlot // free list for reuse
 }
 
 func newTCPCoalescer(w io.Writer) *tcpCoalescer {
-	c := &tcpCoalescer{plainW: w, buf: make([]byte, tcpCoalesceBufSize)}
+	c := &tcpCoalescer{
+		plainW:    w,
+		slots:     make([]*coalesceSlot, 0, initialSlots),
+		openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
+		pool:      make([]*coalesceSlot, 0, initialSlots),
+	}
 	if gw, ok := w.(overlay.GSOWriter); ok && gw.GSOSupported() {
 		c.gsoW = gw
 	}
 	return c
 }
 
-// parsedTCP holds the byte offsets / values we extract from one admission
-// check so Add and canAppend don't re-parse the same header twice.
+// parsedTCP holds the fields extracted from a single parse so later steps
+// (admission, slot lookup, canAppend) don't re-walk the header.
 type parsedTCP struct {
-	isV6      bool
+	fk        flowKey
 	ipHdrLen  int
 	tcpHdrLen int
-	hdrLen    int // ipHdrLen + tcpHdrLen
+	hdrLen    int
 	payLen    int
 	seq       uint32
 	flags     byte
 }
 
-// parseCoalesceable decides whether pkt is eligible for TCP coalescing. It
-// accepts IPv4 (no options, DF set, no fragmentation) and IPv6 (no
-// extension headers) carrying a TCP segment with flags in {ACK, ACK|PSH}
-// and a non-empty payload. On success it returns the parsed offsets.
-func parseCoalesceable(pkt []byte) (parsedTCP, bool) {
+// parseTCPBase extracts the flow key and IP/TCP offsets for any TCP packet,
+// regardless of whether it's admissible for coalescing. Returns ok=false
+// for non-TCP or malformed input. Accepts IPv4 (no options, no fragmentation)
+// and IPv6 (no extension headers).
+func parseTCPBase(pkt []byte) (parsedTCP, bool) {
 	var p parsedTCP
 	if len(pkt) < 20 {
 		return p, false
@@ -80,42 +122,41 @@ func parseCoalesceable(pkt []byte) (parsedTCP, bool) {
 	v := pkt[0] >> 4
 	switch v {
 	case 4:
-		if len(pkt) < 20 {
-			return p, false
-		}
 		ihl := int(pkt[0]&0x0f) * 4
 		if ihl != 20 {
-			return p, false // reject IP options
+			return p, false
 		}
 		if pkt[9] != ipProtoTCP {
 			return p, false
 		}
-		// Fragment check: MF=0 and frag offset=0. Accept DF=1 or DF=0 —
-		// just reject any actual fragmentation.
-		fragField := binary.BigEndian.Uint16(pkt[6:8])
-		if fragField&0x3fff != 0 {
+		// Reject actual fragmentation (MF or non-zero frag offset).
+		if binary.BigEndian.Uint16(pkt[6:8])&0x3fff != 0 {
 			return p, false
 		}
 		totalLen := int(binary.BigEndian.Uint16(pkt[2:4]))
 		if totalLen > len(pkt) || totalLen < ihl {
 			return p, false
 		}
-		p.isV6 = false
-		p.ipHdrLen = ihl
+		p.ipHdrLen = 20
+		p.fk.isV6 = false
+		copy(p.fk.src[:4], pkt[12:16])
+		copy(p.fk.dst[:4], pkt[16:20])
 		pkt = pkt[:totalLen]
 	case 6:
 		if len(pkt) < 40 {
 			return p, false
 		}
 		if pkt[6] != ipProtoTCP {
-			return p, false // reject ext headers
+			return p, false
 		}
 		payloadLen := int(binary.BigEndian.Uint16(pkt[4:6]))
 		if 40+payloadLen > len(pkt) {
 			return p, false
 		}
-		p.isV6 = true
 		p.ipHdrLen = 40
+		p.fk.isV6 = true
+		copy(p.fk.src[:], pkt[8:24])
+		copy(p.fk.dst[:], pkt[24:40])
 		pkt = pkt[:40+payloadLen]
 	default:
 		return p, false
@@ -131,146 +172,216 @@ func parseCoalesceable(pkt []byte) (parsedTCP, bool) {
 	if len(pkt) < p.ipHdrLen+tcpOff {
 		return p, false
 	}
-	flags := pkt[p.ipHdrLen+13]
-	// Allow only ACK and ACK|PSH. In particular: no SYN/FIN/RST/URG/CWR/ECE.
-	const ack = 0x10
-	const psh = 0x08
-	if flags&^(ack|psh) != 0 || flags&ack == 0 {
-		return p, false
-	}
 	p.tcpHdrLen = tcpOff
 	p.hdrLen = p.ipHdrLen + tcpOff
 	p.payLen = len(pkt) - p.hdrLen
-	if p.payLen <= 0 {
-		return p, false
-	}
 	p.seq = binary.BigEndian.Uint32(pkt[p.ipHdrLen+4 : p.ipHdrLen+8])
-	p.flags = flags
+	p.flags = pkt[p.ipHdrLen+13]
+	p.fk.sport = binary.BigEndian.Uint16(pkt[p.ipHdrLen : p.ipHdrLen+2])
+	p.fk.dport = binary.BigEndian.Uint16(pkt[p.ipHdrLen+2 : p.ipHdrLen+4])
 	return p, true
 }
 
-// Add takes a plaintext inbound packet destined for the tun. If GSO is
-// unavailable or the packet isn't coalesceable, Add falls through to a
-// plain Write on the underlying queue (flushing any pending superpacket
-// first).
+// coalesceable reports whether a parsed TCP segment is eligible for
+// coalescing. Accepts only ACK or ACK|PSH with a non-empty payload.
+func (p parsedTCP) coalesceable() bool {
+	const ack = 0x10
+	const psh = 0x08
+	if p.flags&^(ack|psh) != 0 || p.flags&ack == 0 {
+		return false
+	}
+	return p.payLen > 0
+}
+
+// Add borrows pkt. The caller must keep pkt valid until the next Flush,
+// whether or not the packet was coalesced — passthrough (non-admissible)
+// packets are queued and written at Flush time, not synchronously.
 func (c *tcpCoalescer) Add(pkt []byte) error {
 	if c.gsoW == nil {
-		_, err := c.plainW.Write(pkt)
-		return err
+		c.addPassthrough(pkt)
+		return nil
 	}
-	info, ok := parseCoalesceable(pkt)
+	info, ok := parseTCPBase(pkt)
 	if !ok {
-		if c.active {
-			if err := c.flushLocked(); err != nil {
-				return err
-			}
-		}
-		_, err := c.plainW.Write(pkt)
-		return err
+		// Non-TCP or malformed — can't possibly collide with an open flow.
+		c.addPassthrough(pkt)
+		return nil
+	}
+	if !info.coalesceable() {
+		// TCP but not admissible (SYN/FIN/RST/URG/CWR/ECE or zero-payload).
+		// Seal this flow's open slot so later in-flow packets don't extend
+		// it and accidentally reorder past this passthrough.
+		delete(c.openSlots, info.fk)
+		c.addPassthrough(pkt)
+		return nil
 	}
-	if c.active {
-		if c.canAppend(pkt, info) {
-			c.appendPayload(pkt, info)
-			if info.flags&0x08 != 0 {
-				c.psh = true
+	if open := c.openSlots[info.fk]; open != nil {
+		if c.canAppend(open, pkt, info) {
+			c.appendPayload(open, pkt, info)
+			if open.psh {
+				delete(c.openSlots, info.fk)
 			}
 			return nil
 		}
-		if err := c.flushLocked(); err != nil {
-			return err
-		}
+		// Can't extend — seal it and fall through to seed a fresh slot.
+		delete(c.openSlots, info.fk)
 	}
-	return c.seed(pkt, info)
+	c.seed(pkt, info)
+	return nil
 }
 
-// Flush emits any pending superpacket. Called by the UDP read loop at
-// recvmmsg batch boundaries — "no more packets coming right now".
+// Flush emits every queued event in arrival order. Coalesced slots go out
+// via WriteGSO; passthrough slots go out via plainW.Write. Returns the
+// first error observed; keeps draining so one bad packet doesn't hold up
+// the rest. After Flush returns, borrowed payload slices may be recycled.
 func (c *tcpCoalescer) Flush() error {
-	if !c.active {
-		return nil
-	}
-	return c.flushLocked()
+	var first error
+	for _, s := range c.slots {
+		var err error
+		if s.passthrough {
+			_, err = c.plainW.Write(s.rawPkt)
+		} else {
+			err = c.flushSlot(s)
+		}
+		if err != nil && first == nil {
+			first = err
+		}
+		c.release(s)
+	}
+	for i := range c.slots {
+		c.slots[i] = nil
+	}
+	c.slots = c.slots[:0]
+	for k := range c.openSlots {
+		delete(c.openSlots, k)
+	}
+	return first
 }
 
-func (c *tcpCoalescer) reset() {
-	c.active = false
-	c.bufLen = 0
-	c.numSeg = 0
-	c.gsoSize = 0
-	c.hdrLen = 0
-	c.ipHdrLen = 0
-	c.nextSeq = 0
-	c.psh = false
+func (c *tcpCoalescer) addPassthrough(pkt []byte) {
+	s := c.take()
+	s.passthrough = true
+	s.rawPkt = pkt
+	c.slots = append(c.slots, s)
 }
 
-func (c *tcpCoalescer) seed(pkt []byte, info parsedTCP) error {
-	if info.hdrLen+info.payLen > len(c.buf) {
-		// Oversize single packet — flush (already done above) and passthrough.
-		_, err := c.plainW.Write(pkt)
-		return err
-	}
-	copy(c.buf, pkt[:info.hdrLen+info.payLen])
-	c.active = true
-	c.bufLen = info.hdrLen + info.payLen
-	c.numSeg = 1
-	c.gsoSize = info.payLen
-	c.isV6 = info.isV6
-	c.ipHdrLen = info.ipHdrLen
-	c.hdrLen = info.hdrLen
-	c.nextSeq = info.seq + uint32(info.payLen)
-	c.psh = info.flags&0x08 != 0
-	return nil
+func (c *tcpCoalescer) seed(pkt []byte, info parsedTCP) {
+	if info.hdrLen > tcpCoalesceHdrCap || info.hdrLen+info.payLen > tcpCoalesceBufSize {
+		// Pathological shape — can't fit our scratch, emit as-is.
+		c.addPassthrough(pkt)
+		return
+	}
+	s := c.take()
+	s.passthrough = false
+	s.rawPkt = nil
+	copy(s.hdrBuf[:], pkt[:info.hdrLen])
+	s.hdrLen = info.hdrLen
+	s.ipHdrLen = info.ipHdrLen
+	s.isV6 = info.fk.isV6
+	s.fk = info.fk
+	s.gsoSize = info.payLen
+	s.numSeg = 1
+	s.totalPay = info.payLen
+	s.nextSeq = info.seq + uint32(info.payLen)
+	s.psh = info.flags&0x08 != 0
+	s.payIovs = append(s.payIovs[:0], pkt[info.hdrLen:info.hdrLen+info.payLen])
+	c.slots = append(c.slots, s)
+	if !s.psh {
+		c.openSlots[info.fk] = s
+	}
 }
 
-// canAppend reports whether info's packet extends the current seed: same
-// flow, adjacent seq, payload size rule, and no-PSH-mid-chain.
-func (c *tcpCoalescer) canAppend(pkt []byte, info parsedTCP) bool {
-	if c.psh {
-		return false // we already accepted a PSH — chain is closed
-	}
-	if info.isV6 != c.isV6 {
-		return false
-	}
-	if info.hdrLen != c.hdrLen {
-		return false
-	}
-	if info.seq != c.nextSeq {
-		return false
-	}
-	if c.numSeg >= tcpCoalesceMaxSegs {
-		return false
-	}
-	if c.bufLen+info.payLen > len(c.buf) {
-		return false
-	}
-	// Every mid-chain segment must be exactly gsoSize. The final segment may
-	// be shorter, but once a short segment is appended we can't add another.
-	if info.payLen > c.gsoSize {
-		return false
-	}
-	if info.payLen < c.gsoSize {
-		// Will become the last segment — always OK to append, just no more.
-	}
-	// Compare the stable parts of the header.
-	if !headersMatch(c.buf[:c.hdrLen], pkt[:info.hdrLen], c.isV6, c.ipHdrLen) {
+// canAppend reports whether info's packet extends the slot's seed: same
+// header shape and stable contents, adjacent seq, not oversized, chain not
+// closed.
+func (c *tcpCoalescer) canAppend(s *coalesceSlot, pkt []byte, info parsedTCP) bool {
+	if s.psh {
+		return false
+	}
+	if info.hdrLen != s.hdrLen {
+		return false
+	}
+	if info.seq != s.nextSeq {
+		return false
+	}
+	if s.numSeg >= tcpCoalesceMaxSegs {
+		return false
+	}
+	if info.payLen > s.gsoSize {
+		return false
+	}
+	if s.hdrLen+s.totalPay+info.payLen > tcpCoalesceBufSize {
+		return false
+	}
+	if !headersMatch(s.hdrBuf[:s.hdrLen], pkt[:info.hdrLen], s.isV6, s.ipHdrLen) {
 		return false
 	}
 	return true
 }
 
-func (c *tcpCoalescer) appendPayload(pkt []byte, info parsedTCP) {
-	copy(c.buf[c.bufLen:], pkt[info.hdrLen:info.hdrLen+info.payLen])
-	c.bufLen += info.payLen
-	c.numSeg++
-	c.nextSeq = info.seq + uint32(info.payLen)
-	// If this was a sub-gsoSize last segment, mark chain as closed.
-	if info.payLen < c.gsoSize {
-		c.psh = true
+func (c *tcpCoalescer) appendPayload(s *coalesceSlot, pkt []byte, info parsedTCP) {
+	s.payIovs = append(s.payIovs, pkt[info.hdrLen:info.hdrLen+info.payLen])
+	s.numSeg++
+	s.totalPay += info.payLen
+	s.nextSeq = info.seq + uint32(info.payLen)
+	if info.payLen < s.gsoSize || info.flags&0x08 != 0 {
+		s.psh = true
 	}
 }
 
+func (c *tcpCoalescer) take() *coalesceSlot {
+	if n := len(c.pool); n > 0 {
+		s := c.pool[n-1]
+		c.pool[n-1] = nil
+		c.pool = c.pool[:n-1]
+		return s
+	}
+	return &coalesceSlot{}
+}
+
+func (c *tcpCoalescer) release(s *coalesceSlot) {
+	s.passthrough = false
+	s.rawPkt = nil
+	for i := range s.payIovs {
+		s.payIovs[i] = nil
+	}
+	s.payIovs = s.payIovs[:0]
+	s.numSeg = 0
+	s.totalPay = 0
+	s.psh = false
+	c.pool = append(c.pool, s)
+}
+
+// flushSlot patches the header and calls WriteGSO. Does not remove the
+// slot from c.slots.
+func (c *tcpCoalescer) flushSlot(s *coalesceSlot) error {
+	total := s.hdrLen + s.totalPay
+	l4Len := total - s.ipHdrLen
+	hdr := s.hdrBuf[:s.hdrLen]
+	if s.isV6 {
+		binary.BigEndian.PutUint16(hdr[4:6], uint16(l4Len))
+	} else {
+		binary.BigEndian.PutUint16(hdr[2:4], uint16(total))
+		hdr[10] = 0
+		hdr[11] = 0
+		binary.BigEndian.PutUint16(hdr[10:12], ipv4HdrChecksum(hdr[:s.ipHdrLen]))
+	}
+	var psum uint32
+	if s.isV6 {
+		psum = pseudoSumIPv6(hdr[8:24], hdr[24:40], ipProtoTCP, l4Len)
+	} else {
+		psum = pseudoSumIPv4(hdr[12:16], hdr[16:20], ipProtoTCP, l4Len)
+	}
+	tcsum := s.ipHdrLen + 16
+	binary.BigEndian.PutUint16(hdr[tcsum:tcsum+2], foldOnceNoInvert(psum))
+	return c.gsoW.WriteGSO(hdr, s.payIovs, uint16(s.gsoSize), s.isV6, uint16(s.ipHdrLen))
+}
+
 // headersMatch compares two IP+TCP header prefixes for byte-for-byte
 // equality on every field that must be identical across coalesced
 // segments. Size/IPID/IPCsum/seq/flags/tcpCsum are masked out.
@@ -330,58 +441,6 @@ func bytesEq(a, b []byte) bool {
 	return true
 }
 
-func (c *tcpCoalescer) flushLocked() error {
-	// Guarantee the coalescer is empty on exit regardless of how we leave.
-	defer c.reset()
-	if c.numSeg <= 1 {
-		_, err := c.plainW.Write(c.buf[:c.bufLen])
-		return err
-	}
-	total := c.bufLen
-	l4Len := total - c.ipHdrLen
-	// Fix IP header length field.
-	if c.isV6 {
-		if l4Len > 0xffff {
-			// Shouldn't happen given buffer size, but guard against it.
-			return c.flushAsPerSegment()
-		}
-		binary.BigEndian.PutUint16(c.buf[4:6], uint16(l4Len))
-	} else {
-		if total > 0xffff {
-			return c.flushAsPerSegment()
-		}
-		binary.BigEndian.PutUint16(c.buf[2:4], uint16(total))
-		// Recompute IPv4 header checksum.
-		c.buf[10] = 0
-		c.buf[11] = 0
-		binary.BigEndian.PutUint16(c.buf[10:12], ipv4HdrChecksum(c.buf[:c.ipHdrLen]))
-	}
-	// Write the virtio NEEDS_CSUM pseudo-header partial into the TCP csum field.
-	var psum uint32
-	if c.isV6 {
-		psum = pseudoSumIPv6(c.buf[8:24], c.buf[24:40], ipProtoTCP, l4Len)
-	} else {
-		psum = pseudoSumIPv4(c.buf[12:16], c.buf[16:20], ipProtoTCP, l4Len)
-	}
-	tcsum := c.ipHdrLen + 16
-	binary.BigEndian.PutUint16(c.buf[tcsum:tcsum+2], foldOnceNoInvert(psum))
-	return c.gsoW.WriteGSO(c.buf[:total], uint16(c.gsoSize), c.isV6, uint16(c.hdrLen), uint16(c.ipHdrLen))
-}
-
-// flushAsPerSegment is a defensive fallback used if the coalesced superpacket
-// somehow exceeds 16-bit length fields. It writes the packet as-is through
-// the plain writer (the kernel will reject it, but that's a visible error
-// rather than silent corruption).
-func (c *tcpCoalescer) flushAsPerSegment() error {
-	_, err := c.plainW.Write(c.buf[:c.bufLen])
-	return err
-}
-
 // ipv4HdrChecksum computes the IPv4 header checksum over hdr (which must
 // already have its checksum field zeroed) and returns the folded/inverted
 // 16-bit value to store.
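flushSlot relies on four helpers whose bodies sit outside these hunks: ipv4HdrChecksum, pseudoSumIPv4, pseudoSumIPv6, and foldOnceNoInvert. Their contracts follow from the comments ("single-fold, not inverted", NEEDS_CSUM): an unfolded 32-bit pseudo-header sum, folded to 16 bits without the final inversion, since the kernel finishes the TCP checksum over the payload. A sketch consistent with those contracts, not the repo's actual code:

package main

import (
	"encoding/binary"
	"fmt"
)

// onesSum accumulates 16-bit big-endian words into an unfolded 32-bit sum.
func onesSum(b []byte) uint32 {
	var s uint32
	for ; len(b) >= 2; b = b[2:] {
		s += uint32(binary.BigEndian.Uint16(b))
	}
	if len(b) == 1 {
		s += uint32(b[0]) << 8
	}
	return s
}

// pseudoSumIPv4 is the unfolded TCP pseudo-header sum over 4-byte addresses.
func pseudoSumIPv4(src, dst []byte, proto, l4Len int) uint32 {
	return onesSum(src) + onesSum(dst) + uint32(proto) + uint32(l4Len)
}

// pseudoSumIPv6 is the same over 16-byte addresses.
func pseudoSumIPv6(src, dst []byte, proto, l4Len int) uint32 {
	return onesSum(src) + onesSum(dst) + uint32(proto) + uint32(l4Len)
}

// foldOnceNoInvert folds the carries down to 16 bits (looping covers the
// rare double-carry) but skips the final one's-complement inversion:
// NEEDS_CSUM wants the partial sum, not the finished checksum.
func foldOnceNoInvert(s uint32) uint16 {
	for s > 0xffff {
		s = (s >> 16) + (s & 0xffff)
	}
	return uint16(s)
}

// ipv4HdrChecksum is the classic header checksum: fold and invert. The
// checksum field must be zeroed before calling.
func ipv4HdrChecksum(hdr []byte) uint16 {
	return ^foldOnceNoInvert(onesSum(hdr))
}

func main() {
	hdr := []byte{0x45, 0, 0, 40, 0, 0, 0x40, 0, 64, 6, 0, 0, 10, 0, 0, 1, 10, 0, 0, 2}
	fmt.Printf("ipv4 csum: %04x\n", ipv4HdrChecksum(hdr))
	fmt.Printf("tcp partial: %04x\n", foldOnceNoInvert(pseudoSumIPv4(hdr[12:16], hdr[16:20], 6, 20)))
}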

View File

@@ -5,8 +5,9 @@ import (
 	"testing"
 )
 
-// A minimal stub writer that records each plain Write and each WriteGSO
-// call without touching a real TUN fd.
+// fakeTunWriter records plain Writes and WriteGSO calls without touching a
+// real TUN fd. WriteGSO preserves the split between hdr and borrowed pays
+// so tests can inspect each independently.
 type fakeTunWriter struct {
 	gsoEnabled bool
 	writes     [][]byte
@@ -14,13 +15,31 @@ type fakeTunWriter struct {
 }
 
 type fakeGSOWrite struct {
-	pkt       []byte
+	hdr       []byte
+	pays      [][]byte
 	gsoSize   uint16
 	isV6      bool
-	hdrLen    uint16
 	csumStart uint16
 }
 
+// total returns hdrLen + sum of pay lens.
+func (g fakeGSOWrite) total() int {
+	n := len(g.hdr)
+	for _, p := range g.pays {
+		n += len(p)
+	}
+	return n
+}
+
+// payLen sums the pays.
+func (g fakeGSOWrite) payLen() int {
+	var n int
+	for _, p := range g.pays {
+		n += len(p)
+	}
+	return n
+}
+
 func (w *fakeTunWriter) Write(p []byte) (int, error) {
 	buf := make([]byte, len(p))
 	copy(buf, p)
@@ -28,10 +47,22 @@ func (w *fakeTunWriter) Write(p []byte) (int, error) {
 	return len(p), nil
 }
 
-func (w *fakeTunWriter) WriteGSO(pkt []byte, gsoSize uint16, isV6 bool, hdrLen, csumStart uint16) error {
-	buf := make([]byte, len(pkt))
-	copy(buf, pkt)
-	w.gsoWrites = append(w.gsoWrites, fakeGSOWrite{pkt: buf, gsoSize: gsoSize, isV6: isV6, hdrLen: hdrLen, csumStart: csumStart})
+func (w *fakeTunWriter) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error {
+	hcopy := make([]byte, len(hdr))
+	copy(hcopy, hdr)
+	paysCopy := make([][]byte, len(pays))
+	for i, p := range pays {
+		pc := make([]byte, len(p))
+		copy(pc, p)
+		paysCopy[i] = pc
+	}
+	w.gsoWrites = append(w.gsoWrites, fakeGSOWrite{
+		hdr:       hcopy,
+		pays:      paysCopy,
+		gsoSize:   gsoSize,
+		isV6:      isV6,
+		csumStart: csumStart,
+	})
 	return nil
 }
 
@@ -40,33 +71,34 @@ func (w *fakeTunWriter) GSOSupported() bool { return w.gsoEnabled }
 // buildTCPv4 constructs a minimal IPv4+TCP packet with the given payload,
 // seq, and flags. Assumes no IP options and a 20-byte TCP header.
 func buildTCPv4(seq uint32, flags byte, payload []byte) []byte {
+	return buildTCPv4Ports(1000, 2000, seq, flags, payload)
+}
+
+// buildTCPv4Ports is buildTCPv4 with caller-specified ports so tests can
+// build distinct flows.
+func buildTCPv4Ports(sport, dport uint16, seq uint32, flags byte, payload []byte) []byte {
 	const ipHdrLen = 20
 	const tcpHdrLen = 20
 	total := ipHdrLen + tcpHdrLen + len(payload)
 	pkt := make([]byte, total)
-	// IPv4 header.
-	pkt[0] = 0x45 // version 4, IHL 5
-	pkt[1] = 0x00 // TOS
+	pkt[0] = 0x45
+	pkt[1] = 0x00
 	binary.BigEndian.PutUint16(pkt[2:4], uint16(total))
-	binary.BigEndian.PutUint16(pkt[4:6], 0)      // id
-	binary.BigEndian.PutUint16(pkt[6:8], 0x4000) // DF
-	pkt[8] = 64 // TTL
+	binary.BigEndian.PutUint16(pkt[4:6], 0)
+	binary.BigEndian.PutUint16(pkt[6:8], 0x4000)
+	pkt[8] = 64
 	pkt[9] = ipProtoTCP
-	// csum left zero — coalescer recomputes on emit.
-	copy(pkt[12:16], []byte{10, 0, 0, 1}) // src
-	copy(pkt[16:20], []byte{10, 0, 0, 2}) // dst
-	// TCP header.
-	binary.BigEndian.PutUint16(pkt[20:22], 1000) // sport
-	binary.BigEndian.PutUint16(pkt[22:24], 2000) // dport
+	copy(pkt[12:16], []byte{10, 0, 0, 1})
+	copy(pkt[16:20], []byte{10, 0, 0, 2})
+	binary.BigEndian.PutUint16(pkt[20:22], sport)
+	binary.BigEndian.PutUint16(pkt[22:24], dport)
 	binary.BigEndian.PutUint32(pkt[24:28], seq)
-	binary.BigEndian.PutUint32(pkt[28:32], 12345) // ack
-	pkt[32] = 0x50 // data offset = 5 << 4
+	binary.BigEndian.PutUint32(pkt[28:32], 12345)
+	pkt[32] = 0x50
 	pkt[33] = flags
-	binary.BigEndian.PutUint16(pkt[34:36], 0xffff) // window
-	// tcp csum zero
-	// urgent zero
+	binary.BigEndian.PutUint16(pkt[34:36], 0xffff)
 	copy(pkt[40:], payload)
 	return pkt
@@ -87,6 +119,13 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
 	if err := c.Add(pkt); err != nil {
 		t.Fatal(err)
 	}
+	// No sync write — passthrough is deferred to Flush.
+	if len(w.writes) != 0 || len(w.gsoWrites) != 0 {
+		t.Fatalf("no Add-time writes: got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
 	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
 		t.Fatalf("want single plain write, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
@@ -95,7 +134,6 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
 func TestCoalescerNonTCPPassthrough(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
 	c := newTCPCoalescer(w)
-	// ICMP packet: proto=1.
 	pkt := make([]byte, 28)
 	pkt[0] = 0x45
 	binary.BigEndian.PutUint16(pkt[2:4], 28)
@@ -105,6 +143,9 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) {
 	if err := c.Add(pkt); err != nil {
 		t.Fatal(err)
 	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
 	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
 		t.Fatalf("ICMP should pass through unchanged")
 	}
@@ -117,17 +158,24 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) {
 	if err := c.Add(pkt); err != nil {
 		t.Fatal(err)
 	}
-	// No flush yet — still pending.
 	if len(w.writes) != 0 || len(w.gsoWrites) != 0 {
 		t.Fatalf("unexpected output before flush")
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// Single segment — should use plain write, not gso.
-	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
+	// Single-segment flush now goes through WriteGSO with GSO_NONE
+	// (virtio NEEDS_CSUM lets the kernel fill in the L4 csum).
+	if len(w.gsoWrites) != 1 || len(w.writes) != 0 {
 		t.Fatalf("single-seg flush: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
+	g := w.gsoWrites[0]
+	if g.total() != 40+1000 {
+		t.Errorf("super total=%d want %d", g.total(), 40+1000)
+	}
+	if g.payLen() != 1000 {
+		t.Errorf("payLen=%d want 1000", g.payLen())
+	}
 }
 
 func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
@@ -153,18 +201,20 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
 	if g.gsoSize != 1200 {
 		t.Errorf("gsoSize=%d want 1200", g.gsoSize)
 	}
-	if g.hdrLen != 40 {
-		t.Errorf("hdrLen=%d want 40", g.hdrLen)
+	if len(g.hdr) != 40 {
+		t.Errorf("hdrLen=%d want 40", len(g.hdr))
 	}
 	if g.csumStart != 20 {
 		t.Errorf("csumStart=%d want 20", g.csumStart)
 	}
-	if len(g.pkt) != 40+3*1200 {
-		t.Errorf("superpacket len=%d want %d", len(g.pkt), 40+3*1200)
+	if len(g.pays) != 3 {
+		t.Errorf("pay count=%d want 3", len(g.pays))
 	}
-	// IP total length should reflect superpacket.
-	if tot := binary.BigEndian.Uint16(g.pkt[2:4]); int(tot) != len(g.pkt) {
-		t.Errorf("ip total_length=%d want %d", tot, len(g.pkt))
+	if g.total() != 40+3*1200 {
+		t.Errorf("superpacket len=%d want %d", g.total(), 40+3*1200)
+	}
+	if tot := binary.BigEndian.Uint16(g.hdr[2:4]); int(tot) != g.total() {
+		t.Errorf("ip total_length=%d want %d", tot, g.total())
 	}
 }
 
@@ -175,17 +225,15 @@ func TestCoalescerRejectsSeqGap(t *testing.T) {
 	if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// seq should be 2200; use 3000 to simulate a gap.
 	if err := c.Add(buildTCPv4(3000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// First packet should have been flushed as a plain write (single seg),
-	// then second packet seeded and flushed likewise.
-	if len(w.writes) != 2 || len(w.gsoWrites) != 0 {
-		t.Fatalf("seq gap: want 2 plain writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
+	// Each packet flushes as its own single-segment WriteGSO now.
+	if len(w.gsoWrites) != 2 || len(w.writes) != 0 {
+		t.Fatalf("seq gap: want 2 gso writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
 }
 
@@ -196,7 +244,8 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) {
 	if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// SYN flag — not admissible at all. Should flush first packet + plain-write second.
+	// SYN|ACK is non-admissible. Must flush matching flow's slot (gso)
+	// and then plain-write the SYN packet itself.
 	syn := buildTCPv4(2200, tcpSyn|tcpAck, pay)
 	if err := c.Add(syn); err != nil {
 		t.Fatal(err)
@@ -204,8 +253,8 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	if len(w.writes) != 2 || len(w.gsoWrites) != 0 {
-		t.Fatalf("flag mismatch: want 2 plain writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
+	if len(w.writes) != 1 || len(w.gsoWrites) != 1 {
+		t.Fatalf("flag mismatch: want 1 plain + 1 gso, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
 }
 
@@ -219,6 +268,7 @@ func TestCoalescerRejectsFIN(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
+	// FIN isn't admissible — passthrough as plain, no slot, no gso.
 	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
 		t.Fatalf("FIN should be passthrough, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
@@ -235,26 +285,26 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
 	if err := c.Add(buildTCPv4(2200, tcpAck, half)); err != nil {
 		t.Fatal(err)
 	}
-	// Next full-size would have to start at 2700 but chain is closed —
-	// should flush + seed.
+	// Chain now closed; next packet seeds a new slot on the same flow
+	// after flushing the old one.
 	if err := c.Add(buildTCPv4(2700, tcpAck, full)); err != nil {
 		t.Fatal(err)
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// Expect: one gso write (first two coalesced) + one plain write (the
-	// third, flushed alone).
-	if len(w.gsoWrites) != 1 {
-		t.Fatalf("want 1 gso write got %d", len(w.gsoWrites))
+	// Expect two gso writes: the first two packets coalesced, then the
+	// third flushed alone (single-seg via GSO_NONE).
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes got %d", len(w.gsoWrites))
 	}
-	if len(w.writes) != 1 {
-		t.Fatalf("want 1 plain write got %d", len(w.writes))
+	if len(w.writes) != 0 {
+		t.Fatalf("want 0 plain writes got %d", len(w.writes))
 	}
 	if w.gsoWrites[0].gsoSize != 1200 {
 		t.Errorf("gsoSize=%d want 1200", w.gsoWrites[0].gsoSize)
 	}
-	if got, want := len(w.gsoWrites[0].pkt), 40+1200+500; got != want {
+	if got, want := w.gsoWrites[0].total(), 40+1200+500; got != want {
 		t.Errorf("super len=%d want %d", got, want)
 	}
 }
@@ -266,22 +316,21 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
 	if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// Last full-size segment with PSH — admitted but chain is now closed.
 	if err := c.Add(buildTCPv4(2200, tcpAckPsh, pay)); err != nil {
 		t.Fatal(err)
 	}
-	// Further full-size would not coalesce.
 	if err := c.Add(buildTCPv4(3400, tcpAck, pay)); err != nil {
 		t.Fatal(err)
 	}
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	if len(w.gsoWrites) != 1 {
-		t.Fatalf("want 1 gso write got %d", len(w.gsoWrites))
+	// First two coalesce; the third seeds a fresh slot that flushes alone.
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes got %d", len(w.gsoWrites))
 	}
-	if len(w.writes) != 1 {
-		t.Fatalf("want 1 plain write got %d", len(w.writes))
+	if len(w.writes) != 0 {
+		t.Fatalf("want 0 plain writes got %d", len(w.writes))
 	}
 }
 
@@ -291,7 +340,6 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
 	pay := make([]byte, 1200)
 	p1 := buildTCPv4(1000, tcpAck, pay)
 	p2 := buildTCPv4(2200, tcpAck, pay)
-	// Mutate p2's source port to break flow match.
 	binary.BigEndian.PutUint16(p2[20:22], 9999)
 	if err := c.Add(p1); err != nil {
 		t.Fatal(err)
@@ -302,9 +350,9 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// Both flushed as plain writes.
-	if len(w.writes) != 2 || len(w.gsoWrites) != 0 {
-		t.Fatalf("diff flow: writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
+	// Two independent flows, each flushes its own single-segment WriteGSO.
+	if len(w.gsoWrites) != 2 || len(w.writes) != 0 {
+		t.Fatalf("diff flow: want 2 gso writes got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
	}
 }
 
@@ -322,6 +370,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
+	// Non-admissible parse → passthrough as plain.
 	if len(w.writes) != 1 || len(w.gsoWrites) != 0 {
 		t.Fatalf("IP options should passthrough, got writes=%d gso=%d", len(w.writes), len(w.gsoWrites))
 	}
@@ -330,7 +379,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
 func TestCoalescerCapBySegments(t *testing.T) {
 	w := &fakeTunWriter{gsoEnabled: true}
 	c := newTCPCoalescer(w)
-	pay := make([]byte, 512) // small so we can fit many before byte cap
+	pay := make([]byte, 512)
 	seq := uint32(1000)
 	for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
 		if err := c.Add(buildTCPv4(seq, tcpAck, pay)); err != nil {
@@ -341,16 +390,187 @@ func TestCoalescerCapBySegments(t *testing.T) {
 	if err := c.Flush(); err != nil {
 		t.Fatal(err)
 	}
-	// We expect the first tcpCoalesceMaxSegs to form one gso, then 5 more:
-	// The 5 follow-ons seed a new super that completes as another gso if >=2,
-	// or a mix. Just assert we never exceed the cap per super.
 	for _, g := range w.gsoWrites {
-		segs := (len(g.pkt) - int(g.hdrLen)) / int(g.gsoSize)
-		if rem := (len(g.pkt) - int(g.hdrLen)) % int(g.gsoSize); rem != 0 {
-			segs++
-		}
+		segs := len(g.pays)
 		if segs > tcpCoalesceMaxSegs {
 			t.Fatalf("super exceeded seg cap: %d > %d", segs, tcpCoalesceMaxSegs)
 		}
 	}
 }
+
+// TestCoalescerMultipleFlowsInSameBatch proves two interleaved bulk TCP
+// flows coalesce independently in a single Flush.
+func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
+	w := &fakeTunWriter{gsoEnabled: true}
+	c := newTCPCoalescer(w)
+	pay := make([]byte, 1200)
+	// Flow A: sport 1000. Flow B: sport 3000.
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 2500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes (one per flow), got %d", len(w.gsoWrites))
+	}
+	if len(w.writes) != 0 {
+		t.Fatalf("want no plain writes, got %d", len(w.writes))
+	}
+	// Each superpacket should carry 3 segments.
+	for i, g := range w.gsoWrites {
+		if len(g.pays) != 3 {
+			t.Errorf("gso[%d]: segs=%d want 3", i, len(g.pays))
+		}
+		if g.gsoSize != 1200 {
+			t.Errorf("gso[%d]: gsoSize=%d want 1200", i, g.gsoSize)
+		}
+	}
+	// Verify each superpacket carries the source port it was seeded with.
+	seenSports := map[uint16]bool{}
+	for _, g := range w.gsoWrites {
+		sp := binary.BigEndian.Uint16(g.hdr[20:22])
+		seenSports[sp] = true
+	}
+	if !seenSports[1000] || !seenSports[3000] {
+		t.Errorf("expected superpackets for sports 1000 and 3000, got %v", seenSports)
+	}
+}
+
+// TestCoalescerPreservesArrivalOrder confirms that with passthrough and
+// coalesced events both queued, Flush emits them in Add order rather than
+// writing passthrough packets synchronously.
+func TestCoalescerPreservesArrivalOrder(t *testing.T) {
+	w := &orderedFakeWriter{gsoEnabled: true}
+	c := newTCPCoalescer(w)
+	// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
+	// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
+	pay := make([]byte, 1200)
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	icmp := make([]byte, 28)
+	icmp[0] = 0x45
+	binary.BigEndian.PutUint16(icmp[2:4], 28)
+	icmp[9] = 1
+	copy(icmp[12:16], []byte{10, 0, 0, 1})
+	copy(icmp[16:20], []byte{10, 0, 0, 3})
+	if err := c.Add(icmp); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Nothing should have hit the writer synchronously.
+	if len(w.events) != 0 {
+		t.Fatalf("Add emitted events synchronously: %v", w.events)
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	if got, want := w.events, []string{"gso", "plain", "gso"}; !stringSliceEq(got, want) {
+		t.Fatalf("flush order=%v want %v", got, want)
+	}
+}
+
+// orderedFakeWriter records only the sequence of call types so tests can
+// assert arrival order without inspecting bytes.
+type orderedFakeWriter struct {
+	gsoEnabled bool
+	events     []string
+}
+
+func (w *orderedFakeWriter) Write(p []byte) (int, error) {
+	w.events = append(w.events, "plain")
+	return len(p), nil
+}
+
+func (w *orderedFakeWriter) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error {
+	w.events = append(w.events, "gso")
+	return nil
+}
+
+func (w *orderedFakeWriter) GSOSupported() bool { return w.gsoEnabled }
+
+func stringSliceEq(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// TestCoalescerInterleavedFlowsPreserveOrdering checks that a non-admissible
+// packet (SYN) mid-flow only flushes its own flow, not others.
+func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
+	w := &fakeTunWriter{gsoEnabled: true}
+	c := newTCPCoalescer(w)
+	pay := make([]byte, 1200)
+	// Flow A two segments.
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Flow B two segments.
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	// Flow A SYN (non-admissible) — must flush only flow A's slot.
+	syn := buildTCPv4Ports(1000, 2000, 9999, tcpSyn|tcpAck, pay)
+	if err := c.Add(syn); err != nil {
+		t.Fatal(err)
+	}
+	// Flow B continues — should still be coalesced with its seed.
+	if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil {
+		t.Fatal(err)
+	}
+	if err := c.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	// Expected:
+	//   - 1 gso for flow A (first 2 segments)
+	//   - 1 plain for flow A SYN
+	//   - 1 gso for flow B (3 segments)
+	if len(w.gsoWrites) != 2 {
+		t.Fatalf("want 2 gso writes, got %d", len(w.gsoWrites))
+	}
+	if len(w.writes) != 1 {
+		t.Fatalf("want 1 plain write (SYN), got %d", len(w.writes))
	}
+	// Find the 3-segment gso (flow B) and the 2-segment gso (flow A).
+	var segCounts []int
+	for _, g := range w.gsoWrites {
+		segCounts = append(segCounts, len(g.pays))
+	}
+	if !(segCounts[0] == 2 && segCounts[1] == 3) && !(segCounts[0] == 3 && segCounts[1] == 2) {
+		t.Errorf("unexpected segment counts: %v (want 2 and 3)", segCounts)
+	}
+}
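headersMatch is used by canAppend but unchanged by this commit, so its body never appears here. From its doc comment ("Size/IPID/IPCsum/seq/flags/tcpCsum are masked out") the IPv4 shape is straightforward to reconstruct; the following is a hypothetical sketch of that masking, not the repo's implementation:

package main

import "fmt"

// headersMatchV4 compares two IPv4+TCP header prefixes byte-for-byte,
// skipping the fields the coalescer rewrites at flush or expects to vary
// per segment: IP total length (2:4), IP ID (4:6), IP checksum (10:12),
// TCP seq (ihl+4:ihl+8), TCP flags (ihl+13), TCP checksum (ihl+16:ihl+18).
func headersMatchV4(a, b []byte, ipHdrLen int) bool {
	if len(a) != len(b) {
		return false
	}
	skip := func(i int) bool {
		switch {
		case i >= 2 && i < 6: // total length + ID
			return true
		case i >= 10 && i < 12: // IP header checksum
			return true
		case i >= ipHdrLen+4 && i < ipHdrLen+8: // sequence number
			return true
		case i == ipHdrLen+13: // flags (ACK vs ACK|PSH)
			return true
		case i >= ipHdrLen+16 && i < ipHdrLen+18: // TCP checksum
			return true
		}
		return false
	}
	for i := range a {
		if !skip(i) && a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	a := make([]byte, 40)
	b := make([]byte, 40)
	b[24] = 0xff // different seq: still a match
	fmt.Println(headersMatchV4(a, b, 20)) // true
	b[16] = 0xff // different dst IP: no longer a match
	fmt.Println(headersMatchV4(a, b, 20)) // false
}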