diff --git a/examples/config.yml b/examples/config.yml index 42c32c8..6324701 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -132,6 +132,13 @@ listen: # Sets the max number of packets to pull from the kernel for each syscall (under systems that support recvmmsg) # default is 64, does not support reload #batch: 64 + +# Control batching between UDP and TUN pipelines +#batch: +# inbound_size: 32 # packets to queue from UDP before handing to workers +# outbound_size: 32 # packets to queue from TUN before handing to workers +# flush_interval: 50us # flush partially filled batches after this duration +# max_outstanding: 1028 # batches buffered per routine on each channel # Configure socket buffers for the udp side (outside), leave unset to use the system defaults. Values will be doubled by the kernel # Default is net.core.rmem_default and net.core.wmem_default (/proc/sys/net/core/rmem_default and /proc/sys/net/core/rmem_default) # Maximum is limited by memory in the system, SO_RCVBUFFORCE and SO_SNDBUFFORCE is used to avoid having to raise the system wide diff --git a/interface.go b/interface.go index 3df31d3..9c75ed2 100644 --- a/interface.go +++ b/interface.go @@ -25,10 +25,10 @@ import ( const ( mtu = 9001 - inboundBatchSize = 32 - outboundBatchSize = 32 - batchFlushInterval = 50 * time.Microsecond - maxOutstandingBatches = 1028 + inboundBatchSizeDefault = 32 + outboundBatchSizeDefault = 32 + batchFlushIntervalDefault = 50 * time.Microsecond + maxOutstandingBatchesDefault = 1028 ) type InterfaceConfig struct { @@ -55,9 +55,17 @@ type InterfaceConfig struct { reQueryWait time.Duration ConntrackCacheTimeout time.Duration + BatchConfig BatchConfig l *logrus.Logger } +type BatchConfig struct { + InboundBatchSize int + OutboundBatchSize int + FlushInterval time.Duration + MaxOutstandingPerChan int +} + type Interface struct { hostMap *HostMap outside udp.Conn @@ -111,15 +119,20 @@ type Interface struct { packetBatchPool sync.Pool outboundBatchPool sync.Pool + + inboundBatchSize int + outboundBatchSize int + batchFlushInterval time.Duration + maxOutstandingPerChan int } type packetBatch struct { packets []*packet.Packet } -func newPacketBatch() *packetBatch { +func newPacketBatch(capacity int) *packetBatch { return &packetBatch{ - packets: make([]*packet.Packet, 0, inboundBatchSize), + packets: make([]*packet.Packet, 0, capacity), } } @@ -140,7 +153,7 @@ func (f *Interface) getPacketBatch() *packetBatch { b.reset() return b } - return newPacketBatch() + return newPacketBatch(f.inboundBatchSize) } func (f *Interface) releasePacketBatch(b *packetBatch) { @@ -152,8 +165,8 @@ type outboundBatch struct { payloads []*[]byte } -func newOutboundBatch() *outboundBatch { - return &outboundBatch{payloads: make([]*[]byte, 0, outboundBatchSize)} +func newOutboundBatch(capacity int) *outboundBatch { + return &outboundBatch{payloads: make([]*[]byte, 0, capacity)} } func (b *outboundBatch) add(buf *[]byte) { @@ -173,7 +186,7 @@ func (f *Interface) getOutboundBatch() *outboundBatch { b.reset() return b } - return newOutboundBatch() + return newOutboundBatch(f.outboundBatchSize) } func (f *Interface) releaseOutboundBatch(b *outboundBatch) { @@ -248,6 +261,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { } cs := c.pki.getCertState() + + bc := c.BatchConfig + if bc.InboundBatchSize <= 0 { + bc.InboundBatchSize = inboundBatchSizeDefault + } + if bc.OutboundBatchSize <= 0 { + bc.OutboundBatchSize = outboundBatchSizeDefault + } + if bc.FlushInterval <= 0 { + bc.FlushInterval = batchFlushIntervalDefault + } + if bc.MaxOutstandingPerChan <= 0 { + bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault + } ifce := &Interface{ pki: c.pki, hostMap: c.HostMap, @@ -280,16 +307,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil), }, - //TODO: configurable size inbound: make([]chan *packetBatch, c.routines), outbound: make([]chan *outboundBatch, c.routines), l: c.l, + + inboundBatchSize: bc.InboundBatchSize, + outboundBatchSize: bc.OutboundBatchSize, + batchFlushInterval: bc.FlushInterval, + maxOutstandingPerChan: bc.MaxOutstandingPerChan, } for i := 0; i < c.routines; i++ { - ifce.inbound[i] = make(chan *packetBatch, maxOutstandingBatches) - ifce.outbound[i] = make(chan *outboundBatch, maxOutstandingBatches) + ifce.inbound[i] = make(chan *packetBatch, ifce.maxOutstandingPerChan) + ifce.outbound[i] = make(chan *outboundBatch, ifce.maxOutstandingPerChan) } ifce.inPool = sync.Pool{New: func() any { @@ -302,11 +333,11 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { }} ifce.packetBatchPool = sync.Pool{New: func() any { - return newPacketBatch() + return newPacketBatch(ifce.inboundBatchSize) }} ifce.outboundBatchPool = sync.Pool{New: func() any { - return newOutboundBatch() + return newOutboundBatch(ifce.outboundBatchSize) }} ifce.tryPromoteEvery.Store(c.tryPromoteEvery) @@ -411,7 +442,7 @@ func (f *Interface) listenOut(i int) { p.Addr = fromUdpAddr batch.add(p) - if len(batch.packets) >= inboundBatchSize || time.Since(lastFlush) >= batchFlushInterval { + if len(batch.packets) >= f.inboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval { flush(false) } }) @@ -465,7 +496,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { *p = (*p)[:n] batch.add(p) - if len(batch.payloads) >= outboundBatchSize || time.Since(lastFlush) >= batchFlushInterval { + if len(batch.payloads) >= f.outboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval { flush(false) } } diff --git a/main.go b/main.go index 5501e75..ce5d9d9 100644 --- a/main.go +++ b/main.go @@ -221,6 +221,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg } } + batchCfg := BatchConfig{ + InboundBatchSize: c.GetInt("batch.inbound_size", inboundBatchSizeDefault), + OutboundBatchSize: c.GetInt("batch.outbound_size", outboundBatchSizeDefault), + FlushInterval: c.GetDuration("batch.flush_interval", batchFlushIntervalDefault), + MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", maxOutstandingBatchesDefault), + } + ifConfig := &InterfaceConfig{ HostMap: hostMap, Inside: tun, @@ -242,6 +249,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg relayManager: NewRelayManager(ctx, l, hostMap, c), punchy: punchy, ConntrackCacheTimeout: conntrackCacheTimeout, + BatchConfig: batchCfg, l: l, }