From 9253f36a3c875d5738d0f026e54c763a686ef084 Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 6 Nov 2025 13:34:58 -0500 Subject: [PATCH] tweak defaults and turn on gsogro on linux by default --- connection_state.go | 2 +- interface.go | 81 ++++++++++++++++++++++++++++++++++++--------- main.go | 5 ++- udp/udp_linux.go | 8 ++--- 4 files changed, 75 insertions(+), 21 deletions(-) diff --git a/connection_state.go b/connection_state.go index b5d6d0f..f6d5415 100644 --- a/connection_state.go +++ b/connection_state.go @@ -15,7 +15,7 @@ import ( // TODO: In a 5Gbps test, 1024 is not sufficient. With a 1400 MTU this is about 1.4Gbps of window, assuming full packets. // 4092 should be sufficient for 5Gbps -const ReplayWindow = 1024 +const ReplayWindow = 4096 type ConnectionState struct { eKey *NebulaCipherState diff --git a/interface.go b/interface.go index 43d958f..516f450 100644 --- a/interface.go +++ b/interface.go @@ -25,11 +25,14 @@ import ( const ( mtu = 9001 - inboundBatchSizeDefault = 32 - outboundBatchSizeDefault = 32 - batchFlushIntervalDefault = 50 * time.Microsecond - maxOutstandingBatchesDefault = 1028 - sendBatchSizeDefault = 32 + inboundBatchSizeDefault = 128 + outboundBatchSizeDefault = 64 + batchFlushIntervalDefault = 12 * time.Microsecond + maxOutstandingBatchesDefault = 8 + sendBatchSizeDefault = 64 + maxPendingPacketsDefault = 32 + maxPendingBytesDefault = 64 * 1024 + maxSendBufPerRoutineDefault = 16 ) type InterfaceConfig struct { @@ -65,6 +68,9 @@ type BatchConfig struct { OutboundBatchSize int FlushInterval time.Duration MaxOutstandingPerChan int + MaxPendingPackets int + MaxPendingBytes int + MaxSendBuffersPerChan int } type Interface struct { @@ -122,12 +128,16 @@ type Interface struct { outboundBatchPool sync.Pool sendPool sync.Pool + sendBufCache [][]*[]byte sendBatchSize int inboundBatchSize int outboundBatchSize int batchFlushInterval time.Duration maxOutstandingPerChan int + maxPendingPackets int + maxPendingBytes int + maxSendBufPerRoutine int } type outboundSend struct { @@ -204,7 +214,14 @@ func (f *Interface) releaseOutboundBatch(b *outboundBatch) { f.outboundBatchPool.Put(b) } -func (f *Interface) getSendBuffer() *[]byte { +func (f *Interface) getSendBuffer(q int) *[]byte { + cache := f.sendBufCache[q] + if n := len(cache); n > 0 { + buf := cache[n-1] + f.sendBufCache[q] = cache[:n-1] + *buf = (*buf)[:0] + return buf + } if v := f.sendPool.Get(); v != nil { buf := v.(*[]byte) *buf = (*buf)[:0] @@ -214,15 +231,20 @@ func (f *Interface) getSendBuffer() *[]byte { return &b } -func (f *Interface) releaseSendBuffer(buf *[]byte) { +func (f *Interface) releaseSendBuffer(q int, buf *[]byte) { if buf == nil { return } *buf = (*buf)[:0] + cache := f.sendBufCache[q] + if len(cache) < f.maxSendBufPerRoutine { + f.sendBufCache[q] = append(cache, buf) + return + } f.sendPool.Put(buf) } -func (f *Interface) flushSendQueue(q int, pending *[]outboundSend) { +func (f *Interface) flushSendQueue(q int, pending *[]outboundSend, pendingBytes *int) { if len(*pending) == 0 { return } @@ -241,9 +263,12 @@ func (f *Interface) flushSendQueue(q int, pending *[]outboundSend) { } for _, entry := range *pending { - f.releaseSendBuffer(entry.buf) + f.releaseSendBuffer(q, entry.buf) } *pending = (*pending)[:0] + if pendingBytes != nil { + *pendingBytes = 0 + } } type EncWriter interface { @@ -327,6 +352,15 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { if bc.MaxOutstandingPerChan <= 0 { bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault } + if bc.MaxPendingPackets <= 0 { + bc.MaxPendingPackets = maxPendingPacketsDefault + } + if bc.MaxPendingBytes <= 0 { + bc.MaxPendingBytes = maxPendingBytesDefault + } + if bc.MaxSendBuffersPerChan <= 0 { + bc.MaxSendBuffersPerChan = maxSendBufPerRoutineDefault + } ifce := &Interface{ pki: c.pki, hostMap: c.HostMap, @@ -368,6 +402,9 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { outboundBatchSize: bc.OutboundBatchSize, batchFlushInterval: bc.FlushInterval, maxOutstandingPerChan: bc.MaxOutstandingPerChan, + maxPendingPackets: bc.MaxPendingPackets, + maxPendingBytes: bc.MaxPendingBytes, + maxSendBufPerRoutine: bc.MaxSendBuffersPerChan, sendBatchSize: bc.OutboundBatchSize, } @@ -397,6 +434,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { buf := make([]byte, mtu) return &buf }} + ifce.sendBufCache = make([][]*[]byte, c.routines) ifce.tryPromoteEvery.Store(c.tryPromoteEvery) ifce.reQueryEvery.Store(c.reQueryEvery) @@ -598,37 +636,50 @@ func (f *Interface) workerOut(i int, ctx context.Context) { fwPacket1 := &firewall.Packet{} nb1 := make([]byte, 12, 12) pending := make([]outboundSend, 0, f.sendBatchSize) + pendingBytes := 0 + maxPendingPackets := f.maxPendingPackets + if maxPendingPackets <= 0 { + maxPendingPackets = f.sendBatchSize + } + maxPendingBytes := f.maxPendingBytes + if maxPendingBytes <= 0 { + maxPendingBytes = f.sendBatchSize * mtu + } for { select { case batch := <-f.outbound[i]: for _, data := range batch.payloads { - sendBuf := f.getSendBuffer() + sendBuf := f.getSendBuffer(i) buf := (*sendBuf)[:0] queue := func(addr netip.AddrPort, length int) { + if len(pending) >= maxPendingPackets || pendingBytes+length > maxPendingBytes { + f.flushSendQueue(i, &pending, &pendingBytes) + } pending = append(pending, outboundSend{ buf: sendBuf, length: length, addr: addr, }) - if len(pending) >= f.sendBatchSize { - f.flushSendQueue(i, &pending) + pendingBytes += length + if len(pending) >= f.sendBatchSize || pendingBytes >= maxPendingBytes { + f.flushSendQueue(i, &pending, &pendingBytes) } } sent := f.consumeInsidePacket(*data, fwPacket1, nb1, buf, queue, i, conntrackCache.Get(f.l)) if !sent { - f.releaseSendBuffer(sendBuf) + f.releaseSendBuffer(i, sendBuf) } *data = (*data)[:mtu] f.outPool.Put(data) } f.releaseOutboundBatch(batch) if len(pending) > 0 { - f.flushSendQueue(i, &pending) + f.flushSendQueue(i, &pending, &pendingBytes) } case <-ctx.Done(): if len(pending) > 0 { - f.flushSendQueue(i, &pending) + f.flushSendQueue(i, &pending, &pendingBytes) } f.wg.Done() return diff --git a/main.go b/main.go index ce5d9d9..5faf356 100644 --- a/main.go +++ b/main.go @@ -164,7 +164,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg for i := 0; i < routines; i++ { l.Infof("listening on %v", netip.AddrPortFrom(listenHost, uint16(port))) - udpServer, err := udp.NewListener(l, listenHost, port, routines > 1, c.GetInt("listen.batch", 64)) + udpServer, err := udp.NewListener(l, listenHost, port, routines > 1, c.GetInt("listen.batch", 128)) if err != nil { return nil, util.NewContextualError("Failed to open udp listener", m{"queue": i}, err) } @@ -226,6 +226,9 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg OutboundBatchSize: c.GetInt("batch.outbound_size", outboundBatchSizeDefault), FlushInterval: c.GetDuration("batch.flush_interval", batchFlushIntervalDefault), MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", maxOutstandingBatchesDefault), + MaxPendingPackets: c.GetInt("batch.max_pending_packets", 0), + MaxPendingBytes: c.GetInt("batch.max_pending_bytes", 0), + MaxSendBuffersPerChan: c.GetInt("batch.max_send_buffers_per_routine", 0), } ifConfig := &InterfaceConfig{ diff --git a/udp/udp_linux.go b/udp/udp_linux.go index cfb8470..b88a5a2 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -23,8 +23,8 @@ import ( var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500)) const ( - defaultGSOMaxSegments = 8 - defaultGSOFlushTimeout = 150 * time.Microsecond + defaultGSOMaxSegments = 128 + defaultGSOFlushTimeout = 80 * time.Microsecond defaultGROReadBufferSize = MTU * defaultGSOMaxSegments maxGSOBatchBytes = 0xFFFF ) @@ -565,7 +565,7 @@ func (u *StdConn) configureGRO(c *config.C) { return } - enable := c.GetBool("listen.enable_gro", false) + enable := c.GetBool("listen.enable_gro", true) if enable == u.enableGRO { if enable { if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 { @@ -594,7 +594,7 @@ func (u *StdConn) configureGRO(c *config.C) { } func (u *StdConn) configureGSO(c *config.C) { - enable := c.GetBool("listen.enable_gso", false) + enable := c.GetBool("listen.enable_gso", true) if !enable { u.disableGSO() } else {