From a13afb2cf82a389868ac10a9915f58a80bda76db Mon Sep 17 00:00:00 2001 From: JackDoan Date: Fri, 17 Apr 2026 12:56:20 -0500 Subject: [PATCH] drain reads before batching --- overlay/tun_linux.go | 70 +++++++++++++++++++++++++++++------- overlay/tun_linux_offload.go | 19 ++++++++++ 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index 16de1978..023d4797 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -43,6 +43,7 @@ type tunFile struct { vnetHdr bool readBuf []byte // scratch for a single raw read (virtio hdr + superpacket) segBuf []byte // backing store for segmented output + segOff int // cursor into segBuf for the current ReadBatch drain pending [][]byte // segments waiting to be drained by Read pendingIdx int writeIovs [2]unix.Iovec // preallocated iovecs for vnetHdr writes; iovs[0] is fixed to zeroVnetHdr @@ -73,7 +74,7 @@ func (r *tunFile) newFriend(fd int) (*tunFile, error) { }, } if r.vnetHdr { - out.segBuf = make([]byte, tunSegBufSize) + out.segBuf = make([]byte, tunSegBufCap) out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].SetLen(virtioNetHdrLen) } @@ -106,7 +107,7 @@ func newTunFd(fd int, vnetHdr bool) (*tunFile, error) { }, } if vnetHdr { - out.segBuf = make([]byte, tunSegBufSize) + out.segBuf = make([]byte, tunSegBufCap) out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].SetLen(virtioNetHdrLen) } @@ -181,13 +182,20 @@ func (r *tunFile) readRaw(buf []byte) (int, error) { } } -// ReadBatch reads one superpacket from the tun and returns the resulting -// packets. Slices point into the tunFile's internal buffers and are only -// valid until the next ReadBatch / Read / Close on this Queue. +// ReadBatch reads one or more superpackets from the tun and returns the +// resulting packets. The first read blocks via poll; once the fd is known +// readable we drain additional packets non-blocking until the kernel queue +// is empty (EAGAIN), we've collected tunDrainCap packets, or we're out of +// segBuf headroom. This amortizes the poll wake over bursts of small +// packets (e.g. TCP ACKs). Slices point into the tunFile's internal buffers +// and are only valid until the next ReadBatch / Read / Close on this Queue. func (r *tunFile) ReadBatch() ([][]byte, error) { r.pending = r.pending[:0] r.pendingIdx = 0 + r.segOff = 0 + // Initial (blocking) read. Retry on decode errors so a single bad + // packet does not stall the reader. for { n, err := r.readRaw(r.readBuf) if err != nil { @@ -195,19 +203,57 @@ func (r *tunFile) ReadBatch() ([][]byte, error) { } if !r.vnetHdr { r.pending = append(r.pending, r.readBuf[:n]) + // Non-vnetHdr mode shares one readBuf so we can't drain safely + // without copying; return the single packet as before. return r.pending, nil } - if n < virtioNetHdrLen { - return nil, fmt.Errorf("short tun read: %d < %d", n, virtioNetHdrLen) - } - var hdr virtioNetHdr - hdr.decode(r.readBuf[:virtioNetHdrLen]) - if err := segmentInto(r.readBuf[virtioNetHdrLen:n], hdr, &r.pending, r.segBuf); err != nil { + if err := r.decodeRead(n); err != nil { // Drop and read again — a bad packet should not kill the reader. continue } - return r.pending, nil + break } + + // Drain: non-blocking reads until the kernel queue is empty, the drain + // cap is reached, or segBuf no longer has room for another worst-case + // superpacket. + for len(r.pending) < tunDrainCap && tunSegBufCap-r.segOff >= tunSegBufSize { + n, err := unix.Read(r.fd, r.readBuf) + if err != nil { + // EAGAIN / EINTR / anything else: stop draining. We already + // have a valid batch from the first read. + break + } + if n <= 0 { + break + } + if err := r.decodeRead(n); err != nil { + // Drop this packet and stop the drain; we'd rather hand off + // what we have than keep spinning here. + break + } + } + + return r.pending, nil +} + +// decodeRead decodes the virtio header plus payload in r.readBuf[:n], appends +// the segments to r.pending, and advances r.segOff by the total scratch used. +// Caller must have already ensured r.vnetHdr is true. +func (r *tunFile) decodeRead(n int) error { + if n < virtioNetHdrLen { + return fmt.Errorf("short tun read: %d < %d", n, virtioNetHdrLen) + } + var hdr virtioNetHdr + hdr.decode(r.readBuf[:virtioNetHdrLen]) + before := len(r.pending) + if err := segmentInto(r.readBuf[virtioNetHdrLen:n], hdr, &r.pending, r.segBuf[r.segOff:]); err != nil { + return err + } + for k := before; k < len(r.pending); k++ { + r.segOff += len(r.pending[k]) + } + return nil } // Read drains segments produced by the last ReadBatch one at a time; when the diff --git a/overlay/tun_linux_offload.go b/overlay/tun_linux_offload.go index 02121709..f4ea672a 100644 --- a/overlay/tun_linux_offload.go +++ b/overlay/tun_linux_offload.go @@ -23,6 +23,17 @@ const tunReadBufSize = 65535 // an IP+TCP header. 128KiB comfortably covers the 64KiB payload ceiling. const tunSegBufSize = 131072 +// tunSegBufCap is the total size we allocate for the per-reader segment +// buffer. It is sized as one worst-case TSO superpacket (tunSegBufSize) plus +// the same again as drain headroom so a ReadBatch wake can accumulate +// additional packets after an initial big read without overflowing. +const tunSegBufCap = tunSegBufSize * 2 + +// tunDrainCap caps how many packets a single ReadBatch will accumulate via +// the post-wake drain loop. Sized to soak up a burst of small ACKs while +// bounding how much work a single caller holds before handing off. +const tunDrainCap = 64 + type virtioNetHdr struct { Flags uint8 GSOType uint8 @@ -47,6 +58,14 @@ func (h *virtioNetHdr) decode(b []byte) { // IP packets, each appended to *out as a slice of scratch. scratch must be // sized to hold every segment (including replicated headers). func segmentInto(pkt []byte, hdr virtioNetHdr, out *[][]byte, scratch []byte) error { + // When RSC_INFO is set the csum_start/csum_offset fields are repurposed to + // carry coalescing info rather than checksum offsets. A TUN writing via + // IFF_VNET_HDR should never emit this, but if it did we would silently + // miscompute the segment checksums — refuse the packet instead. + if hdr.Flags&unix.VIRTIO_NET_HDR_F_RSC_INFO != 0 { + return fmt.Errorf("virtio RSC_INFO flag not supported on TUN reads") + } + switch hdr.GSOType { case unix.VIRTIO_NET_HDR_GSO_NONE: if len(pkt) > len(scratch) {