change Queue.Read signature

This commit is contained in:
JackDoan
2026-05-14 11:42:59 -05:00
parent 1b59636028
commit c61de54ec3
14 changed files with 181 additions and 242 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/gaissmai/bart" "github.com/gaissmai/bart"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/wire"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/firewall"
@@ -326,7 +327,10 @@ func (f *Interface) listenOut(i int) {
f.l.Debug("underlay reader is done", "reader", i) f.l.Debug("underlay reader is done", "reader", i)
} }
func (f *Interface) listenIn(reader tio.Queue, i int) { func (f *Interface) listenIn(reader tio.Queue, q int) {
packetMem := make([]byte, mtu+16) //MTU + some leading slack space for platforms that return "bonus info"
// TODO get the amount of bonus info from the reader
packets := make([]wire.TunPacket, 1)
out := make([]byte, mtu) out := make([]byte, mtu)
fwPacket := &firewall.Packet{} fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12) nb := make([]byte, 12, 12)
@@ -334,22 +338,21 @@ func (f *Interface) listenIn(reader tio.Queue, i int) {
conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout) conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout)
for { for {
pkts, err := reader.Read() n, err := reader.Read(packets, packetMem)
if err != nil { if err != nil {
if !f.closed.Load() { if !f.closed.Load() {
f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", i) f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", q)
f.onFatal(err) f.onFatal(err)
} }
break break
} }
ctCache := conntrackCache.Get() ctCache := conntrackCache.Get()
for _, pkt := range pkts { for i := range n {
f.consumeInsidePacket(pkt.Bytes, fwPacket, nb, out, i, ctCache) f.consumeInsidePacket(packets[i].Bytes, fwPacket, nb, out, q, ctCache)
}
} }
} f.l.Debug("overlay reader is done", "reader", q)
f.l.Debug("overlay reader is done", "reader", i)
} }
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) { func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {

View File

@@ -2,6 +2,8 @@ package tio
import ( import (
"io" "io"
"github.com/slackhq/nebula/wire"
) )
// QueueSet holds one or many Queue objects and helps close them in an orderly way. // QueueSet holds one or many Queue objects and helps close them in an orderly way.
@@ -13,17 +15,10 @@ type QueueSet interface {
Add(fd int) error Add(fd int) error
} }
// Capabilities advertises which kernel offload features a Queue // Capabilities advertises which kernel offload features a Queue successfully negotiated.
// successfully negotiated. Callers consult this to decide which coalescers // Callers consult this to decide which coalescers to wire onto the write path.
// to wire onto the write path — a Queue without TSO can't usefully accept a
// TCPCoalescer, and a Queue without USO can't accept a UDPCoalescer.
type Capabilities struct { type Capabilities struct {
// TSO means the FD was opened with IFF_VNET_HDR and the kernel agreed //none yet!
// to TUN_F_TSO4|TSO6 — i.e. WriteGSO with GSOProtoTCP is safe.
TSO bool
// USO means the kernel additionally agreed to TUN_F_USO4|USO6, so
// WriteGSO with GSOProtoUDP is safe. Linux ≥ 6.2.
USO bool
} }
// Queue is a readable/writable Poll queue. One Queue is driven by a single // Queue is a readable/writable Poll queue. One Queue is driven by a single
@@ -31,62 +26,16 @@ type Capabilities struct {
type Queue interface { type Queue interface {
io.Closer io.Closer
// Read returns one or more packets. The returned Packet.Bytes slices // Read will read at least 1 packet from the tun (up to len(p))
// are borrowed from the Queue's internal buffer and are only valid // mem will be used to provide the backing for each of p[n].Bytes
// until the next Read or Close on this Queue - callers must encrypt // Returns the number of packets actually read, or error
// or copy each slice before the next call. Read(p []wire.TunPacket, mem []byte) (int, error)
Read() ([]Packet, error)
// Write emits a single packet on the plaintext (outside→inside) // Write emits a single packet on the plaintext (outside→inside)
// delivery path. Not safe for concurrent Writes. // delivery path.
Write(p []byte) (int, error) Write(p []byte) (int, error)
// Capabilities returns the Queue's negotiated offload capabilities, // Capabilities returns the Queue's negotiated offload capabilities,
// or the zero value when q does not advertise any. // or the zero value when q does not advertise any.
Capabilities() Capabilities Capabilities() Capabilities
} }
// Packet is the unit Queue.Read returns. Bytes points into the queue's
// internal buffer and is only valid until the next Read or Close on the
// queue that produced it. GSO is the zero value for an already-segmented
// IP datagram; when non-zero it describes a kernel-supplied TSO/USO
// superpacket the caller must segment before consuming.
type Packet struct {
Bytes []byte
GSO GSOInfo
}
// GSOInfo describes a kernel-supplied superpacket sitting in Packet.Bytes.
// The zero value means "not a superpacket" — Bytes is one regular IP
// datagram and no segmentation is required.
type GSOInfo struct {
// Size is the GSO segment size: max payload bytes per segment
// (== TCP MSS for TSO, == UDP payload chunk for USO). Zero means
// not a superpacket.
Size uint16
// HdrLen is the total L3+L4 header length within Bytes (already
// corrected via correctHdrLen, so safe to slice on).
HdrLen uint16
// CsumStart is the L4 header offset inside Bytes (== L3 header
// length).
CsumStart uint16
// Proto picks the L4 protocol (TCP or UDP) so the segmenter knows
// which checksum/header layout to apply.
Proto GSOProto
}
// IsSuperpacket reports whether g describes a multi-segment GSO/USO
// superpacket that needs segmentation before its bytes can be encrypted
// and sent on the wire.
func (g GSOInfo) IsSuperpacket() bool { return g.Size > 0 }
// GSOProto selects the L4 protocol for a GSO superpacket. Determines which
// VIRTIO_NET_HDR_GSO_* type the writer stamps and which checksum offset
// inside the transport header virtio NEEDS_CSUM expects.
type GSOProto uint8
const (
GSOProtoNone GSOProto = iota
GSOProtoTCP
GSOProtoUDP
)

View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/slackhq/nebula/wire"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@@ -16,9 +17,6 @@ type Poll struct {
writePoll [2]unix.PollFd writePoll [2]unix.PollFd
writeLock sync.Mutex writeLock sync.Mutex
closed atomic.Bool closed atomic.Bool
readBuf []byte
batchRet [1]Packet
} }
func newPoll(fd int, shutdownFd int) (*Poll, error) { func newPoll(fd int, shutdownFd int) (*Poll, error) {
@@ -29,7 +27,6 @@ func newPoll(fd int, shutdownFd int) (*Poll, error) {
out := &Poll{ out := &Poll{
fd: fd, fd: fd,
readBuf: make([]byte, 65535),
readPoll: [2]unix.PollFd{ readPoll: [2]unix.PollFd{
{Fd: int32(fd), Events: unix.POLLIN}, {Fd: int32(fd), Events: unix.POLLIN},
{Fd: int32(shutdownFd), Events: unix.POLLIN}, {Fd: int32(shutdownFd), Events: unix.POLLIN},
@@ -97,13 +94,17 @@ func (t *Poll) blockOnWrite() error {
return nil return nil
} }
func (t *Poll) Read() ([]Packet, error) { func (t *Poll) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.readOne(t.readBuf) if len(p) == 0 || len(mem) == 0 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.readOne(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
} }
func (t *Poll) readOne(to []byte) (int, error) { func (t *Poll) readOne(to []byte) (int, error) {
@@ -163,5 +164,5 @@ func (t *Poll) Close() error {
} }
func (t *Poll) Capabilities() Capabilities { func (t *Poll) Capabilities() Capabilities {
return Capabilities{TSO: false, USO: false} return Capabilities{}
} }

View File

@@ -16,6 +16,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
) )
type tun struct { type tun struct {
@@ -25,18 +26,19 @@ type tun struct {
Routes atomic.Pointer[[]Route] Routes atomic.Pointer[[]Route]
routeTree atomic.Pointer[bart.Table[routing.Gateways]] routeTree atomic.Pointer[bart.Table[routing.Gateways]]
l *slog.Logger l *slog.Logger
readBuf []byte
batchRet [1]tio.Packet
} }
func (t *tun) Read() ([]tio.Packet, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.rwc.Read(t.readBuf) if len(p) == 0 || len(mem) == 0 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.rwc.Read(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
} }
func (t *tun) Write(p []byte) (int, error) { func (t *tun) Write(p []byte) (int, error) {
@@ -57,7 +59,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip
fd: deviceFd, fd: deviceFd,
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
l: l, l: l,
readBuf: make([]byte, defaultBatchBufSize),
} }
err := t.reload(c, true) err := t.reload(c, true)

View File

@@ -19,6 +19,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
netroute "golang.org/x/net/route" netroute "golang.org/x/net/route"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@@ -35,9 +36,6 @@ type tun struct {
// cache out buffer since we need to prepend 4 bytes for tun metadata // cache out buffer since we need to prepend 4 bytes for tun metadata
out []byte out []byte
readBuf []byte
batchRet [1]tio.Packet
} }
type ifReq struct { type ifReq struct {
@@ -133,7 +131,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
DefaultMTU: c.GetInt("tun.mtu", DefaultMTU), DefaultMTU: c.GetInt("tun.mtu", DefaultMTU),
l: l, l: l,
readBuf: make([]byte, defaultBatchBufSize),
} }
err = t.reload(c, true) err = t.reload(c, true)
@@ -507,22 +504,17 @@ func delRoute(prefix netip.Prefix, gateway netroute.Addr) error {
return nil return nil
} }
func (t *tun) readOne(to []byte) (int, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
buf := make([]byte, len(to)+4) if len(p) == 0 || len(mem) <= 4 {
return 0, nil //todo should this be an err?
n, err := t.rwc.Read(buf)
copy(to, buf[4:])
return n - 4, err
}
func (t *tun) Read() ([]tio.Packet, error) {
n, err := t.readOne(t.readBuf)
if err != nil {
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.rwc.Read(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[4:n]
return 1, nil
} }
// Write is only valid for single threaded use // Write is only valid for single threaded use

View File

@@ -12,6 +12,7 @@ import (
"github.com/slackhq/nebula/iputil" "github.com/slackhq/nebula/iputil"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/wire"
) )
type disabledTun struct { type disabledTun struct {
@@ -21,47 +22,8 @@ type disabledTun struct {
// Track these metrics since we don't have the tun device to do it for us // Track these metrics since we don't have the tun device to do it for us
tx metrics.Counter tx metrics.Counter
rx metrics.Counter rx metrics.Counter
l *slog.Logger
numReaders int numReaders int
} l *slog.Logger
// disabledQueue is one tio.Queue view onto a shared disabledTun. Each queue
// owns a private batchRet so concurrent Read calls from different reader
// goroutines do not race on the returned slice.
type disabledQueue struct {
parent *disabledTun
batchRet [1]tio.Packet
}
func (q *disabledQueue) Capabilities() tio.Capabilities {
return tio.Capabilities{}
}
func (q *disabledQueue) Read() ([]tio.Packet, error) {
r, ok := <-q.parent.read
if !ok {
return nil, io.EOF
}
q.parent.tx.Inc(1)
if q.parent.l.Enabled(context.Background(), slog.LevelDebug) {
q.parent.l.Debug("Write payload", "raw", prettyPacket(r))
}
q.batchRet[0] = tio.Packet{Bytes: r}
return q.batchRet[:], nil
}
// Write on a queue forwards to the underlying disabledTun. All queues share
// one ICMP-handling/log path so this is a thin pass-through.
func (q *disabledQueue) Write(b []byte) (int, error) {
return q.parent.Write(b)
}
// Close on a queue is a no-op. The shared channel and metrics are owned by
// the disabledTun; Close on the device tears them down once for everybody.
func (q *disabledQueue) Close() error {
return nil
} }
func newDisabledTun(vpnNetworks []netip.Prefix, queueLen int, metricsEnabled bool, l *slog.Logger) *disabledTun { func newDisabledTun(vpnNetworks []netip.Prefix, queueLen int, metricsEnabled bool, l *slog.Logger) *disabledTun {
@@ -99,6 +61,37 @@ func (*disabledTun) Name() string {
return "disabled" return "disabled"
} }
func (t *disabledTun) readOne(b []byte) (int, error) {
r, ok := <-t.read
if !ok {
return 0, io.EOF
}
if len(r) > len(b) {
return 0, fmt.Errorf("packet larger than mtu: %d > %d bytes", len(r), len(b))
}
t.tx.Inc(1)
if t.l.Enabled(context.Background(), slog.LevelDebug) {
t.l.Debug("Write payload", "raw", prettyPacket(r))
}
return copy(b, r), nil
}
func (t *disabledTun) Read(p []wire.TunPacket, mem []byte) (int, error) {
if len(p) == 0 || len(mem) == 0 {
return 0, nil //todo should this be an err?
}
p[0].Meta = struct{}{}
n, err := t.readOne(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
}
func (t *disabledTun) handleICMPEchoRequest(b []byte) bool { func (t *disabledTun) handleICMPEchoRequest(b []byte) bool {
out := make([]byte, len(b)) out := make([]byte, len(b))
out = iputil.CreateICMPEchoResponse(b, out) out = iputil.CreateICMPEchoResponse(b, out)
@@ -142,11 +135,15 @@ func (t *disabledTun) NewMultiQueueReader() error {
func (t *disabledTun) Readers() []tio.Queue { func (t *disabledTun) Readers() []tio.Queue {
out := make([]tio.Queue, t.numReaders) out := make([]tio.Queue, t.numReaders)
for i := range t.numReaders { for i := range t.numReaders {
out[i] = &disabledQueue{parent: t} out[i] = t
} }
return out return out
} }
func (t *disabledTun) Capabilities() tio.Capabilities {
return tio.Capabilities{}
}
func (t *disabledTun) Close() error { func (t *disabledTun) Close() error {
if t.read != nil { if t.read != nil {
close(t.read) close(t.read)

View File

@@ -17,6 +17,7 @@ import (
"unsafe" "unsafe"
"github.com/gaissmai/bart" "github.com/gaissmai/bart"
"github.com/slackhq/nebula/wire"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
@@ -102,9 +103,6 @@ type tun struct {
readPoll [2]unix.PollFd readPoll [2]unix.PollFd
writePoll [2]unix.PollFd writePoll [2]unix.PollFd
closed atomic.Bool closed atomic.Bool
readBuf []byte
batchRet [1]tio.Packet
} }
// blockOnRead waits until the tun fd is readable or shutdown has been signaled. // blockOnRead waits until the tun fd is readable or shutdown has been signaled.
@@ -159,13 +157,17 @@ func (t *tun) blockOnWrite() error {
return nil return nil
} }
func (t *tun) Read() ([]tio.Packet, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.readOne(t.readBuf) if len(p) == 0 || len(mem) == 0 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.readOne(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[4:n]
return 1, nil
} }
func (t *tun) readOne(to []byte) (int, error) { func (t *tun) readOne(to []byte) (int, error) {
@@ -386,7 +388,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t
MTU: c.GetInt("tun.mtu", DefaultMTU), MTU: c.GetInt("tun.mtu", DefaultMTU),
l: l, l: l,
fd: fd, fd: fd,
readBuf: make([]byte, defaultBatchBufSize),
shutdownR: shutdownR, shutdownR: shutdownR,
shutdownW: shutdownW, shutdownW: shutdownW,
readPoll: [2]unix.PollFd{ readPoll: [2]unix.PollFd{

View File

@@ -19,6 +19,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
) )
type tun struct { type tun struct {
@@ -27,18 +28,19 @@ type tun struct {
Routes atomic.Pointer[[]Route] Routes atomic.Pointer[[]Route]
routeTree atomic.Pointer[bart.Table[routing.Gateways]] routeTree atomic.Pointer[bart.Table[routing.Gateways]]
l *slog.Logger l *slog.Logger
readBuf []byte
batchRet [1]tio.Packet
} }
func (t *tun) Read() ([]tio.Packet, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.rwc.Read(t.readBuf) if len(p) == 0 || len(mem) <= 4 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.rwc.Read(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[4:n]
return 1, nil
} }
func (t *tun) Write(p []byte) (int, error) { func (t *tun) Write(p []byte) (int, error) {
@@ -59,7 +61,6 @@ func newTunFromFd(c *config.C, l *slog.Logger, deviceFd int, vpnNetworks []netip
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
rwc: &tunReadCloser{f: file}, rwc: &tunReadCloser{f: file},
l: l, l: l,
readBuf: make([]byte, defaultBatchBufSize),
} }
err := t.reload(c, true) err := t.reload(c, true)
@@ -118,18 +119,9 @@ type tunReadCloser struct {
wBuf []byte wBuf []byte
} }
// Read returns a packet with the BSD 4-byte header, watch out!
func (tr *tunReadCloser) Read(to []byte) (int, error) { func (tr *tunReadCloser) Read(to []byte) (int, error) {
tr.rMu.Lock() return tr.f.Read(to)
defer tr.rMu.Unlock()
if cap(tr.rBuf) < len(to)+4 {
tr.rBuf = make([]byte, len(to)+4)
}
tr.rBuf = tr.rBuf[:len(to)+4]
n, err := tr.f.Read(tr.rBuf)
copy(to, tr.rBuf[4:])
return n - 4, err
} }
func (tr *tunReadCloser) Write(from []byte) (int, error) { func (tr *tunReadCloser) Write(from []byte) (int, error) {

View File

@@ -19,6 +19,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
netroute "golang.org/x/net/route" netroute "golang.org/x/net/route"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@@ -66,18 +67,19 @@ type tun struct {
l *slog.Logger l *slog.Logger
f *os.File f *os.File
fd int fd int
readBuf []byte
batchRet [1]tio.Packet
} }
func (t *tun) Read() ([]tio.Packet, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.readOne(t.readBuf) if len(p) == 0 || len(mem) == 0 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.readOne(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
} }
func (t *tun) Readers() []tio.Queue { func (t *tun) Readers() []tio.Queue {
@@ -122,7 +124,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
MTU: c.GetInt("tun.mtu", DefaultMTU), MTU: c.GetInt("tun.mtu", DefaultMTU),
l: l, l: l,
readBuf: make([]byte, defaultBatchBufSize),
} }
err = t.reload(c, true) err = t.reload(c, true)

View File

@@ -19,6 +19,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
netroute "golang.org/x/net/route" netroute "golang.org/x/net/route"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@@ -59,18 +60,19 @@ type tun struct {
fd int fd int
// cache out buffer since we need to prepend 4 bytes for tun metadata // cache out buffer since we need to prepend 4 bytes for tun metadata
out []byte out []byte
readBuf []byte
batchRet [1]tio.Packet
} }
func (t *tun) Read() ([]tio.Packet, error) { func (t *tun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.readOne(t.readBuf) if len(p) == 0 || len(mem) <= 4 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.f.Read(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[4:n]
return 1, nil
} }
var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`) var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`)
@@ -107,7 +109,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*t
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
MTU: c.GetInt("tun.mtu", DefaultMTU), MTU: c.GetInt("tun.mtu", DefaultMTU),
l: l, l: l,
readBuf: make([]byte, defaultBatchBufSize),
} }
err = t.reload(c, true) err = t.reload(c, true)
@@ -137,15 +138,6 @@ func (t *tun) Close() error {
return nil return nil
} }
func (t *tun) readOne(to []byte) (int, error) {
buf := make([]byte, len(to)+4)
n, err := t.f.Read(buf)
copy(to, buf[4:])
return n - 4, err
}
// Write is only valid for single threaded use // Write is only valid for single threaded use
func (t *tun) Write(from []byte) (int, error) { func (t *tun) Write(from []byte) (int, error) {
buf := t.out buf := t.out

View File

@@ -17,6 +17,7 @@ import (
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/udp" "github.com/slackhq/nebula/udp"
"github.com/slackhq/nebula/wire"
) )
type TestTun struct { type TestTun struct {
@@ -29,8 +30,6 @@ type TestTun struct {
closed atomic.Bool closed atomic.Bool
rxPackets chan []byte // Packets to receive into nebula rxPackets chan []byte // Packets to receive into nebula
TxPackets chan []byte // Packets transmitted outside by nebula TxPackets chan []byte // Packets transmitted outside by nebula
batchRet [1]tio.Packet
} }
func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*TestTun, error) { func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*TestTun, error) {
@@ -51,9 +50,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*T
l: l, l: l,
rxPackets: make(chan []byte, 10), rxPackets: make(chan []byte, 10),
TxPackets: make(chan []byte, 10), TxPackets: make(chan []byte, 10),
batchRet: [1]tio.Packet{
tio.Packet{Bytes: make([]byte, udp.MTU)},
},
}, nil }, nil
} }
@@ -168,14 +164,17 @@ func (t *TestTun) Close() error {
return nil return nil
} }
func (t *TestTun) Read() ([]tio.Packet, error) { func (t *TestTun) Read(p []wire.TunPacket, mem []byte) (int, error) {
t.batchRet[0].Bytes = t.batchRet[0].Bytes[:udp.MTU] if len(p) == 0 || len(mem) == 0 {
n, err := t.read(t.batchRet[0].Bytes) return 0, nil //todo should this be an err?
if err != nil {
return nil, err
} }
t.batchRet[0].Bytes = t.batchRet[0].Bytes[:n] p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.read(mem)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
} }
func (t *TestTun) read(b []byte) (int, error) { func (t *TestTun) read(b []byte) (int, error) {

View File

@@ -21,6 +21,7 @@ import (
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/util" "github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wintun" "github.com/slackhq/nebula/wintun"
"github.com/slackhq/nebula/wire"
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
"golang.zx2c4.com/wireguard/windows/tunnel/winipcfg" "golang.zx2c4.com/wireguard/windows/tunnel/winipcfg"
) )
@@ -45,18 +46,19 @@ type winTun struct {
l *slog.Logger l *slog.Logger
tun *wintun.NativeTun tun *wintun.NativeTun
readBuf []byte
batchRet [1]tio.Packet
} }
func (t *winTun) Read() ([]tio.Packet, error) { func (t *winTun) Read(p []wire.TunPacket, mem []byte) (int, error) {
n, err := t.tun.Read(t.readBuf, 0) if len(p) == 0 || len(mem) == 0 {
if err != nil { return 0, nil //todo should this be an err?
return nil, err
} }
t.batchRet[0] = tio.Packet{Bytes: t.readBuf[:n]} p[0].Meta = struct{}{}
return t.batchRet[:], nil n, err := t.tun.Read(mem, 0)
if err != nil {
return 0, err
}
p[0].Bytes = mem[:n]
return 1, nil
} }
func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (Device, error) { func newTunFromFd(_ *config.C, _ *slog.Logger, _ int, _ []netip.Prefix) (Device, error) {
@@ -81,7 +83,6 @@ func newTun(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, _ bool) (*w
} }
t := &winTun{ t := &winTun{
readBuf: make([]byte, defaultBatchBufSize),
Device: deviceName, Device: deviceName,
vpnNetworks: vpnNetworks, vpnNetworks: vpnNetworks,
MTU: c.GetInt("tun.mtu", DefaultMTU), MTU: c.GetInt("tun.mtu", DefaultMTU),

View File

@@ -9,6 +9,7 @@ import (
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/routing" "github.com/slackhq/nebula/routing"
"github.com/slackhq/nebula/wire"
) )
func NewUserDeviceFromConfig(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, routines int) (Device, error) { func NewUserDeviceFromConfig(c *config.C, l *slog.Logger, vpnNetworks []netip.Prefix, routines int) (Device, error) {
@@ -37,25 +38,23 @@ type UserDevice struct {
inboundReader *io.PipeReader inboundReader *io.PipeReader
inboundWriter *io.PipeWriter inboundWriter *io.PipeWriter
readBuf []byte
batchRet [1]tio.Packet
} }
func (d *UserDevice) Capabilities() tio.Capabilities { func (d *UserDevice) Capabilities() tio.Capabilities {
return tio.Capabilities{} return tio.Capabilities{}
} }
func (d *UserDevice) Read() ([]tio.Packet, error) { func (d *UserDevice) Read(p []wire.TunPacket, mem []byte) (int, error) {
if d.readBuf == nil { if len(p) == 0 || len(mem) == 0 {
d.readBuf = make([]byte, defaultBatchBufSize) return 0, nil //todo should this be an err?
} }
n, err := d.outboundReader.Read(d.readBuf) p[0].Meta = struct{}{}
n, err := d.outboundReader.Read(mem)
if err != nil { if err != nil {
return nil, err return 0, err
} }
d.batchRet[0] = tio.Packet{Bytes: d.readBuf[:n]} p[0].Bytes = mem[:n]
return d.batchRet[:], nil return 1, nil
} }
func (d *UserDevice) Activate() error { func (d *UserDevice) Activate() error {

11
wire/wire.go Normal file
View File

@@ -0,0 +1,11 @@
package wire
// TunPacket is the unit a read from a tun device returns.
// On supported platforms, it may be a superpacket, but a single TunPacket will never have more than one destination.
type TunPacket struct {
// Bytes contains the actual packet
Bytes []byte
// Meta contains other information to help process the packet correctly, such as offsets for segmentation offloads
// Fields in Meta should be as portable/platform-agnostic as possible.
Meta struct{}
}