From f8b09a295dc032357a604ef16f4293b397e00bc3 Mon Sep 17 00:00:00 2001 From: JackDoan Date: Mon, 20 Apr 2026 12:50:43 -0500 Subject: [PATCH] ReadBatch is named Read now --- interface.go | 2 +- overlay/device.go | 25 +++++++++++++++-------- overlay/noop.go | 6 +----- overlay/tun_android.go | 24 ++++++++++++++-------- overlay/tun_darwin.go | 26 ++++++++++++------------ overlay/tun_disabled.go | 36 +++++++++------------------------ overlay/tun_freebsd.go | 6 +++--- overlay/tun_ios.go | 22 +++++++++++++------- overlay/tun_linux.go | 39 +++++++++--------------------------- overlay/tun_linux_offload.go | 4 ++-- overlay/tun_netbsd.go | 6 +++--- overlay/tun_openbsd.go | 6 +++--- overlay/tun_tester.go | 23 +++++---------------- overlay/tun_windows.go | 8 ++------ overlay/user.go | 7 ++----- 15 files changed, 102 insertions(+), 138 deletions(-) diff --git a/interface.go b/interface.go index c5aae59a..7c7195f3 100644 --- a/interface.go +++ b/interface.go @@ -350,7 +350,7 @@ func (f *Interface) listenIn(reader overlay.Queue, i int) { conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout) for { - pkts, err := reader.ReadBatch() + pkts, err := reader.Read() if err != nil { if !f.closed.Load() { f.l.WithError(err).WithField("reader", i).Error("Error while reading outbound packet, closing") diff --git a/overlay/device.go b/overlay/device.go index de93f381..70ca01a5 100644 --- a/overlay/device.go +++ b/overlay/device.go @@ -7,18 +7,27 @@ import ( "github.com/slackhq/nebula/routing" ) -// defaultBatchBufSize is the per-Queue scratch size for ReadBatch on backends +// defaultBatchBufSize is the per-Queue scratch size for Read on backends // that don't do TSO segmentation. 65535 covers any single IP packet. const defaultBatchBufSize = 65535 -// Queue is a readable/writable tun queue. ReadBatch returns one or more -// packets; the returned slices are borrowed from the queue's internal buffer -// and are only valid until the next ReadBatch / Read / Close on this Queue. -// Callers must encrypt or copy each slice before the next call. Not safe for -// concurrent use — one goroutine per Queue. +// Queue is a readable/writable tun queue. One Queue is driven by a single +// read goroutine plus concurrent writers (see Write / WriteReject below). type Queue interface { - io.ReadWriteCloser - ReadBatch() ([][]byte, error) + io.Closer + + // Read returns one or more packets. The returned slices are borrowed + // from the Queue's internal buffer and are only valid until the next + // Read or Close on this Queue — callers must encrypt or copy each + // slice before the next call. Not safe for concurrent Reads; exactly + // one goroutine per Queue reads. + Read() ([][]byte, error) + + // Write emits a single packet on the plaintext (outside→inside) + // delivery path. May run concurrently with WriteReject on the same + // Queue, but not with itself. + Write(p []byte) (int, error) + // WriteReject writes a single packet that originated from the inside // path (reject replies or self-forward) using scratch state distinct // from Write, so it can run concurrently with Write on the same Queue diff --git a/overlay/noop.go b/overlay/noop.go index 15a36a22..dc2d3fb9 100644 --- a/overlay/noop.go +++ b/overlay/noop.go @@ -25,11 +25,7 @@ func (NoopTun) Name() string { return "noop" } -func (NoopTun) Read([]byte) (int, error) { - return 0, nil -} - -func (NoopTun) ReadBatch() ([][]byte, error) { +func (NoopTun) Read() ([][]byte, error) { return nil, nil } diff --git a/overlay/tun_android.go b/overlay/tun_android.go index d152732b..62de337d 100644 --- a/overlay/tun_android.go +++ b/overlay/tun_android.go @@ -18,7 +18,7 @@ import ( ) type tun struct { - io.ReadWriteCloser + rwc io.ReadWriteCloser fd int vpnNetworks []netip.Prefix Routes atomic.Pointer[[]Route] @@ -29,11 +29,11 @@ type tun struct { batchRet [1][]byte } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.rwc.Read(t.readBuf) if err != nil { return nil, err } @@ -41,8 +41,16 @@ func (t *tun) ReadBatch() ([][]byte, error) { return t.batchRet[:], nil } +func (t *tun) Write(p []byte) (int, error) { + return t.rwc.Write(p) +} + func (t *tun) WriteReject(p []byte) (int, error) { - return t.Write(p) + return t.rwc.Write(p) +} + +func (t *tun) Close() error { + return t.rwc.Close() } func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) { @@ -51,10 +59,10 @@ func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []net file := os.NewFile(uintptr(deviceFd), "/dev/net/tun") t := &tun{ - ReadWriteCloser: file, - fd: deviceFd, - vpnNetworks: vpnNetworks, - l: l, + rwc: file, + fd: deviceFd, + vpnNetworks: vpnNetworks, + l: l, } err := t.reload(c, true) diff --git a/overlay/tun_darwin.go b/overlay/tun_darwin.go index e9b39ce4..7f50c705 100644 --- a/overlay/tun_darwin.go +++ b/overlay/tun_darwin.go @@ -23,7 +23,7 @@ import ( ) type tun struct { - io.ReadWriteCloser + rwc io.ReadWriteCloser Device string vpnNetworks []netip.Prefix DefaultMTU int @@ -127,11 +127,11 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, _ bool) ( } t := &tun{ - ReadWriteCloser: os.NewFile(uintptr(fd), ""), - Device: name, - vpnNetworks: vpnNetworks, - DefaultMTU: c.GetInt("tun.mtu", DefaultMTU), - l: l, + rwc: os.NewFile(uintptr(fd), ""), + Device: name, + vpnNetworks: vpnNetworks, + DefaultMTU: c.GetInt("tun.mtu", DefaultMTU), + l: l, } err = t.reload(c, true) @@ -161,8 +161,8 @@ func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun, } func (t *tun) Close() error { - if t.ReadWriteCloser != nil { - return t.ReadWriteCloser.Close() + if t.rwc != nil { + return t.rwc.Close() } return nil } @@ -506,20 +506,20 @@ func delRoute(prefix netip.Prefix, gateway netroute.Addr) error { return nil } -func (t *tun) Read(to []byte) (int, error) { +func (t *tun) readOne(to []byte) (int, error) { buf := make([]byte, len(to)+4) - n, err := t.ReadWriteCloser.Read(buf) + n, err := t.rwc.Read(buf) copy(to, buf[4:]) return n - 4, err } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.readOne(t.readBuf) if err != nil { return nil, err } @@ -556,7 +556,7 @@ func (t *tun) Write(from []byte) (int, error) { copy(buf[4:], from) - n, err := t.ReadWriteCloser.Write(buf) + n, err := t.rwc.Write(buf) return n - 4, err } diff --git a/overlay/tun_disabled.go b/overlay/tun_disabled.go index c8a8afe5..8a691ae0 100644 --- a/overlay/tun_disabled.go +++ b/overlay/tun_disabled.go @@ -21,19 +21,21 @@ type disabledTun struct { rx metrics.Counter l *logrus.Logger - readBuf []byte batchRet [1][]byte } -func (t *disabledTun) ReadBatch() ([][]byte, error) { - if t.readBuf == nil { - t.readBuf = make([]byte, defaultBatchBufSize) +func (t *disabledTun) Read() ([][]byte, error) { + r, ok := <-t.read + if !ok { + return nil, io.EOF } - n, err := t.Read(t.readBuf) - if err != nil { - return nil, err + + t.tx.Inc(1) + if t.l.Level >= logrus.DebugLevel { + t.l.WithField("raw", prettyPacket(r)).Debugf("Write payload") } - t.batchRet[0] = t.readBuf[:n] + + t.batchRet[0] = r return t.batchRet[:], nil } @@ -71,24 +73,6 @@ func (*disabledTun) Name() string { return "disabled" } -func (t *disabledTun) Read(b []byte) (int, error) { - r, ok := <-t.read - if !ok { - return 0, io.EOF - } - - if len(r) > len(b) { - return 0, fmt.Errorf("packet larger than mtu: %d > %d bytes", len(r), len(b)) - } - - t.tx.Inc(1) - if t.l.Level >= logrus.DebugLevel { - t.l.WithField("raw", prettyPacket(r)).Debugf("Write payload") - } - - return copy(b, r), nil -} - func (t *disabledTun) handleICMPEchoRequest(b []byte) bool { out := make([]byte, len(b)) out = iputil.CreateICMPEchoResponse(b, out) diff --git a/overlay/tun_freebsd.go b/overlay/tun_freebsd.go index e4707e07..7510075f 100644 --- a/overlay/tun_freebsd.go +++ b/overlay/tun_freebsd.go @@ -98,11 +98,11 @@ type tun struct { batchRet [1][]byte } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.readOne(t.readBuf) if err != nil { return nil, err } @@ -114,7 +114,7 @@ func (t *tun) WriteReject(p []byte) (int, error) { return t.Write(p) } -func (t *tun) Read(to []byte) (int, error) { +func (t *tun) readOne(to []byte) (int, error) { // use readv() to read from the tunnel device, to eliminate the need for copying the buffer if t.devFd < 0 { return -1, syscall.EINVAL diff --git a/overlay/tun_ios.go b/overlay/tun_ios.go index 253644a5..ebf134b8 100644 --- a/overlay/tun_ios.go +++ b/overlay/tun_ios.go @@ -21,7 +21,7 @@ import ( ) type tun struct { - io.ReadWriteCloser + rwc io.ReadWriteCloser vpnNetworks []netip.Prefix Routes atomic.Pointer[[]Route] routeTree atomic.Pointer[bart.Table[routing.Gateways]] @@ -31,11 +31,11 @@ type tun struct { batchRet [1][]byte } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.rwc.Read(t.readBuf) if err != nil { return nil, err } @@ -43,8 +43,16 @@ func (t *tun) ReadBatch() ([][]byte, error) { return t.batchRet[:], nil } +func (t *tun) Write(p []byte) (int, error) { + return t.rwc.Write(p) +} + func (t *tun) WriteReject(p []byte) (int, error) { - return t.Write(p) + return t.rwc.Write(p) +} + +func (t *tun) Close() error { + return t.rwc.Close() } func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, error) { @@ -54,9 +62,9 @@ func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, erro func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) { file := os.NewFile(uintptr(deviceFd), "/dev/tun") t := &tun{ - vpnNetworks: vpnNetworks, - ReadWriteCloser: &tunReadCloser{f: file}, - l: l, + vpnNetworks: vpnNetworks, + rwc: &tunReadCloser{f: file}, + l: l, } err := t.reload(c, true) diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index 777f5cac..7dd241ba 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -41,13 +41,12 @@ type tunFile struct { // kernel successfully accepted TUNSETOFFLOAD. Reads include a leading // virtio_net_hdr and may carry a TSO superpacket we must segment; // writes must prepend a zeroed virtio_net_hdr. - vnetHdr bool - readBuf []byte // scratch for a single raw read (virtio hdr + superpacket) - segBuf []byte // backing store for segmented output - segOff int // cursor into segBuf for the current ReadBatch drain - pending [][]byte // segments waiting to be drained by Read - pendingIdx int - writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr + vnetHdr bool + readBuf []byte // scratch for a single raw read (virtio hdr + superpacket) + segBuf []byte // backing store for segmented output + segOff int // cursor into segBuf for the current Read drain + pending [][]byte // segments returned from the most recent Read + writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr // rejectIovs is a second preallocated iovec scratch used exclusively by // WriteReject (reject + self-forward from the inside path). It mirrors // writeIovs but lets listenIn goroutines emit reject packets without @@ -217,16 +216,15 @@ func (r *tunFile) readRaw(buf []byte) (int, error) { } } -// ReadBatch reads one or more superpackets from the tun and returns the +// Read reads one or more superpackets from the tun and returns the // resulting packets. The first read blocks via poll; once the fd is known // readable we drain additional packets non-blocking until the kernel queue // is empty (EAGAIN), we've collected tunDrainCap packets, or we're out of // segBuf headroom. This amortizes the poll wake over bursts of small // packets (e.g. TCP ACKs). Slices point into the tunFile's internal buffers -// and are only valid until the next ReadBatch / Read / Close on this Queue. -func (r *tunFile) ReadBatch() ([][]byte, error) { +// and are only valid until the next Read or Close on this Queue. +func (r *tunFile) Read() ([][]byte, error) { r.pending = r.pending[:0] - r.pendingIdx = 0 r.segOff = 0 // Initial (blocking) read. Retry on decode errors so a single bad @@ -291,25 +289,6 @@ func (r *tunFile) decodeRead(n int) error { return nil } -// Read drains segments produced by the last ReadBatch one at a time; when the -// batch is exhausted it fetches a fresh one. Kept for io.Reader compatibility; -// batch-aware callers should use ReadBatch directly. -func (r *tunFile) Read(buf []byte) (int, error) { - for { - if r.pendingIdx < len(r.pending) { - seg := r.pending[r.pendingIdx] - r.pendingIdx++ - if len(seg) > len(buf) { - return 0, io.ErrShortBuffer - } - return copy(buf, seg), nil - } - if _, err := r.ReadBatch(); err != nil { - return 0, err - } - } -} - func (r *tunFile) Write(buf []byte) (int, error) { return r.writeWithScratch(buf, &r.writeIovs) } diff --git a/overlay/tun_linux_offload.go b/overlay/tun_linux_offload.go index 660c80ad..2d6e9a58 100644 --- a/overlay/tun_linux_offload.go +++ b/overlay/tun_linux_offload.go @@ -25,11 +25,11 @@ const tunSegBufSize = 131072 // tunSegBufCap is the total size we allocate for the per-reader segment // buffer. It is sized as one worst-case TSO superpacket (tunSegBufSize) plus -// the same again as drain headroom so a ReadBatch wake can accumulate +// the same again as drain headroom so a Read wake can accumulate // additional packets after an initial big read without overflowing. const tunSegBufCap = tunSegBufSize * 2 -// tunDrainCap caps how many packets a single ReadBatch will accumulate via +// tunDrainCap caps how many packets a single Read will accumulate via // the post-wake drain loop. Sized to soak up a burst of small ACKs while // bounding how much work a single caller holds before handing off. const tunDrainCap = 64 diff --git a/overlay/tun_netbsd.go b/overlay/tun_netbsd.go index fe7827d3..995a9a9f 100644 --- a/overlay/tun_netbsd.go +++ b/overlay/tun_netbsd.go @@ -70,11 +70,11 @@ type tun struct { batchRet [1][]byte } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.readOne(t.readBuf) if err != nil { return nil, err } @@ -159,7 +159,7 @@ func (t *tun) Close() error { return nil } -func (t *tun) Read(to []byte) (int, error) { +func (t *tun) readOne(to []byte) (int, error) { rc, err := t.f.SyscallConn() if err != nil { return 0, fmt.Errorf("failed to get syscall conn for tun: %w", err) diff --git a/overlay/tun_openbsd.go b/overlay/tun_openbsd.go index 1b8115f6..aab29bb5 100644 --- a/overlay/tun_openbsd.go +++ b/overlay/tun_openbsd.go @@ -63,11 +63,11 @@ type tun struct { batchRet [1][]byte } -func (t *tun) ReadBatch() ([][]byte, error) { +func (t *tun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.readOne(t.readBuf) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func (t *tun) Close() error { return nil } -func (t *tun) Read(to []byte) (int, error) { +func (t *tun) readOne(to []byte) (int, error) { buf := make([]byte, len(to)+4) n, err := t.f.Read(buf) diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index fb75e006..684d1ce1 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -27,19 +27,15 @@ type TestTun struct { rxPackets chan []byte // Packets to receive into nebula TxPackets chan []byte // Packets transmitted outside by nebula - readBuf []byte batchRet [1][]byte } -func (t *TestTun) ReadBatch() ([][]byte, error) { - if t.readBuf == nil { - t.readBuf = make([]byte, defaultBatchBufSize) +func (t *TestTun) Read() ([][]byte, error) { + p, ok := <-t.rxPackets + if !ok { + return nil, os.ErrClosed } - n, err := t.Read(t.readBuf) - if err != nil { - return nil, err - } - t.batchRet[0] = t.readBuf[:n] + t.batchRet[0] = p return t.batchRet[:], nil } @@ -142,15 +138,6 @@ func (t *TestTun) Close() error { return nil } -func (t *TestTun) Read(b []byte) (int, error) { - p, ok := <-t.rxPackets - if !ok { - return 0, os.ErrClosed - } - copy(b, p) - return len(p), nil -} - func (t *TestTun) SupportsMultiqueue() bool { return false } diff --git a/overlay/tun_windows.go b/overlay/tun_windows.go index 71e33919..b02f33d5 100644 --- a/overlay/tun_windows.go +++ b/overlay/tun_windows.go @@ -40,11 +40,11 @@ type winTun struct { batchRet [1][]byte } -func (t *winTun) ReadBatch() ([][]byte, error) { +func (t *winTun) Read() ([][]byte, error) { if t.readBuf == nil { t.readBuf = make([]byte, defaultBatchBufSize) } - n, err := t.Read(t.readBuf) + n, err := t.tun.Read(t.readBuf, 0) if err != nil { return nil, err } @@ -247,10 +247,6 @@ func (t *winTun) Name() string { return t.Device } -func (t *winTun) Read(b []byte) (int, error) { - return t.tun.Read(b, 0) -} - func (t *winTun) Write(b []byte) (int, error) { return t.tun.Write(b, 0) } diff --git a/overlay/user.go b/overlay/user.go index 71ff84d6..77c2d025 100644 --- a/overlay/user.go +++ b/overlay/user.go @@ -39,11 +39,11 @@ type UserDevice struct { batchRet [1][]byte } -func (d *UserDevice) ReadBatch() ([][]byte, error) { +func (d *UserDevice) Read() ([][]byte, error) { if d.readBuf == nil { d.readBuf = make([]byte, defaultBatchBufSize) } - n, err := d.Read(d.readBuf) + n, err := d.outboundReader.Read(d.readBuf) if err != nil { return nil, err } @@ -73,9 +73,6 @@ func (d *UserDevice) Pipe() (*io.PipeReader, *io.PipeWriter) { return d.inboundReader, d.outboundWriter } -func (d *UserDevice) Read(p []byte) (n int, err error) { - return d.outboundReader.Read(p) -} func (d *UserDevice) Write(p []byte) (n int, err error) { return d.inboundWriter.Write(p) }