Files
nebula/batch_pipeline.go
2025-11-04 16:08:31 -05:00

165 lines
3.7 KiB
Go

package nebula
import (
"net/netip"
"github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/udp"
)
// batchPipelines tracks whether the inside device can operate on packet batches
// and, if so, holds the shared packet pool sized for the virtio headroom and
// payload limits advertised by the device. It also owns the fan-in/fan-out
// queues between the TUN readers, encrypt/decrypt workers, and the UDP writers.
type batchPipelines struct {
enabled bool
inside overlay.BatchCapableDevice
headroom int
payloadCap int
pool *overlay.PacketPool
batchSize int
routines int
rxQueues []chan *overlay.Packet
txQueues []chan queuedDatagram
tunQueues []chan *overlay.Packet
}
type queuedDatagram struct {
packet *overlay.Packet
addr netip.AddrPort
}
func (bp *batchPipelines) init(device overlay.Device, routines int, queueDepth int, maxSegments int) {
if device == nil || routines <= 0 {
return
}
bcap, ok := device.(overlay.BatchCapableDevice)
if !ok {
return
}
headroom := bcap.BatchHeadroom()
payload := bcap.BatchPayloadCap()
if maxSegments < 1 {
maxSegments = 1
}
requiredPayload := udp.MTU * maxSegments
if payload < requiredPayload {
payload = requiredPayload
}
batchSize := bcap.BatchSize()
if headroom <= 0 || payload <= 0 || batchSize <= 0 {
return
}
bp.enabled = true
bp.inside = bcap
bp.headroom = headroom
bp.payloadCap = payload
bp.batchSize = batchSize
bp.routines = routines
bp.pool = overlay.NewPacketPool(headroom, payload)
queueCap := batchSize * defaultBatchQueueDepthFactor
if queueDepth > 0 {
queueCap = queueDepth
}
if queueCap < batchSize {
queueCap = batchSize
}
bp.rxQueues = make([]chan *overlay.Packet, routines)
bp.txQueues = make([]chan queuedDatagram, routines)
bp.tunQueues = make([]chan *overlay.Packet, routines)
for i := 0; i < routines; i++ {
bp.rxQueues[i] = make(chan *overlay.Packet, queueCap)
bp.txQueues[i] = make(chan queuedDatagram, queueCap)
bp.tunQueues[i] = make(chan *overlay.Packet, queueCap)
}
}
func (bp *batchPipelines) Pool() *overlay.PacketPool {
if bp == nil || !bp.enabled {
return nil
}
return bp.pool
}
func (bp *batchPipelines) Enabled() bool {
return bp != nil && bp.enabled
}
func (bp *batchPipelines) batchSizeHint() int {
if bp == nil || bp.batchSize <= 0 {
return 1
}
return bp.batchSize
}
func (bp *batchPipelines) rxQueue(i int) chan *overlay.Packet {
if bp == nil || !bp.enabled || i < 0 || i >= len(bp.rxQueues) {
return nil
}
return bp.rxQueues[i]
}
func (bp *batchPipelines) txQueue(i int) chan queuedDatagram {
if bp == nil || !bp.enabled || i < 0 || i >= len(bp.txQueues) {
return nil
}
return bp.txQueues[i]
}
func (bp *batchPipelines) tunQueue(i int) chan *overlay.Packet {
if bp == nil || !bp.enabled || i < 0 || i >= len(bp.tunQueues) {
return nil
}
return bp.tunQueues[i]
}
func (bp *batchPipelines) txQueueLen(i int) int {
q := bp.txQueue(i)
if q == nil {
return 0
}
return len(q)
}
func (bp *batchPipelines) tunQueueLen(i int) int {
q := bp.tunQueue(i)
if q == nil {
return 0
}
return len(q)
}
func (bp *batchPipelines) enqueueRx(i int, pkt *overlay.Packet) bool {
q := bp.rxQueue(i)
if q == nil {
return false
}
q <- pkt
return true
}
func (bp *batchPipelines) enqueueTx(i int, pkt *overlay.Packet, addr netip.AddrPort) bool {
q := bp.txQueue(i)
if q == nil {
return false
}
q <- queuedDatagram{packet: pkt, addr: addr}
return true
}
func (bp *batchPipelines) enqueueTun(i int, pkt *overlay.Packet) bool {
q := bp.tunQueue(i)
if q == nil {
return false
}
q <- pkt
return true
}
func (bp *batchPipelines) newPacket() *overlay.Packet {
if bp == nil || !bp.enabled || bp.pool == nil {
return nil
}
return bp.pool.Get()
}