Compare commits

..

2 Commits

Author    SHA1          Message                       Date
Ryan      2c6f81c224    config tweaks for batching    2025-11-06 10:01:20 -05:00
Ryan      ad37749c5e    add batching of packets       2025-11-06 09:42:13 -05:00
5 changed files with 214 additions and 43 deletions

View File

@@ -132,6 +132,13 @@ listen:
# Sets the max number of packets to pull from the kernel for each syscall (under systems that support recvmmsg)
# default is 64, does not support reload
#batch: 64
# Control batching between UDP and TUN pipelines
#batch:
# inbound_size: 32 # packets to queue from UDP before handing to workers
# outbound_size: 32 # packets to queue from TUN before handing to workers
# flush_interval: 50us # flush partially filled batches after this duration
# max_outstanding: 1028 # batches buffered per routine channel before senders block
# Configure socket buffers for the udp side (outside), leave unset to use the system defaults. Values will be doubled by the kernel
# Default is net.core.rmem_default and net.core.wmem_default (/proc/sys/net/core/rmem_default and /proc/sys/net/core/wmem_default)
# Maximum is limited by memory in the system, SO_RCVBUFFORCE and SO_SNDBUFFORCE are used to avoid having to raise the system wide
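
The new batch.* keys map onto the BatchConfig struct introduced further down in this diff. A minimal sketch of that mapping, assuming the config.C accessors already used in Main(); the helper name and local struct are illustrative, not part of the change:

```go
// Sketch: reading the batch.* keys into a BatchConfig-style struct.
// Mirrors the wiring added to Main() later in this diff.
package example

import (
	"time"

	"github.com/slackhq/nebula/config"
)

type batchSettings struct {
	InboundBatchSize      int
	OutboundBatchSize     int
	FlushInterval         time.Duration
	MaxOutstandingPerChan int
}

func loadBatchSettings(c *config.C) batchSettings {
	return batchSettings{
		// Defaults match the constants added in this change.
		InboundBatchSize:      c.GetInt("batch.inbound_size", 32),
		OutboundBatchSize:     c.GetInt("batch.outbound_size", 32),
		FlushInterval:         c.GetDuration("batch.flush_interval", 50*time.Microsecond),
		MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", 1028),
	}
}
```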

View File

@@ -22,7 +22,14 @@ import (
"github.com/slackhq/nebula/udp"
)
const mtu = 9001
const (
mtu = 9001
inboundBatchSizeDefault = 32
outboundBatchSizeDefault = 32
batchFlushIntervalDefault = 50 * time.Microsecond
maxOutstandingBatchesDefault = 1028
)
type InterfaceConfig struct {
HostMap *HostMap
@@ -48,9 +55,17 @@ type InterfaceConfig struct {
reQueryWait time.Duration
ConntrackCacheTimeout time.Duration
BatchConfig BatchConfig
l *logrus.Logger
}
type BatchConfig struct {
InboundBatchSize int
OutboundBatchSize int
FlushInterval time.Duration
MaxOutstandingPerChan int
}
type Interface struct {
hostMap *HostMap
outside udp.Conn
@@ -97,10 +112,86 @@ type Interface struct {
l *logrus.Logger
inPool sync.Pool
inbound chan *packet.Packet
inbound []chan *packetBatch
outPool sync.Pool
outbound chan *[]byte
outbound []chan *outboundBatch
packetBatchPool sync.Pool
outboundBatchPool sync.Pool
inboundBatchSize int
outboundBatchSize int
batchFlushInterval time.Duration
maxOutstandingPerChan int
}
type packetBatch struct {
packets []*packet.Packet
}
func newPacketBatch(capacity int) *packetBatch {
return &packetBatch{
packets: make([]*packet.Packet, 0, capacity),
}
}
func (b *packetBatch) add(p *packet.Packet) {
b.packets = append(b.packets, p)
}
func (b *packetBatch) reset() {
for i := range b.packets {
b.packets[i] = nil
}
b.packets = b.packets[:0]
}
func (f *Interface) getPacketBatch() *packetBatch {
if v := f.packetBatchPool.Get(); v != nil {
b := v.(*packetBatch)
b.reset()
return b
}
return newPacketBatch(f.inboundBatchSize)
}
func (f *Interface) releasePacketBatch(b *packetBatch) {
b.reset()
f.packetBatchPool.Put(b)
}
type outboundBatch struct {
payloads []*[]byte
}
func newOutboundBatch(capacity int) *outboundBatch {
return &outboundBatch{payloads: make([]*[]byte, 0, capacity)}
}
func (b *outboundBatch) add(buf *[]byte) {
b.payloads = append(b.payloads, buf)
}
func (b *outboundBatch) reset() {
for i := range b.payloads {
b.payloads[i] = nil
}
b.payloads = b.payloads[:0]
}
func (f *Interface) getOutboundBatch() *outboundBatch {
if v := f.outboundBatchPool.Get(); v != nil {
b := v.(*outboundBatch)
b.reset()
return b
}
return newOutboundBatch(f.outboundBatchSize)
}
func (f *Interface) releaseOutboundBatch(b *outboundBatch) {
b.reset()
f.outboundBatchPool.Put(b)
}
type EncWriter interface {
@@ -170,6 +261,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
}
cs := c.pki.getCertState()
bc := c.BatchConfig
if bc.InboundBatchSize <= 0 {
bc.InboundBatchSize = inboundBatchSizeDefault
}
if bc.OutboundBatchSize <= 0 {
bc.OutboundBatchSize = outboundBatchSizeDefault
}
if bc.FlushInterval <= 0 {
bc.FlushInterval = batchFlushIntervalDefault
}
if bc.MaxOutstandingPerChan <= 0 {
bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault
}
ifce := &Interface{
pki: c.pki,
hostMap: c.HostMap,
@@ -202,11 +307,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
},
//TODO: configurable size
inbound: make(chan *packet.Packet, 1028),
outbound: make(chan *[]byte, 1028),
inbound: make([]chan *packetBatch, c.routines),
outbound: make([]chan *outboundBatch, c.routines),
l: c.l,
inboundBatchSize: bc.InboundBatchSize,
outboundBatchSize: bc.OutboundBatchSize,
batchFlushInterval: bc.FlushInterval,
maxOutstandingPerChan: bc.MaxOutstandingPerChan,
}
for i := 0; i < c.routines; i++ {
ifce.inbound[i] = make(chan *packetBatch, ifce.maxOutstandingPerChan)
ifce.outbound[i] = make(chan *outboundBatch, ifce.maxOutstandingPerChan)
}
ifce.inPool = sync.Pool{New: func() any {
@@ -218,6 +332,14 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
return &t
}}
ifce.packetBatchPool = sync.Pool{New: func() any {
return newPacketBatch(ifce.inboundBatchSize)
}}
ifce.outboundBatchPool = sync.Pool{New: func() any {
return newOutboundBatch(ifce.outboundBatchSize)
}}
ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
ifce.reQueryEvery.Store(c.reQueryEvery)
ifce.reQueryWait.Store(int64(c.reQueryWait))
@@ -296,22 +418,41 @@ func (f *Interface) listenOut(i int) {
li = f.outside
}
batch := f.getPacketBatch()
lastFlush := time.Now()
flush := func(force bool) {
if len(batch.packets) == 0 {
if force {
f.releasePacketBatch(batch)
}
return
}
f.inbound[i] <- batch
batch = f.getPacketBatch()
lastFlush = time.Now()
}
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
p := f.inPool.Get().(*packet.Packet)
//TODO: have the listener store this in the msgs array after a read instead of doing a copy
p.Payload = p.Payload[:mtu]
copy(p.Payload, payload)
p.Payload = p.Payload[:len(payload)]
p.Addr = fromUdpAddr
f.inbound <- p
//select {
//case f.inbound <- p:
//default:
// f.l.Error("Dropped packet from inbound channel")
//}
batch.add(p)
if len(batch.packets) >= f.inboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
flush(false)
}
})
if len(batch.packets) > 0 {
f.inbound[i] <- batch
} else {
f.releasePacketBatch(batch)
}
if err != nil && !f.closed.Load() {
f.l.WithError(err).Error("Error while reading inbound packet, closing")
//TODO: Trigger Control to close
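
Both readers (UDP side here, TUN side below) use the same flush rule: a batch is handed off once it reaches the configured size or once flush_interval has elapsed since the last hand-off. There is no background timer; the interval is only evaluated when the next packet arrives, and any remainder is flushed when the read loop exits. A minimal sketch of the pattern, with hypothetical names:

```go
// Sketch of the "flush on size or elapsed interval" rule used by
// listenOut and listenIn. The check runs each time an item is appended.
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		batchSize     = 4
		flushInterval = 50 * time.Microsecond
	)

	out := make(chan []int, 8)
	batch := make([]int, 0, batchSize)
	lastFlush := time.Now()

	flush := func() {
		if len(batch) == 0 {
			return
		}
		out <- batch
		batch = make([]int, 0, batchSize) // the real code grabs a fresh batch from a pool
		lastFlush = time.Now()
	}

	onItem := func(v int) {
		batch = append(batch, v)
		if len(batch) >= batchSize || time.Since(lastFlush) >= flushInterval {
			flush()
		}
	}

	for i := 0; i < 10; i++ {
		onItem(i)
	}
	flush() // hand off any remainder when the read loop exits

	close(out)
	for b := range out {
		fmt.Println(b)
	}
}
```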
@@ -324,6 +465,22 @@ func (f *Interface) listenOut(i int) {
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
runtime.LockOSThread()
batch := f.getOutboundBatch()
lastFlush := time.Now()
flush := func(force bool) {
if len(batch.payloads) == 0 {
if force {
f.releaseOutboundBatch(batch)
}
return
}
f.outbound[i] <- batch
batch = f.getOutboundBatch()
lastFlush = time.Now()
}
for {
p := f.outPool.Get().(*[]byte)
*p = (*p)[:mtu]
@@ -337,13 +494,17 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
}
*p = (*p)[:n]
//TODO: nonblocking channel write
f.outbound <- p
//select {
//case f.outbound <- p:
//default:
// f.l.Error("Dropped packet from outbound channel")
//}
batch.add(p)
if len(batch.payloads) >= f.outboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
flush(false)
}
}
if len(batch.payloads) > 0 {
f.outbound[i] <- batch
} else {
f.releaseOutboundBatch(batch)
}
f.l.Debugf("overlay reader %v is done", i)
@@ -360,10 +521,13 @@ func (f *Interface) workerIn(i int, ctx context.Context) {
for {
select {
case p := <-f.inbound:
f.readOutsidePackets(p.Addr, nil, result2[:0], p.Payload, h, fwPacket2, lhh, nb2, i, conntrackCache.Get(f.l))
p.Payload = p.Payload[:mtu]
f.inPool.Put(p)
case batch := <-f.inbound[i]:
for _, p := range batch.packets {
f.readOutsidePackets(p.Addr, nil, result2[:0], p.Payload, h, fwPacket2, lhh, nb2, i, conntrackCache.Get(f.l))
p.Payload = p.Payload[:mtu]
f.inPool.Put(p)
}
f.releasePacketBatch(batch)
case <-ctx.Done():
f.wg.Done()
return
@@ -379,10 +543,13 @@ func (f *Interface) workerOut(i int, ctx context.Context) {
for {
select {
case data := <-f.outbound:
f.consumeInsidePacket(*data, fwPacket1, nb1, result1, i, conntrackCache.Get(f.l))
*data = (*data)[:mtu]
f.outPool.Put(data)
case batch := <-f.outbound[i]:
for _, data := range batch.payloads {
f.consumeInsidePacket(*data, fwPacket1, nb1, result1, i, conntrackCache.Get(f.l))
*data = (*data)[:mtu]
f.outPool.Put(data)
}
f.releaseOutboundBatch(batch)
case <-ctx.Done():
f.wg.Done()
return
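
Each routine now owns its inbound/outbound channel, so listenOut(i) and listenIn(..., i) feed only workerIn(i) and workerOut(i), and routines no longer contend on a single shared queue. A stripped-down sketch of that per-routine fan-out with context-based shutdown (names are illustrative):

```go
// Sketch of the per-routine channel layout: producer i writes only to
// queues[i] and worker i drains only queues[i], mirroring how the shared
// inbound/outbound channels were split in this change.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	const routines = 2
	ctx, cancel := context.WithCancel(context.Background())

	queues := make([]chan []int, routines)
	for i := range queues {
		queues[i] = make(chan []int, 1028) // capacity plays the role of max_outstanding
	}

	var wg sync.WaitGroup
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func(i int) { // worker i drains only its own queue
			defer wg.Done()
			for {
				select {
				case b := <-queues[i]:
					fmt.Printf("worker %d got batch %v\n", i, b)
				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	for i := 0; i < routines; i++ {
		queues[i] <- []int{i, i + 1} // producer i feeds only queue i
	}

	time.Sleep(10 * time.Millisecond)
	cancel()
	wg.Wait()
}
```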

View File

@@ -221,6 +221,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
}
}
batchCfg := BatchConfig{
InboundBatchSize: c.GetInt("batch.inbound_size", inboundBatchSizeDefault),
OutboundBatchSize: c.GetInt("batch.outbound_size", outboundBatchSizeDefault),
FlushInterval: c.GetDuration("batch.flush_interval", batchFlushIntervalDefault),
MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", maxOutstandingBatchesDefault),
}
ifConfig := &InterfaceConfig{
HostMap: hostMap,
Inside: tun,
@@ -242,6 +249,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
relayManager: NewRelayManager(ctx, l, hostMap, c),
punchy: punchy,
ConntrackCacheTimeout: conntrackCacheTimeout,
BatchConfig: batchCfg,
l: l,
}

View File

@@ -9,13 +9,10 @@ import (
"math"
"net"
"net/netip"
"os"
"strings"
"sync"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/overlay"
"golang.org/x/sync/errgroup"
"gvisor.dev/gvisor/pkg/buffer"
@@ -46,15 +43,7 @@ type Service struct {
}
}
func New(config *config.C) (*Service, error) {
logger := logrus.New()
logger.Out = os.Stdout
control, err := nebula.Main(config, false, "custom-app", logger, overlay.NewUserDeviceFromConfig)
if err != nil {
return nil, err
}
func New(control *nebula.Control) (*Service, error) {
wait, err := control.Start()
if err != nil {
return nil, err
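
With this change, service.New no longer builds the Control itself; callers run nebula.Main (as the removed lines did) and pass the resulting Control in. A sketch of an updated call site, assuming the package lives at github.com/slackhq/nebula/service and using the usual config.NewC/Load entry points; the config path is illustrative:

```go
// Sketch of a caller adapted to the new service.New(control) signature.
// The nebula.Main arguments mirror the lines removed from New above.
package main

import (
	"log"
	"os"

	"github.com/sirupsen/logrus"
	"github.com/slackhq/nebula"
	"github.com/slackhq/nebula/config"
	"github.com/slackhq/nebula/overlay"
	"github.com/slackhq/nebula/service"
)

func main() {
	logger := logrus.New()
	logger.Out = os.Stdout

	// Load the nebula config as before.
	cfg := config.NewC(logger)
	if err := cfg.Load("/etc/nebula/config.yml"); err != nil {
		log.Fatal(err)
	}

	// The caller now runs Main itself and hands the Control to the service.
	control, err := nebula.Main(cfg, false, "custom-app", logger, overlay.NewUserDeviceFromConfig)
	if err != nil {
		log.Fatal(err)
	}

	svc, err := service.New(control)
	if err != nil {
		log.Fatal(err)
	}
	_ = svc
}
```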

View File

@@ -30,8 +30,8 @@ func (NoopConn) Rebind() error {
func (NoopConn) LocalAddr() (netip.AddrPort, error) {
return netip.AddrPort{}, nil
}
func (NoopConn) ListenOut(_ EncReader) {
return
func (NoopConn) ListenOut(_ EncReader) error {
return nil
}
func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error {
return nil