drain reads before batching

This commit is contained in:
JackDoan
2026-04-17 12:56:20 -05:00
parent 4a2134775d
commit a13afb2cf8
2 changed files with 77 additions and 12 deletions

View File

@@ -43,6 +43,7 @@ type tunFile struct {
vnetHdr bool
readBuf []byte // scratch for a single raw read (virtio hdr + superpacket)
segBuf []byte // backing store for segmented output
segOff int // cursor into segBuf for the current ReadBatch drain
pending [][]byte // segments waiting to be drained by Read
pendingIdx int
writeIovs [2]unix.Iovec // preallocated iovecs for vnetHdr writes; iovs[0] is fixed to zeroVnetHdr
@@ -73,7 +74,7 @@ func (r *tunFile) newFriend(fd int) (*tunFile, error) {
},
}
if r.vnetHdr {
out.segBuf = make([]byte, tunSegBufSize)
out.segBuf = make([]byte, tunSegBufCap)
out.writeIovs[0].Base = &zeroVnetHdr[0]
out.writeIovs[0].SetLen(virtioNetHdrLen)
}
@@ -106,7 +107,7 @@ func newTunFd(fd int, vnetHdr bool) (*tunFile, error) {
},
}
if vnetHdr {
out.segBuf = make([]byte, tunSegBufSize)
out.segBuf = make([]byte, tunSegBufCap)
out.writeIovs[0].Base = &zeroVnetHdr[0]
out.writeIovs[0].SetLen(virtioNetHdrLen)
}
@@ -181,13 +182,20 @@ func (r *tunFile) readRaw(buf []byte) (int, error) {
}
}
// ReadBatch reads one superpacket from the tun and returns the resulting
// packets. Slices point into the tunFile's internal buffers and are only
// valid until the next ReadBatch / Read / Close on this Queue.
// ReadBatch reads one or more superpackets from the tun and returns the
// resulting packets. The first read blocks via poll; once the fd is known
// readable we drain additional packets non-blocking until the kernel queue
// is empty (EAGAIN), we've collected tunDrainCap packets, or we're out of
// segBuf headroom. This amortizes the poll wake over bursts of small
// packets (e.g. TCP ACKs). Slices point into the tunFile's internal buffers
// and are only valid until the next ReadBatch / Read / Close on this Queue.
func (r *tunFile) ReadBatch() ([][]byte, error) {
r.pending = r.pending[:0]
r.pendingIdx = 0
r.segOff = 0
// Initial (blocking) read. Retry on decode errors so a single bad
// packet does not stall the reader.
for {
n, err := r.readRaw(r.readBuf)
if err != nil {
@@ -195,19 +203,57 @@ func (r *tunFile) ReadBatch() ([][]byte, error) {
}
if !r.vnetHdr {
r.pending = append(r.pending, r.readBuf[:n])
// Non-vnetHdr mode shares one readBuf so we can't drain safely
// without copying; return the single packet as before.
return r.pending, nil
}
if n < virtioNetHdrLen {
return nil, fmt.Errorf("short tun read: %d < %d", n, virtioNetHdrLen)
}
var hdr virtioNetHdr
hdr.decode(r.readBuf[:virtioNetHdrLen])
if err := segmentInto(r.readBuf[virtioNetHdrLen:n], hdr, &r.pending, r.segBuf); err != nil {
if err := r.decodeRead(n); err != nil {
// Drop and read again — a bad packet should not kill the reader.
continue
}
return r.pending, nil
break
}
// Drain: non-blocking reads until the kernel queue is empty, the drain
// cap is reached, or segBuf no longer has room for another worst-case
// superpacket.
for len(r.pending) < tunDrainCap && tunSegBufCap-r.segOff >= tunSegBufSize {
n, err := unix.Read(r.fd, r.readBuf)
if err != nil {
// EAGAIN / EINTR / anything else: stop draining. We already
// have a valid batch from the first read.
break
}
if n <= 0 {
break
}
if err := r.decodeRead(n); err != nil {
// Drop this packet and stop the drain; we'd rather hand off
// what we have than keep spinning here.
break
}
}
return r.pending, nil
}
// decodeRead decodes the virtio header plus payload in r.readBuf[:n], appends
// the segments to r.pending, and advances r.segOff by the total scratch used.
// Returns an error (without touching r.segOff) if the read is shorter than a
// virtio header or if segmentation fails.
// Caller must have already ensured r.vnetHdr is true.
func (r *tunFile) decodeRead(n int) error {
// A read shorter than the virtio-net header cannot be decoded at all.
if n < virtioNetHdrLen {
return fmt.Errorf("short tun read: %d < %d", n, virtioNetHdrLen)
}
var hdr virtioNetHdr
hdr.decode(r.readBuf[:virtioNetHdrLen])
// Record the segment count before this call so we can measure only the
// segments appended below when advancing the scratch cursor.
before := len(r.pending)
// Segment into the unused tail of segBuf so slices already handed out in
// r.pending (which point into segBuf[:segOff]) are not overwritten.
if err := segmentInto(r.readBuf[virtioNetHdrLen:n], hdr, &r.pending, r.segBuf[r.segOff:]); err != nil {
return err
}
// Advance segOff past the scratch consumed by the new segments.
// NOTE(review): this assumes segmentInto packs its output contiguously
// from the start of the buffer it is given, with no gaps between
// segments — confirm against segmentInto's implementation.
for k := before; k < len(r.pending); k++ {
r.segOff += len(r.pending[k])
}
return nil
}
// Read drains segments produced by the last ReadBatch one at a time; when the