diff --git a/inside.go b/inside.go index 12fad38..07f0418 100644 --- a/inside.go +++ b/inside.go @@ -3,6 +3,7 @@ package nebula import ( "net/netip" + "github.com/rcrowley/go-metrics" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/header" @@ -145,9 +146,12 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in // Send all accumulated packets in one batch if len(*batchPackets) > 0 { + batchSize := len(*batchPackets) + metrics.GetOrRegisterHistogram("batch.udp_write_size", nil, metrics.NewUniformSample(1024)).Update(int64(batchSize)) + n, err := f.writers[q].WriteMulti(*batchPackets, *batchAddrs) if err != nil { - f.l.WithError(err).WithField("sent", n).WithField("total", len(*batchPackets)).Error("Failed to send batch") + f.l.WithError(err).WithField("sent", n).WithField("total", batchSize).Error("Failed to send batch") } } } diff --git a/interface.go b/interface.go index 0486378..d80a721 100644 --- a/interface.go +++ b/interface.go @@ -347,6 +347,8 @@ func (f *Interface) listenInBatch(reader io.ReadWriteCloser, batchReader BatchRe conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout) + tunBatchHist := metrics.GetOrRegisterHistogram("batch.tun_read_size", nil, metrics.NewUniformSample(1024)) + for { n, err := batchReader.BatchRead(bufs, sizes) if err != nil { @@ -359,6 +361,8 @@ func (f *Interface) listenInBatch(reader io.ReadWriteCloser, batchReader BatchRe os.Exit(2) } + tunBatchHist.Update(int64(n)) + // Process all packets in the batch at once f.consumeInsidePackets(bufs, sizes, n, outs, i, conntrackCache.Get(f.l), &batchPackets, &batchAddrs) } diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index e2e926a..c5d7739 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -128,6 +128,7 @@ func (w *wgDeviceWrapper) Close() error { // BatchRead implements batching for multiqueue readers func (w *wgDeviceWrapper) BatchRead(bufs [][]byte, sizes []int) (int, error) { + // The zero here is offset. return w.dev.Read(bufs, sizes, 0) } diff --git a/udp/udp_linux.go b/udp/udp_linux.go index a8f300d..c591a09 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -151,6 +151,8 @@ func (u *StdConn) ListenOut(r EncReader) { read = u.ReadSingle } + udpBatchHist := metrics.GetOrRegisterHistogram("batch.udp_read_size", nil, metrics.NewUniformSample(1024)) + for { n, err := read(msgs) if err != nil { @@ -158,6 +160,8 @@ func (u *StdConn) ListenOut(r EncReader) { return } + udpBatchHist.Update(int64(n)) + for i := 0; i < n; i++ { // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic if u.isV4 {