diff --git a/interface.go b/interface.go index fa446186..f4f87415 100644 --- a/interface.go +++ b/interface.go @@ -13,6 +13,7 @@ import ( "github.com/gaissmai/bart" "github.com/rcrowley/go-metrics" "github.com/slackhq/nebula/overlay/tio" + "github.com/slackhq/nebula/wire" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/firewall" @@ -326,7 +327,10 @@ func (f *Interface) listenOut(i int) { f.l.Debug("underlay reader is done", "reader", i) } -func (f *Interface) listenIn(reader tio.Queue, i int) { +func (f *Interface) listenIn(reader tio.Queue, q int) { + packetMem := make([]byte, mtu+16) //MTU + some leading slack space for platforms that return "bonus info" + // TODO get the amount of bonus info from the reader + packets := make([]wire.TunPacket, 1) out := make([]byte, mtu) fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) @@ -334,22 +338,21 @@ func (f *Interface) listenIn(reader tio.Queue, i int) { conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout) for { - pkts, err := reader.Read() + n, err := reader.Read(packets, packetMem) if err != nil { if !f.closed.Load() { - f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", i) + f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", q) f.onFatal(err) } break } ctCache := conntrackCache.Get() - for _, pkt := range pkts { - f.consumeInsidePacket(pkt.Bytes, fwPacket, nb, out, i, ctCache) + for i := range n { + f.consumeInsidePacket(packets[i].Bytes, fwPacket, nb, out, q, ctCache) } - } - f.l.Debug("overlay reader is done", "reader", i) + f.l.Debug("overlay reader is done", "reader", q) } func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) { diff --git a/overlay/tio/tio.go b/overlay/tio/tio.go index d576084e..e16403fd 100644 --- a/overlay/tio/tio.go +++ b/overlay/tio/tio.go @@ -2,6 +2,8 @@ package tio import ( "io" + + "github.com/slackhq/nebula/wire" ) // QueueSet holds one or many Queue objects and helps close them in an orderly way. @@ -13,17 +15,10 @@ type QueueSet interface { Add(fd int) error } -// Capabilities advertises which kernel offload features a Queue -// successfully negotiated. Callers consult this to decide which coalescers -// to wire onto the write path — a Queue without TSO can't usefully accept a -// TCPCoalescer, and a Queue without USO can't accept a UDPCoalescer. +// Capabilities advertises which kernel offload features a Queue successfully negotiated. +// Callers consult this to decide which coalescers to wire onto the write path. type Capabilities struct { - // TSO means the FD was opened with IFF_VNET_HDR and the kernel agreed - // to TUN_F_TSO4|TSO6 — i.e. WriteGSO with GSOProtoTCP is safe. - TSO bool - // USO means the kernel additionally agreed to TUN_F_USO4|USO6, so - // WriteGSO with GSOProtoUDP is safe. Linux ≥ 6.2. - USO bool + //none yet! } // Queue is a readable/writable Poll queue. One Queue is driven by a single @@ -31,62 +26,16 @@ type Capabilities struct { type Queue interface { io.Closer - // Read returns one or more packets. The returned Packet.Bytes slices - // are borrowed from the Queue's internal buffer and are only valid - // until the next Read or Close on this Queue - callers must encrypt - // or copy each slice before the next call. - Read() ([]Packet, error) + // Read will read at least 1 packet from the tun (up to len(p)) + // mem will be used to provide the backing for each of p[n].Bytes + // Returns the number of packets actually read, or error + Read(p []wire.TunPacket, mem []byte) (int, error) // Write emits a single packet on the plaintext (outside→inside) - // delivery path. Not safe for concurrent Writes. + // delivery path. Write(p []byte) (int, error) // Capabilities returns the Queue's negotiated offload capabilities, // or the zero value when q does not advertise any. Capabilities() Capabilities } - -// Packet is the unit Queue.Read returns. Bytes points into the queue's -// internal buffer and is only valid until the next Read or Close on the -// queue that produced it. GSO is the zero value for an already-segmented -// IP datagram; when non-zero it describes a kernel-supplied TSO/USO -// superpacket the caller must segment before consuming. -type Packet struct { - Bytes []byte - GSO GSOInfo -} - -// GSOInfo describes a kernel-supplied superpacket sitting in Packet.Bytes. -// The zero value means "not a superpacket" — Bytes is one regular IP -// datagram and no segmentation is required. -type GSOInfo struct { - // Size is the GSO segment size: max payload bytes per segment - // (== TCP MSS for TSO, == UDP payload chunk for USO). Zero means - // not a superpacket. - Size uint16 - // HdrLen is the total L3+L4 header length within Bytes (already - // corrected via correctHdrLen, so safe to slice on). - HdrLen uint16 - // CsumStart is the L4 header offset inside Bytes (== L3 header - // length). - CsumStart uint16 - // Proto picks the L4 protocol (TCP or UDP) so the segmenter knows - // which checksum/header layout to apply. - Proto GSOProto -} - -// IsSuperpacket reports whether g describes a multi-segment GSO/USO -// superpacket that needs segmentation before its bytes can be encrypted -// and sent on the wire. -func (g GSOInfo) IsSuperpacket() bool { return g.Size > 0 } - -// GSOProto selects the L4 protocol for a GSO superpacket. Determines which -// VIRTIO_NET_HDR_GSO_* type the writer stamps and which checksum offset -// inside the transport header virtio NEEDS_CSUM expects. -type GSOProto uint8 - -const ( - GSOProtoNone GSOProto = iota - GSOProtoTCP - GSOProtoUDP -) diff --git a/overlay/tio/tio_poll_linux.go b/overlay/tio/tio_poll_linux.go index a5c17542..602c7606 100644 --- a/overlay/tio/tio_poll_linux.go +++ b/overlay/tio/tio_poll_linux.go @@ -6,6 +6,7 @@ import ( "sync" "sync/atomic" + "github.com/slackhq/nebula/wire" "golang.org/x/sys/unix" ) @@ -16,9 +17,6 @@ type Poll struct { writePoll [2]unix.PollFd writeLock sync.Mutex closed atomic.Bool - - readBuf []byte - batchRet [1]Packet } func newPoll(fd int, shutdownFd int) (*Poll, error) { @@ -28,8 +26,7 @@ func newPoll(fd int, shutdownFd int) (*Poll, error) { } out := &Poll{ - fd: fd, - readBuf: make([]byte, 65535), + fd: fd, readPoll: [2]unix.PollFd{ {Fd: int32(fd), Events: unix.POLLIN}, {Fd: int32(shutdownFd), Events: unix.POLLIN}, @@ -97,13 +94,17 @@ func (t *Poll) blockOnWrite() error { return nil } -func (t *Poll) Read() ([]Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *Poll) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *Poll) readOne(to []byte) (int, error) { @@ -163,5 +164,5 @@ func (t *Poll) Close() error { } func (t *Poll) Capabilities() Capabilities { - return Capabilities{TSO: false, USO: false} + return Capabilities{} } diff --git a/overlay/tun_android.go b/overlay/tun_android.go index d3664298..c26033c6 100644 --- a/overlay/tun_android.go +++ b/overlay/tun_android.go @@ -16,6 +16,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) type tun struct { @@ -25,18 +26,19 @@ type tun struct { Routes atomic.Pointer[[]Route] routeTree atomic.Pointer[bart.Table[routing.Gateways]] l *slog.Logger - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.rwc.Read(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *tun) Write(p []byte) (int, error) { @@ -57,7 +59,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip fd: deviceFd, vpnNetworks: vpnNetworks, l: l, - readBuf: make([]byte, defaultBatchBufSize), } err := t.reload(c, true) diff --git a/overlay/tun_darwin.go b/overlay/tun_darwin.go index adf11bdf..3a4fa183 100644 --- a/overlay/tun_darwin.go +++ b/overlay/tun_darwin.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -35,9 +36,6 @@ type tun struct { // cache out buffer since we need to prepend 4 bytes for tun metadata out []byte - - readBuf []byte - batchRet [1]tio.Packet } type ifReq struct { @@ -133,7 +131,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, DefaultMTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) @@ -507,22 +504,17 @@ func delRoute(prefix netip.Prefix, gateway netroute.Addr) error { return nil } -func (t *tun) readOne(to []byte) (int, error) { - buf := make([]byte, len(to)+4) - - n, err := t.rwc.Read(buf) - - copy(to, buf[4:]) - return n - 4, err -} - -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } // Write is only valid for single threaded use diff --git a/overlay/tun_disabled.go b/overlay/tun_disabled.go index 524b6a0b..d96f912b 100644 --- a/overlay/tun_disabled.go +++ b/overlay/tun_disabled.go @@ -12,6 +12,7 @@ import ( "github.com/slackhq/nebula/iputil" "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/wire" ) type disabledTun struct { @@ -21,47 +22,8 @@ type disabledTun struct { // Track these metrics since we don't have the tun device to do it for us tx metrics.Counter rx metrics.Counter - l *slog.Logger numReaders int -} - -// disabledQueue is one tio.Queue view onto a shared disabledTun. Each queue -// owns a private batchRet so concurrent Read calls from different reader -// goroutines do not race on the returned slice. -type disabledQueue struct { - parent *disabledTun - batchRet [1]tio.Packet -} - -func (q *disabledQueue) Capabilities() tio.Capabilities { - return tio.Capabilities{} -} - -func (q *disabledQueue) Read() ([]tio.Packet, error) { - r, ok := <-q.parent.read - if !ok { - return nil, io.EOF - } - - q.parent.tx.Inc(1) - if q.parent.l.Enabled(context.Background(), slog.LevelDebug) { - q.parent.l.Debug("Write payload", "raw", prettyPacket(r)) - } - - q.batchRet[0] = tio.Packet{Bytes: r} - return q.batchRet[:], nil -} - -// Write on a queue forwards to the underlying disabledTun. All queues share -// one ICMP-handling/log path so this is a thin pass-through. -func (q *disabledQueue) Write(b []byte) (int, error) { - return q.parent.Write(b) -} - -// Close on a queue is a no-op. The shared channel and metrics are owned by -// the disabledTun; Close on the device tears them down once for everybody. -func (q *disabledQueue) Close() error { - return nil + l *slog.Logger } func newDisabledTun(vpnNetworks []netip.Prefix, queueLen int, metricsEnabled bool, l *slog.Logger) *disabledTun { @@ -99,6 +61,37 @@ func (*disabledTun) Name() string { return "disabled" } +func (t *disabledTun) readOne(b []byte) (int, error) { + r, ok := <-t.read + if !ok { + return 0, io.EOF + } + + if len(r) > len(b) { + return 0, fmt.Errorf("packet larger than mtu: %d > %d bytes", len(r), len(b)) + } + + t.tx.Inc(1) + if t.l.Enabled(context.Background(), slog.LevelDebug) { + t.l.Debug("Write payload", "raw", prettyPacket(r)) + } + + return copy(b, r), nil +} + +func (t *disabledTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? + } + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil +} + func (t *disabledTun) handleICMPEchoRequest(b []byte) bool { out := make([]byte, len(b)) out = iputil.CreateICMPEchoResponse(b, out) @@ -142,11 +135,15 @@ func (t *disabledTun) NewMultiQueueReader() error { func (t *disabledTun) Readers() []tio.Queue { out := make([]tio.Queue, t.numReaders) for i := range t.numReaders { - out[i] = &disabledQueue{parent: t} + out[i] = t } return out } +func (t *disabledTun) Capabilities() tio.Capabilities { + return tio.Capabilities{} +} + func (t *disabledTun) Close() error { if t.read != nil { close(t.read) diff --git a/overlay/tun_freebsd.go b/overlay/tun_freebsd.go index f22d15ac..b67cde40 100644 --- a/overlay/tun_freebsd.go +++ b/overlay/tun_freebsd.go @@ -17,6 +17,7 @@ import ( "unsafe" "github.com/gaissmai/bart" + "github.com/slackhq/nebula/wire" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/overlay/tio" @@ -102,9 +103,6 @@ type tun struct { readPoll [2]unix.PollFd writePoll [2]unix.PollFd closed atomic.Bool - - readBuf []byte - batchRet [1]tio.Packet } // blockOnRead waits until the tun fd is readable or shutdown has been signaled. @@ -159,13 +157,17 @@ func (t *tun) blockOnWrite() error { return nil } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } func (t *tun) readOne(to []byte) (int, error) { @@ -386,7 +388,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, fd: fd, - readBuf: make([]byte, defaultBatchBufSize), shutdownR: shutdownR, shutdownW: shutdownW, readPoll: [2]unix.PollFd{ diff --git a/overlay/tun_ios.go b/overlay/tun_ios.go index 79edd90b..7c6a5caf 100644 --- a/overlay/tun_ios.go +++ b/overlay/tun_ios.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" ) type tun struct { @@ -27,18 +28,19 @@ type tun struct { Routes atomic.Pointer[[]Route] routeTree atomic.Pointer[bart.Table[routing.Gateways]] l *slog.Logger - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.rwc.Read(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.rwc.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } func (t *tun) Write(p []byte) (int, error) { @@ -59,7 +61,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip vpnNetworks: vpnNetworks, rwc: &tunReadCloser{f: file}, l: l, - readBuf: make([]byte, defaultBatchBufSize), } err := t.reload(c, true) @@ -118,18 +119,9 @@ type tunReadCloser struct { wBuf []byte } +// Read returns a packet with the BSD 4-byte header, watch out! func (tr *tunReadCloser) Read(to []byte) (int, error) { - tr.rMu.Lock() - defer tr.rMu.Unlock() - - if cap(tr.rBuf) < len(to)+4 { - tr.rBuf = make([]byte, len(to)+4) - } - tr.rBuf = tr.rBuf[:len(to)+4] - - n, err := tr.f.Read(tr.rBuf) - copy(to, tr.rBuf[4:]) - return n - 4, err + return tr.f.Read(to) } func (tr *tunReadCloser) Write(from []byte) (int, error) { diff --git a/overlay/tun_netbsd.go b/overlay/tun_netbsd.go index 8b373800..98d2f742 100644 --- a/overlay/tun_netbsd.go +++ b/overlay/tun_netbsd.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -66,18 +67,19 @@ type tun struct { l *slog.Logger f *os.File fd int - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.readOne(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *tun) Readers() []tio.Queue { @@ -122,7 +124,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) diff --git a/overlay/tun_openbsd.go b/overlay/tun_openbsd.go index 80005b14..9c0df29b 100644 --- a/overlay/tun_openbsd.go +++ b/overlay/tun_openbsd.go @@ -19,6 +19,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" + "github.com/slackhq/nebula/wire" netroute "golang.org/x/net/route" "golang.org/x/sys/unix" ) @@ -59,18 +60,19 @@ type tun struct { fd int // cache out buffer since we need to prepend 4 bytes for tun metadata out []byte - - readBuf []byte - batchRet [1]tio.Packet } -func (t *tun) Read() ([]tio.Packet, error) { - n, err := t.readOne(t.readBuf) - if err != nil { - return nil, err +func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) <= 4 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.f.Read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[4:n] + return 1, nil } var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`) @@ -107,7 +109,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), l: l, - readBuf: make([]byte, defaultBatchBufSize), } err = t.reload(c, true) @@ -137,15 +138,6 @@ func (t *tun) Close() error { return nil } -func (t *tun) readOne(to []byte) (int, error) { - buf := make([]byte, len(to)+4) - - n, err := t.f.Read(buf) - - copy(to, buf[4:]) - return n - 4, err -} - // Write is only valid for single threaded use func (t *tun) Write(from []byte) (int, error) { buf := t.out diff --git a/overlay/tun_tester.go b/overlay/tun_tester.go index fe33703e..37bea32c 100644 --- a/overlay/tun_tester.go +++ b/overlay/tun_tester.go @@ -17,6 +17,7 @@ import ( "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/udp" + "github.com/slackhq/nebula/wire" ) type TestTun struct { @@ -29,8 +30,6 @@ type TestTun struct { closed atomic.Bool rxPackets chan []byte // Packets to receive into nebula TxPackets chan []byte // Packets transmitted outside by nebula - - batchRet [1]tio.Packet } func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*TestTun, error) { @@ -51,9 +50,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*T l: l, rxPackets: make(chan []byte, 10), TxPackets: make(chan []byte, 10), - batchRet: [1]tio.Packet{ - tio.Packet{Bytes: make([]byte, udp.MTU)}, - }, }, nil } @@ -168,14 +164,17 @@ func (t *TestTun) Close() error { return nil } -func (t *TestTun) Read() ([]tio.Packet, error) { - t.batchRet[0].Bytes = t.batchRet[0].Bytes[:udp.MTU] - n, err := t.read(t.batchRet[0].Bytes) - if err != nil { - return nil, err +func (t *TestTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0].Bytes = t.batchRet[0].Bytes[:n] - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.read(mem) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func (t *TestTun) read(b []byte) (int, error) { diff --git a/overlay/tun_windows.go b/overlay/tun_windows.go index 2ab83821..41a32bd6 100644 --- a/overlay/tun_windows.go +++ b/overlay/tun_windows.go @@ -21,6 +21,7 @@ import ( "github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/util" "github.com/slackhq/nebula/wintun" + "github.com/slackhq/nebula/wire" "golang.org/x/sys/windows" "golang.zx2c4.com/wireguard/windows/tunnel/winipcfg" ) @@ -45,18 +46,19 @@ type winTun struct { l *slog.Logger tun *wintun.NativeTun - - readBuf []byte - batchRet [1]tio.Packet } -func (t *winTun) Read() ([]tio.Packet, error) { - n, err := t.tun.Read(t.readBuf, 0) - if err != nil { - return nil, err +func (t *winTun) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} - return t.batchRet[:], nil + p[0].Meta = struct{}{} + n, err := t.tun.Read(mem, 0) + if err != nil { + return 0, err + } + p[0].Bytes = mem[:n] + return 1, nil } func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (Device, error) { @@ -81,7 +83,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*w } t := &winTun{ - readBuf: make([]byte, defaultBatchBufSize), Device: deviceName, vpnNetworks: vpnNetworks, MTU: c.GetInt("tun.mtu", DefaultMTU), diff --git a/overlay/user.go b/overlay/user.go index 1ffd03b7..93938198 100644 --- a/overlay/user.go +++ b/overlay/user.go @@ -9,6 +9,7 @@ import ( "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/wire" ) func NewUserDeviceFromConfig(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, routines int) (Device, error) { @@ -37,25 +38,23 @@ type UserDevice struct { inboundReader *io.PipeReader inboundWriter *io.PipeWriter - - readBuf []byte - batchRet [1]tio.Packet } func (d *UserDevice) Capabilities() tio.Capabilities { return tio.Capabilities{} } -func (d *UserDevice) Read() ([]tio.Packet, error) { - if d.readBuf == nil { - d.readBuf = make([]byte, defaultBatchBufSize) +func (d *UserDevice) Read(p []wire.TunPacket, mem []byte) (int, error) { + if len(p) == 0 || len(mem) == 0 { + return 0, nil //todo should this be an err? } - n, err := d.outboundReader.Read(d.readBuf) + p[0].Meta = struct{}{} + n, err := d.outboundReader.Read(mem) if err != nil { - return nil, err + return 0, err } - d.batchRet[0] = tio.Packet{Bytes: d.readBuf[:n]} - return d.batchRet[:], nil + p[0].Bytes = mem[:n] + return 1, nil } func (d *UserDevice) Activate() error { diff --git a/wire/wire.go b/wire/wire.go new file mode 100644 index 00000000..acc20dd8 --- /dev/null +++ b/wire/wire.go @@ -0,0 +1,11 @@ +package wire + +// TunPacket is the unit a read from a tun device returns. +// On supported platforms, it may be a superpacket, but a single TunPacket will never have more than one destination. +type TunPacket struct { + // Bytes contains the actual packet + Bytes []byte + // Meta contains other information to help process the packet correctly, such as offsets for segmentation offloads + // Fields in Meta should be as portable/platform-agnostic as possible. + Meta struct{} +}