From 3b1e658bef8f97df2cd32c4a1026cd4252cab386 Mon Sep 17 00:00:00 2001 From: JackDoan Date: Thu, 14 May 2026 11:42:59 -0500 Subject: [PATCH] change Queue.Read signature --- interface.go | 21 +++++------ overlay/tio/tio.go | 36 ++++++------------- overlay/tun_android.go | 21 +++++------ overlay/tun_darwin.go | 30 ++++++---------- overlay/tun_disabled.go | 79 ++++++++++++++++++++--------------------- overlay/tun_freebsd.go | 21 +++++------ overlay/tun_ios.go | 34 +++++++----------- overlay/tun_netbsd.go | 21 +++++------ overlay/tun_openbsd.go | 30 ++++++---------- overlay/tun_tester.go | 23 ++++++------ overlay/tun_windows.go | 21 +++++------ overlay/user.go | 19 +++++----- wire/wire.go | 11 ++++++ 13 files changed, 170 insertions(+), 197 deletions(-) create mode 100644 wire/wire.go diff --git a/interface.go b/interface.go index dda43e51..303771d6 100644 --- a/interface.go +++ b/interface.go @@ -14,6 +14,7 @@ import ( "github.com/gaissmai/bart" "github.com/rcrowley/go-metrics" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/firewall" @@ -372,45 +373,45 @@ func (f *Interface) listenOut(i int) { f.l.Debug("underlay reader is done", "reader", i) } -func (f *Interface) listenIn(reader tio.Queue, i int) { +func (f *Interface) listenIn(reader tio.Queue, q int) { // Pinning this thread (and goroutine) to a single CPU keeps every sendmmsg from this goroutine going through the // same TX ring on the nic, so the wire sees per-flow order. - cpu := i % runtime.NumCPU() + cpu := q % runtime.NumCPU() if n := len(f.cpuAffinity); n > 0 { - cpu = f.cpuAffinity[i%n] + cpu = f.cpuAffinity[q%n] } if err := util.PinThreadToCPU(cpu); err != nil { - f.l.Warn("failed to pin tun reader to CPU", "queue", i, "cpu", cpu, "err", err) + f.l.Warn("failed to pin tun reader to CPU", "queue", q, "cpu", cpu, "err", err) } rejectBuf := make([]byte, mtu) arenaSize := batch.SendBatchCap * (udp.MTU + 32) - sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, arenaSize) + sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, arenaSize) fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout) for { - pkts, err := reader.Read() + n, err := reader.Read(packets, packetMem) if err != nil { if !f.closed.Load() { - f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", i) + f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", q) f.onFatal(err) } break } ctCache := conntrackCache.Get() - for _, pkt := range pkts { - f.consumeInsidePacket(pkt, fwPacket, nb, sb, rejectBuf, i, ctCache) + for i := range n { + f.consumeInsidePacket(packets[i], fwPacket, nb, sb, rejectBuf, q, ctCache) } if err := sb.Flush(); err != nil { f.l.Error("Failed to write outgoing batch", "error", err, "writer", i) } } - f.l.Debug("overlay reader is done", "reader", i) + f.l.Debug("overlay reader is done", "reader", q) } func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) { diff --git a/overlay/tio/tio.go b/overlay/tio/tio.go index bcb32007..20fcf388 100644 --- a/overlay/tio/tio.go +++ b/overlay/tio/tio.go @@ -2,6 +2,8 @@ package tio import ( "io" + + "github.com/slackhq/nebula/wire" ) // QueueSet holds one or many Queue objects and helps close them in an orderly way. @@ -13,10 +15,8 @@ type QueueSet interface { Add(fd int) error } -// Capabilities advertises which kernel offload features a Queue -// successfully negotiated. Callers consult this to decide which coalescers -// to wire onto the write path — a Queue without TSO can't usefully accept a -// TCPCoalescer, and a Queue without USO can't accept a UDPCoalescer. +// Capabilities advertises which kernel offload features a Queue successfully negotiated. +// Callers consult this to decide which coalescers to wire onto the write path. type Capabilities struct { // TSO means the FD was opened with IFF_VNET_HDR and the kernel agreed // to TUN_F_TSO4|TSO6 — i.e. WriteGSO with GSOProtoTCP is safe. @@ -31,14 +31,13 @@ type Capabilities struct { type Queue interface { io.Closer - // Read returns one or more packets. The returned Packet.Bytes slices - // are borrowed from the Queue's internal buffer and are only valid - // until the next Read or Close on this Queue - callers must encrypt - // or copy each slice before the next call. - Read() ([]Packet, error) + // Read will read at least 1 packet from the tun (up to len(p)) + // mem will be used to provide the backing for each of p[n].Bytes + // Returns the number of packets actually read, or error + Read(p []wire.TunPacket, mem []byte) (int, error) // Write emits a single packet on the plaintext (outside→inside) - // delivery path. Not safe for concurrent Writes. + // delivery path. Write(p []byte) (int, error) // Capabilities returns the Queue's negotiated offload capabilities, @@ -46,15 +45,6 @@ type Queue interface { Capabilities() Capabilities } -// Packet is the unit Queue.Read returns. Bytes points into the queue's -// internal buffer and is only valid until the next Read or Close on the -// queue that produced it. GSO is the zero value for an already-segmented -// IP datagram; when non-zero it describes a kernel-supplied TSO/USO -// superpacket the caller must segment before consuming. -type Packet struct { - Bytes []byte - GSO GSOInfo -} // GSOInfo describes a kernel-supplied superpacket sitting in Packet.Bytes. // The zero value means "not a superpacket" — Bytes is one regular IP @@ -120,16 +110,12 @@ type GSOWriter interface { // queue advertises the negotiated capability for `want`. A writer that // implements GSOWriter but not CapsProvider is treated as permissive // (used by tests and fakes that don't negotiate). -func SupportsGSO(w any, want GSOProto) (GSOWriter, bool) { +func SupportsGSO(w Queue, want GSOProto) (GSOWriter, bool) { gw, ok := w.(GSOWriter) if !ok { return nil, false } - cp, ok := w.(CapsProvider) - if !ok { - return gw, true - } - caps := cp.Capabilities() + caps := w.Capabilities() switch want { case GSOProtoTCP: return gw, caps.TSO diff --git a/overlay/tun_android.go b/overlay/tun_android.go index d3664298..c26033c6 100644 --- a/overlay/tun_android.go +++ b/overlay/tun_android.go @@ -16,6 +16,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) type tun struct { @@ -25,18 +26,19 @@ type tun struct { Routes atomic.Pointer[[]Route] routeTree atomic.Pointer[bart.Table[routing.Gateways]] l *slog.Logger - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.rwc.Read(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *tun) Write(p []byte) (int, error) { @@ -57,7 +59,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip fd: deviceFd, vpnNetworks: vpnNetworks, l: l, - readBuf: make([]byte, defaultBatchBufSize), } err := t.reload(c, true) diff --git a/overlay/tun_darwin.go b/overlay/tun_darwin.go index adf11bdf..3a4fa183 100644 --- a/overlay/tun_darwin.go +++ b/overlay/tun_darwin.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -35,9 +36,6 @@ type tun struct { // cache out buffer since we need to prepend 4 bytes for tun metadata out []byte - - readBuf []byte - batchRet [1]tio.Packet } type ifReq struct { @@ -133,7 +131,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, DefaultMTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) @@ -507,22 +504,17 @@ func delRoute(prefix netip.Prefix, gateway netroute.Addr) error { return nil } -func (t *tun) readOne(to []byte) (int, error) { - buf := make([]byte, len(to)+4) - - n, err := t.rwc.Read(buf) - - copy(to, buf[4:]) - return n - 4, err -} - -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } // Write is only valid for single threaded use diff --git a/overlay/tun_disabled.go b/overlay/tun_disabled.go index 524b6a0b..d96f912b 100644 --- a/overlay/tun_disabled.go +++ b/overlay/tun_disabled.go @@ -12,6 +12,7 @@ import ( "github.com/slackhq/nebula/iputil" "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/wire" ) type disabledTun struct { @@ -21,47 +22,8 @@ type disabledTun struct { // Track these metrics since we don't have the tun device to do it for us tx metrics.Counter rx metrics.Counter - l *slog.Logger numReaders int -} - -// disabledQueue is one tio.Queue view onto a shared disabledTun. Each queue -// owns a private batchRet so concurrent Read calls from different reader -// goroutines do not race on the returned slice. -type disabledQueue struct { - parent *disabledTun - batchRet [1]tio.Packet -} - -func (q *disabledQueue) Capabilities() tio.Capabilities { - return tio.Capabilities{} -} - -func (q *disabledQueue) Read() ([]tio.Packet, error) { - r, ok := <-q.parent.read - if !ok { - return nil, io.EOF - } - - q.parent.tx.Inc(1) - if q.parent.l.Enabled(context.Background(), slog.LevelDebug) { - q.parent.l.Debug("Write payload", "raw", prettyPacket(r)) - } - - q.batchRet[0] = tio.Packet{Bytes: r} - return q.batchRet[:], nil -} - -// Write on a queue forwards to the underlying disabledTun. All queues share -// one ICMP-handling/log path so this is a thin pass-through. -func (q *disabledQueue) Write(b []byte) (int, error) { - return q.parent.Write(b) -} - -// Close on a queue is a no-op. The shared channel and metrics are owned by -// the disabledTun; Close on the device tears them down once for everybody. -func (q *disabledQueue) Close() error { - return nil + l *slog.Logger } func newDisabledTun(vpnNetworks []netip.Prefix, queueLen int, metricsEnabled bool, l *slog.Logger) *disabledTun { @@ -99,6 +61,37 @@ func (*disabledTun) Name() string { return "disabled" } +func (t *disabledTun) readOne(b []byte) (int, error) { + r, ok := <-t.read + if !ok { + return 0, io.EOF + } + + if len(r) > len(b) { + return 0, fmt.Errorf("packet larger than mtu: %d > %d bytes", len(r), len(b)) + } + + t.tx.Inc(1) + if t.l.Enabled(context.Background(), slog.LevelDebug) { + t.l.Debug("Write payload", "raw", prettyPacket(r)) + } + + return copy(b, r), nil +} + +func (t *disabledTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? + } + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil +} + func (t *disabledTun) handleICMPEchoRequest(b []byte) bool { out := make([]byte, len(b)) out = iputil.CreateICMPEchoResponse(b, out) @@ -142,11 +135,15 @@ func (t *disabledTun) NewMultiQueueReader() error { func (t *disabledTun) Readers() []tio.Queue { out := make([]tio.Queue, t.numReaders) for i := range t.numReaders { - out[i] = &disabledQueue{parent: t} + out[i] = t } return out } +func (t *disabledTun) Capabilities() tio.Capabilities { + return tio.Capabilities{} +} + func (t *disabledTun) Close() error { if t.read != nil { close(t.read) diff --git a/overlay/tun_freebsd.go b/overlay/tun_freebsd.go index f22d15ac..b67cde40 100644 --- a/overlay/tun_freebsd.go +++ b/overlay/tun_freebsd.go @@ -17,6 +17,7 @@ import ( "unsafe" "github.com/gaissmai/bart" + "github.com/slackhq/nebula/wire" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/overlay/tio" @@ -102,9 +103,6 @@ type tun struct { readPoll [2]unix.PollFd writePoll [2]unix.PollFd closed atomic.Bool - - readBuf []byte - batchRet [1]tio.Packet } // blockOnRead waits until the tun fd is readable or shutdown has been signaled. @@ -159,13 +157,17 @@ func (t *tun) blockOnWrite() error { return nil } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } func (t *tun) readOne(to []byte) (int, error) { @@ -386,7 +388,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, fd: fd, - readBuf: make([]byte, defaultBatchBufSize), shutdownR: shutdownR, shutdownW: shutdownW, readPoll: [2]unix.PollFd{ diff --git a/overlay/tun_ios.go b/overlay/tun_ios.go index 79edd90b..7c6a5caf 100644 --- a/overlay/tun_ios.go +++ b/overlay/tun_ios.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) type tun struct { @@ -27,18 +28,19 @@ type tun struct { Routes atomic.Pointer[[]Route] routeTree atomic.Pointer[bart.Table[routing.Gateways]] l *slog.Logger - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.rwc.Read(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } func (t *tun) Write(p []byte) (int, error) { @@ -59,7 +61,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip vpnNetworks: vpnNetworks, rwc: &tunReadCloser{f: file}, l: l, - readBuf: make([]byte, defaultBatchBufSize), } err := t.reload(c, true) @@ -118,18 +119,9 @@ type tunReadCloser struct { wBuf []byte } +// Read returns a packet with the BSD 4-byte header, watch out! func (tr *tunReadCloser) Read(to []byte) (int, error) { - tr.rMu.Lock() - defer tr.rMu.Unlock() - - if cap(tr.rBuf) < len(to)+4 { - tr.rBuf = make([]byte, len(to)+4) - } - tr.rBuf = tr.rBuf[:len(to)+4] - - n, err := tr.f.Read(tr.rBuf) - copy(to, tr.rBuf[4:]) - return n - 4, err + return tr.f.Read(to) } func (tr *tunReadCloser) Write(from []byte) (int, error) { diff --git a/overlay/tun_netbsd.go b/overlay/tun_netbsd.go index 8b373800..98d2f742 100644 --- a/overlay/tun_netbsd.go +++ b/overlay/tun_netbsd.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -66,18 +67,19 @@ type tun struct { l *slog.Logger f *os.File fd int - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *tun) Readers() []tio.Queue { @@ -122,7 +124,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) diff --git a/overlay/tun_openbsd.go b/overlay/tun_openbsd.go index 80005b14..9c0df29b 100644 --- a/overlay/tun_openbsd.go +++ b/overlay/tun_openbsd.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -59,18 +60,19 @@ type tun struct { fd int // cache out buffer since we need to prepend 4 bytes for tun metadata out []byte - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.f.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`) @@ -107,7 +109,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) @@ -137,15 +138,6 @@ func (t *tun) Close() error { return nil } -func (t *tun) readOne(to []byte) (int, error) { - buf := make([]byte, len(to)+4) - - n, err := t.f.Read(buf) - - copy(to, buf[4:]) - return n - 4, err -} - // Write is only valid for single threaded use func (t *tun) Write(from []byte) (int, error) { buf := t.out diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index fe33703e..37bea32c 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -17,6 +17,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/udp" + "github.com/slackhq/nebula/wire" ) type TestTun struct { @@ -29,8 +30,6 @@ type TestTun struct { closed atomic.Bool rxPackets chan []byte // Packets to receive into nebula TxPackets chan []byte // Packets transmitted outside by nebula - - batchRet [1]tio.Packet } func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*TestTun, error) { @@ -51,9 +50,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*T l: l, rxPackets: make(chan []byte, 10), TxPackets: make(chan []byte, 10), - batchRet: [1]tio.Packet{ - tio.Packet{Bytes: make([]byte, udp.MTU)}, - }, }, nil } @@ -168,14 +164,17 @@ func (t *TestTun) Close() error { return nil } -func (t *TestTun) Read() ([]tio.Packet, error) { - t.batchRet[0].Bytes = t.batchRet[0].Bytes[:udp.MTU] - n, err := t.read(t.batchRet[0].Bytes) - if err != nil { - return nil, err +func (t *TestTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0].Bytes = t.batchRet[0].Bytes[:n] - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *TestTun) read(b []byte) (int, error) { diff --git a/overlay/tun_windows.go b/overlay/tun_windows.go index 2ab83821..41a32bd6 100644 --- a/overlay/tun_windows.go +++ b/overlay/tun_windows.go @@ -21,6 +21,7 @@ import ( "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" "github.com/slackhq/nebula/wintun" + "github.com/slackhq/nebula/wire" "golang.org/x/sys/windows" "golang.zx2c4.com/wireguard/windows/tunnel/winipcfg" ) @@ -45,18 +46,19 @@ type winTun struct { l *slog.Logger tun *wintun.NativeTun - - readBuf []byte - batchRet [1]tio.Packet } -func (t *winTun) Read() ([]tio.Packet, error) { - n, err := t.tun.Read(t.readBuf, 0) - if err != nil { - return nil, err +func (t *winTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.tun.Read(mem, 0) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (Device, error) { @@ -81,7 +83,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*w } t := &winTun{ - readBuf: make([]byte, defaultBatchBufSize), Device: deviceName, vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), diff --git a/overlay/user.go b/overlay/user.go index 20f1e65d..64d772e5 100644 --- a/overlay/user.go +++ b/overlay/user.go @@ -8,6 +8,7 @@ import ( "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/wire" ) func NewUserDeviceFromConfig(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, routines int) (Device, error) { @@ -37,25 +38,23 @@ type UserDevice struct { inboundReader *io.PipeReader inboundWriter *io.PipeWriter - - readBuf []byte - batchRet [1]tio.Packet } func (d *UserDevice) Capabilities() tio.Capabilities { return tio.Capabilities{} } -func (d *UserDevice) Read() ([]tio.Packet, error) { - if d.readBuf == nil { - d.readBuf = make([]byte, defaultBatchBufSize) +func (d *UserDevice) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - n, err := d.outboundReader.Read(d.readBuf) + p[0].Meta = struct{}{} + n, err := d.outboundReader.Read(mem) if err != nil { - return nil, err + return 0, err } - d.batchRet[0] = tio.Packet{Bytes: d.readBuf[:n]} - return d.batchRet[:], nil + p[0].Bytes = mem[:n] + return 1, nil } func (d *UserDevice) Activate() error { diff --git a/wire/wire.go b/wire/wire.go new file mode 100644 index 00000000..acc20dd8 --- /dev/null +++ b/wire/wire.go @@ -0,0 +1,11 @@ +package wire + +// TunPacket is the unit a read from a tun device returns. +// On supported platforms, it may be a superpacket, but a single TunPacket will never have more than one destination. +type TunPacket struct { + // Bytes contains the actual packet + Bytes []byte + // Meta contains other information to help process the packet correctly, such as offsets for segmentation offloads + // Fields in Meta should be as portable/platform-agnostic as possible. + Meta struct{} +}