change Queue.Read signature

This commit is contained in:
JackDoan
2026-05-14 11:42:59 -05:00
parent 3192e47ff4
commit 3b1e658bef
13 changed files with 170 additions and 197 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/gaissmai/bart"
"github.com/rcrowley/go-metrics"
"github.com/slackhq/nebula/util"
"github.com/slackhq/nebula/wire"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/firewall"
@@ -372,45 +373,45 @@ func (f *Interface) listenOut(i int) {
f.l.Debug("underlay reader is done", "reader", i)
}
func (f *Interface) listenIn(reader tio.Queue, i int) {
func (f *Interface) listenIn(reader tio.Queue, q int) {
// Pinning this thread (and goroutine) to a single CPU keeps every sendmmsg from this goroutine going through the
// same TX ring on the nic, so the wire sees per-flow order.
cpu := i % runtime.NumCPU()
cpu := q % runtime.NumCPU()
if n := len(f.cpuAffinity); n > 0 {
cpu = f.cpuAffinity[i%n]
cpu = f.cpuAffinity[q%n]
}
if err := util.PinThreadToCPU(cpu); err != nil {
f.l.Warn("failed to pin tun reader to CPU", "queue", i, "cpu", cpu, "err", err)
f.l.Warn("failed to pin tun reader to CPU", "queue", q, "cpu", cpu, "err", err)
}
rejectBuf := make([]byte, mtu)
arenaSize := batch.SendBatchCap * (udp.MTU + 32)
sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, arenaSize)
sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, arenaSize)
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
conntrackCache := firewall.NewConntrackCacheTicker(f.ctx, f.l, f.conntrackCacheTimeout)
for {
pkts, err := reader.Read()
n, err := reader.Read(packets, packetMem)
if err != nil {
if !f.closed.Load() {
f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", i)
f.l.Error("Error while reading outbound packet, closing", "error", err, "reader", q)
f.onFatal(err)
}
break
}
ctCache := conntrackCache.Get()
for _, pkt := range pkts {
f.consumeInsidePacket(pkt, fwPacket, nb, sb, rejectBuf, i, ctCache)
for i := range n {
f.consumeInsidePacket(packets[i], fwPacket, nb, sb, rejectBuf, q, ctCache)
}
if err := sb.Flush(); err != nil {
f.l.Error("Failed to write outgoing batch", "error", err, "writer", i)
}
}
f.l.Debug("overlay reader is done", "reader", i)
f.l.Debug("overlay reader is done", "reader", q)
}
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {