better and batched tun interface

This commit is contained in:
JackDoan
2026-04-17 10:25:05 -05:00
parent 1ab1f71dba
commit 8b02b8128e
34 changed files with 1242 additions and 491 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/netip"
"sync"
@@ -18,6 +17,8 @@ import (
"github.com/slackhq/nebula/firewall"
"github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/overlay/batch"
"github.com/slackhq/nebula/overlay/tio"
"github.com/slackhq/nebula/udp"
)
@@ -88,8 +89,12 @@ type Interface struct {
ctx context.Context
writers []udp.Conn
readers []io.ReadWriteCloser
wg sync.WaitGroup
readers []tio.Queue
// batchers is one per tun queue, wrapping readers[i].
// decryptToTun sends plaintext into the batch.RxBatcher;
// listenOut calls its Flush at the end of each UDP recvmmsg batch.
batchers []batch.RxBatcher
wg sync.WaitGroup
// fatalErr holds the first unexpected reader error that caused shutdown.
// nil means "no fatal error" (yet)
@@ -187,7 +192,8 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
routines: c.routines,
version: c.version,
writers: make([]udp.Conn, c.routines),
readers: make([]io.ReadWriteCloser, c.routines),
readers: make([]tio.Queue, c.routines),
batchers: make([]batch.RxBatcher, c.routines),
myVpnNetworks: cs.myVpnNetworks,
myVpnNetworksTable: cs.myVpnNetworksTable,
myVpnAddrs: cs.myVpnAddrs,
@@ -245,15 +251,16 @@ func (f *Interface) activate() error {
metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
// Prepare n tun queues
var reader io.ReadWriteCloser = f.inside
for i := 0; i < f.routines; i++ {
if i > 0 {
reader, err = f.inside.NewMultiQueueReader()
if err != nil {
if err = f.inside.NewMultiQueueReader(); err != nil {
return err
}
}
f.readers[i] = reader
}
f.readers = f.inside.Readers()
for i := range f.readers {
f.batchers[i] = batch.NewPassthrough(f.readers[i])
}
f.wg.Add(1) // for us to wait on Close() to return
@@ -311,14 +318,24 @@ func (f *Interface) listenOut(i int) {
ctCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout)
lhh := f.lightHouse.NewRequestHandler()
plaintext := make([]byte, udp.MTU)
h := &header.H{}
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
coalescer := f.batchers[i]
listener := func(fromUdpAddr netip.AddrPort, payload []byte) {
plaintext := f.batchers[i].Reserve(len(payload))
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get())
})
}
flusher := func() {
if err := coalescer.Flush(); err != nil {
f.l.Error("Failed to flush tun coalescer", "error", err)
}
}
err := li.ListenOut(listener, flusher)
if err != nil && !f.closed.Load() {
f.l.Error("Error while reading inbound packet, closing", "error", err)
@@ -328,16 +345,16 @@ func (f *Interface) listenOut(i int) {
f.l.Debug("underlay reader is done", "reader", i)
}
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
packet := make([]byte, mtu)
out := make([]byte, mtu)
func (f *Interface) listenIn(reader tio.Queue, i int) {
rejectBuf := make([]byte, mtu)
sb := batch.NewSendBatch(batch.SendBatchCap, udp.MTU+32)
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout)
for {
n, err := reader.Read(packet)
pkts, err := reader.Read()
if err != nil {
if !f.closed.Load() {
f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", i)
@@ -346,12 +363,29 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
break
}
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get())
sb.Reset()
for _, pkt := range pkts {
if sb.Len() >= sb.Cap() {
f.flushBatch(sb, i)
sb.Reset()
}
f.consumeInsidePacket(pkt, fwPacket, nb, sb, rejectBuf, i, conntrackCache.Get())
}
if sb.Len() > 0 {
f.flushBatch(sb, i)
}
}
f.l.Debug("overlay reader is done", "reader", i)
}
func (f *Interface) flushBatch(sb batch.TxBatcher, q int) {
bufs, dsts := sb.Get()
if err := f.writers[q].WriteBatch(bufs, dsts); err != nil {
f.l.Error("Failed to write outgoing batch", "error", err, "writer", q)
}
}
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
c.RegisterReloadCallback(f.reloadFirewall)
c.RegisterReloadCallback(f.reloadSendRecvError)