ReadBatch is named Read now

This commit is contained in:
JackDoan
2026-04-20 12:50:43 -05:00
parent f4907b6634
commit f8b09a295d
15 changed files with 102 additions and 138 deletions

View File

@@ -350,7 +350,7 @@ func (f *Interface) listenIn(reader overlay.Queue, i int) {
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
for {
pkts, err := reader.ReadBatch()
pkts, err := reader.Read()
if err != nil {
if !f.closed.Load() {
f.l.WithError(err).WithField("reader", i).Error("Error while reading outbound packet, closing")

View File

@@ -7,18 +7,27 @@ import (
"github.com/slackhq/nebula/routing"
)
// defaultBatchBufSize is the per-Queue scratch size for ReadBatch on backends
// defaultBatchBufSize is the per-Queue scratch size for Read on backends
// that don't do TSO segmentation. 65535 covers any single IP packet.
const defaultBatchBufSize = 65535
// Queue is a readable/writable tun queue. ReadBatch returns one or more
// packets; the returned slices are borrowed from the queue's internal buffer
// and are only valid until the next ReadBatch / Read / Close on this Queue.
// Callers must encrypt or copy each slice before the next call. Not safe for
// concurrent use — one goroutine per Queue.
// Queue is a readable/writable tun queue. One Queue is driven by a single
// read goroutine plus concurrent writers (see Write / WriteReject below).
type Queue interface {
io.ReadWriteCloser
ReadBatch() ([][]byte, error)
io.Closer
// Read returns one or more packets. The returned slices are borrowed
// from the Queue's internal buffer and are only valid until the next
// Read or Close on this Queue — callers must encrypt or copy each
// slice before the next call. Not safe for concurrent Reads; exactly
// one goroutine per Queue reads.
Read() ([][]byte, error)
// Write emits a single packet on the plaintext (outside→inside)
// delivery path. May run concurrently with WriteReject on the same
// Queue, but not with itself.
Write(p []byte) (int, error)
// WriteReject writes a single packet that originated from the inside
// path (reject replies or self-forward) using scratch state distinct
// from Write, so it can run concurrently with Write on the same Queue

View File

@@ -25,11 +25,7 @@ func (NoopTun) Name() string {
return "noop"
}
func (NoopTun) Read([]byte) (int, error) {
return 0, nil
}
func (NoopTun) ReadBatch() ([][]byte, error) {
func (NoopTun) Read() ([][]byte, error) {
return nil, nil
}

View File

@@ -18,7 +18,7 @@ import (
)
type tun struct {
io.ReadWriteCloser
rwc io.ReadWriteCloser
fd int
vpnNetworks []netip.Prefix
Routes atomic.Pointer[[]Route]
@@ -29,11 +29,11 @@ type tun struct {
batchRet [1][]byte
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.rwc.Read(t.readBuf)
if err != nil {
return nil, err
}
@@ -41,8 +41,16 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil
}
func (t *tun) Write(p []byte) (int, error) {
return t.rwc.Write(p)
}
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
return t.rwc.Write(p)
}
func (t *tun) Close() error {
return t.rwc.Close()
}
func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) {
@@ -51,10 +59,10 @@ func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []net
file := os.NewFile(uintptr(deviceFd), "/dev/net/tun")
t := &tun{
ReadWriteCloser: file,
fd: deviceFd,
vpnNetworks: vpnNetworks,
l: l,
rwc: file,
fd: deviceFd,
vpnNetworks: vpnNetworks,
l: l,
}
err := t.reload(c, true)

View File

@@ -23,7 +23,7 @@ import (
)
type tun struct {
io.ReadWriteCloser
rwc io.ReadWriteCloser
Device string
vpnNetworks []netip.Prefix
DefaultMTU int
@@ -127,11 +127,11 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, _ bool) (
}
t := &tun{
ReadWriteCloser: os.NewFile(uintptr(fd), ""),
Device: name,
vpnNetworks: vpnNetworks,
DefaultMTU: c.GetInt("tun.mtu", DefaultMTU),
l: l,
rwc: os.NewFile(uintptr(fd), ""),
Device: name,
vpnNetworks: vpnNetworks,
DefaultMTU: c.GetInt("tun.mtu", DefaultMTU),
l: l,
}
err = t.reload(c, true)
@@ -161,8 +161,8 @@ func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun,
}
func (t *tun) Close() error {
if t.ReadWriteCloser != nil {
return t.ReadWriteCloser.Close()
if t.rwc != nil {
return t.rwc.Close()
}
return nil
}
@@ -506,20 +506,20 @@ func delRoute(prefix netip.Prefix, gateway netroute.Addr) error {
return nil
}
func (t *tun) Read(to []byte) (int, error) {
func (t *tun) readOne(to []byte) (int, error) {
buf := make([]byte, len(to)+4)
n, err := t.ReadWriteCloser.Read(buf)
n, err := t.rwc.Read(buf)
copy(to, buf[4:])
return n - 4, err
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.readOne(t.readBuf)
if err != nil {
return nil, err
}
@@ -556,7 +556,7 @@ func (t *tun) Write(from []byte) (int, error) {
copy(buf[4:], from)
n, err := t.ReadWriteCloser.Write(buf)
n, err := t.rwc.Write(buf)
return n - 4, err
}

View File

@@ -21,19 +21,21 @@ type disabledTun struct {
rx metrics.Counter
l *logrus.Logger
readBuf []byte
batchRet [1][]byte
}
func (t *disabledTun) ReadBatch() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
func (t *disabledTun) Read() ([][]byte, error) {
r, ok := <-t.read
if !ok {
return nil, io.EOF
}
n, err := t.Read(t.readBuf)
if err != nil {
return nil, err
t.tx.Inc(1)
if t.l.Level >= logrus.DebugLevel {
t.l.WithField("raw", prettyPacket(r)).Debugf("Write payload")
}
t.batchRet[0] = t.readBuf[:n]
t.batchRet[0] = r
return t.batchRet[:], nil
}
@@ -71,24 +73,6 @@ func (*disabledTun) Name() string {
return "disabled"
}
func (t *disabledTun) Read(b []byte) (int, error) {
r, ok := <-t.read
if !ok {
return 0, io.EOF
}
if len(r) > len(b) {
return 0, fmt.Errorf("packet larger than mtu: %d > %d bytes", len(r), len(b))
}
t.tx.Inc(1)
if t.l.Level >= logrus.DebugLevel {
t.l.WithField("raw", prettyPacket(r)).Debugf("Write payload")
}
return copy(b, r), nil
}
func (t *disabledTun) handleICMPEchoRequest(b []byte) bool {
out := make([]byte, len(b))
out = iputil.CreateICMPEchoResponse(b, out)

View File

@@ -98,11 +98,11 @@ type tun struct {
batchRet [1][]byte
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.readOne(t.readBuf)
if err != nil {
return nil, err
}
@@ -114,7 +114,7 @@ func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
func (t *tun) Read(to []byte) (int, error) {
func (t *tun) readOne(to []byte) (int, error) {
// use readv() to read from the tunnel device, to eliminate the need for copying the buffer
if t.devFd < 0 {
return -1, syscall.EINVAL

View File

@@ -21,7 +21,7 @@ import (
)
type tun struct {
io.ReadWriteCloser
rwc io.ReadWriteCloser
vpnNetworks []netip.Prefix
Routes atomic.Pointer[[]Route]
routeTree atomic.Pointer[bart.Table[routing.Gateways]]
@@ -31,11 +31,11 @@ type tun struct {
batchRet [1][]byte
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.rwc.Read(t.readBuf)
if err != nil {
return nil, err
}
@@ -43,8 +43,16 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil
}
func (t *tun) Write(p []byte) (int, error) {
return t.rwc.Write(p)
}
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
return t.rwc.Write(p)
}
func (t *tun) Close() error {
return t.rwc.Close()
}
func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, error) {
@@ -54,9 +62,9 @@ func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, erro
func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) {
file := os.NewFile(uintptr(deviceFd), "/dev/tun")
t := &tun{
vpnNetworks: vpnNetworks,
ReadWriteCloser: &tunReadCloser{f: file},
l: l,
vpnNetworks: vpnNetworks,
rwc: &tunReadCloser{f: file},
l: l,
}
err := t.reload(c, true)

View File

@@ -41,13 +41,12 @@ type tunFile struct {
// kernel successfully accepted TUNSETOFFLOAD. Reads include a leading
// virtio_net_hdr and may carry a TSO superpacket we must segment;
// writes must prepend a zeroed virtio_net_hdr.
vnetHdr bool
readBuf []byte // scratch for a single raw read (virtio hdr + superpacket)
segBuf []byte // backing store for segmented output
segOff int // cursor into segBuf for the current ReadBatch drain
pending [][]byte // segments waiting to be drained by Read
pendingIdx int
writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr
vnetHdr bool
readBuf []byte // scratch for a single raw read (virtio hdr + superpacket)
segBuf []byte // backing store for segmented output
segOff int // cursor into segBuf for the current Read drain
pending [][]byte // segments returned from the most recent Read
writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr
// rejectIovs is a second preallocated iovec scratch used exclusively by
// WriteReject (reject + self-forward from the inside path). It mirrors
// writeIovs but lets listenIn goroutines emit reject packets without
@@ -217,16 +216,15 @@ func (r *tunFile) readRaw(buf []byte) (int, error) {
}
}
// ReadBatch reads one or more superpackets from the tun and returns the
// Read reads one or more superpackets from the tun and returns the
// resulting packets. The first read blocks via poll; once the fd is known
// readable we drain additional packets non-blocking until the kernel queue
// is empty (EAGAIN), we've collected tunDrainCap packets, or we're out of
// segBuf headroom. This amortizes the poll wake over bursts of small
// packets (e.g. TCP ACKs). Slices point into the tunFile's internal buffers
// and are only valid until the next ReadBatch / Read / Close on this Queue.
func (r *tunFile) ReadBatch() ([][]byte, error) {
// and are only valid until the next Read or Close on this Queue.
func (r *tunFile) Read() ([][]byte, error) {
r.pending = r.pending[:0]
r.pendingIdx = 0
r.segOff = 0
// Initial (blocking) read. Retry on decode errors so a single bad
@@ -291,25 +289,6 @@ func (r *tunFile) decodeRead(n int) error {
return nil
}
// Read drains segments produced by the last ReadBatch one at a time; when the
// batch is exhausted it fetches a fresh one. Kept for io.Reader compatibility;
// batch-aware callers should use ReadBatch directly.
func (r *tunFile) Read(buf []byte) (int, error) {
for {
if r.pendingIdx < len(r.pending) {
seg := r.pending[r.pendingIdx]
r.pendingIdx++
if len(seg) > len(buf) {
return 0, io.ErrShortBuffer
}
return copy(buf, seg), nil
}
if _, err := r.ReadBatch(); err != nil {
return 0, err
}
}
}
func (r *tunFile) Write(buf []byte) (int, error) {
return r.writeWithScratch(buf, &r.writeIovs)
}

View File

@@ -25,11 +25,11 @@ const tunSegBufSize = 131072
// tunSegBufCap is the total size we allocate for the per-reader segment
// buffer. It is sized as one worst-case TSO superpacket (tunSegBufSize) plus
// the same again as drain headroom so a ReadBatch wake can accumulate
// the same again as drain headroom so a Read wake can accumulate
// additional packets after an initial big read without overflowing.
const tunSegBufCap = tunSegBufSize * 2
// tunDrainCap caps how many packets a single ReadBatch will accumulate via
// tunDrainCap caps how many packets a single Read will accumulate via
// the post-wake drain loop. Sized to soak up a burst of small ACKs while
// bounding how much work a single caller holds before handing off.
const tunDrainCap = 64

View File

@@ -70,11 +70,11 @@ type tun struct {
batchRet [1][]byte
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.readOne(t.readBuf)
if err != nil {
return nil, err
}
@@ -159,7 +159,7 @@ func (t *tun) Close() error {
return nil
}
func (t *tun) Read(to []byte) (int, error) {
func (t *tun) readOne(to []byte) (int, error) {
rc, err := t.f.SyscallConn()
if err != nil {
return 0, fmt.Errorf("failed to get syscall conn for tun: %w", err)

View File

@@ -63,11 +63,11 @@ type tun struct {
batchRet [1][]byte
}
func (t *tun) ReadBatch() ([][]byte, error) {
func (t *tun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.readOne(t.readBuf)
if err != nil {
return nil, err
}
@@ -142,7 +142,7 @@ func (t *tun) Close() error {
return nil
}
func (t *tun) Read(to []byte) (int, error) {
func (t *tun) readOne(to []byte) (int, error) {
buf := make([]byte, len(to)+4)
n, err := t.f.Read(buf)

View File

@@ -27,19 +27,15 @@ type TestTun struct {
rxPackets chan []byte // Packets to receive into nebula
TxPackets chan []byte // Packets transmitted outside by nebula
readBuf []byte
batchRet [1][]byte
}
func (t *TestTun) ReadBatch() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
func (t *TestTun) Read() ([][]byte, error) {
p, ok := <-t.rxPackets
if !ok {
return nil, os.ErrClosed
}
n, err := t.Read(t.readBuf)
if err != nil {
return nil, err
}
t.batchRet[0] = t.readBuf[:n]
t.batchRet[0] = p
return t.batchRet[:], nil
}
@@ -142,15 +138,6 @@ func (t *TestTun) Close() error {
return nil
}
func (t *TestTun) Read(b []byte) (int, error) {
p, ok := <-t.rxPackets
if !ok {
return 0, os.ErrClosed
}
copy(b, p)
return len(p), nil
}
func (t *TestTun) SupportsMultiqueue() bool {
return false
}

View File

@@ -40,11 +40,11 @@ type winTun struct {
batchRet [1][]byte
}
func (t *winTun) ReadBatch() ([][]byte, error) {
func (t *winTun) Read() ([][]byte, error) {
if t.readBuf == nil {
t.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := t.Read(t.readBuf)
n, err := t.tun.Read(t.readBuf, 0)
if err != nil {
return nil, err
}
@@ -247,10 +247,6 @@ func (t *winTun) Name() string {
return t.Device
}
func (t *winTun) Read(b []byte) (int, error) {
return t.tun.Read(b, 0)
}
func (t *winTun) Write(b []byte) (int, error) {
return t.tun.Write(b, 0)
}

View File

@@ -39,11 +39,11 @@ type UserDevice struct {
batchRet [1][]byte
}
func (d *UserDevice) ReadBatch() ([][]byte, error) {
func (d *UserDevice) Read() ([][]byte, error) {
if d.readBuf == nil {
d.readBuf = make([]byte, defaultBatchBufSize)
}
n, err := d.Read(d.readBuf)
n, err := d.outboundReader.Read(d.readBuf)
if err != nil {
return nil, err
}
@@ -73,9 +73,6 @@ func (d *UserDevice) Pipe() (*io.PipeReader, *io.PipeWriter) {
return d.inboundReader, d.outboundWriter
}
func (d *UserDevice) Read(p []byte) (n int, err error) {
return d.outboundReader.Read(p)
}
func (d *UserDevice) Write(p []byte) (n int, err error) {
return d.inboundWriter.Write(p)
}