mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-22 16:34:25 +01:00
instruments
This commit is contained in:
@@ -3,6 +3,7 @@ package nebula
|
|||||||
import (
|
import (
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
|
||||||
|
"github.com/rcrowley/go-metrics"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/slackhq/nebula/firewall"
|
"github.com/slackhq/nebula/firewall"
|
||||||
"github.com/slackhq/nebula/header"
|
"github.com/slackhq/nebula/header"
|
||||||
@@ -145,9 +146,12 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in
|
|||||||
|
|
||||||
// Send all accumulated packets in one batch
|
// Send all accumulated packets in one batch
|
||||||
if len(*batchPackets) > 0 {
|
if len(*batchPackets) > 0 {
|
||||||
|
batchSize := len(*batchPackets)
|
||||||
|
metrics.GetOrRegisterHistogram("batch.udp_write_size", nil, metrics.NewUniformSample(1024)).Update(int64(batchSize))
|
||||||
|
|
||||||
n, err := f.writers[q].WriteMulti(*batchPackets, *batchAddrs)
|
n, err := f.writers[q].WriteMulti(*batchPackets, *batchAddrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.l.WithError(err).WithField("sent", n).WithField("total", len(*batchPackets)).Error("Failed to send batch")
|
f.l.WithError(err).WithField("sent", n).WithField("total", batchSize).Error("Failed to send batch")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -347,6 +347,8 @@ func (f *Interface) listenInBatch(reader io.ReadWriteCloser, batchReader BatchRe
|
|||||||
|
|
||||||
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
|
||||||
|
|
||||||
|
tunBatchHist := metrics.GetOrRegisterHistogram("batch.tun_read_size", nil, metrics.NewUniformSample(1024))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, err := batchReader.BatchRead(bufs, sizes)
|
n, err := batchReader.BatchRead(bufs, sizes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -359,6 +361,8 @@ func (f *Interface) listenInBatch(reader io.ReadWriteCloser, batchReader BatchRe
|
|||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tunBatchHist.Update(int64(n))
|
||||||
|
|
||||||
// Process all packets in the batch at once
|
// Process all packets in the batch at once
|
||||||
f.consumeInsidePackets(bufs, sizes, n, outs, i, conntrackCache.Get(f.l), &batchPackets, &batchAddrs)
|
f.consumeInsidePackets(bufs, sizes, n, outs, i, conntrackCache.Get(f.l), &batchPackets, &batchAddrs)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,6 +128,7 @@ func (w *wgDeviceWrapper) Close() error {
|
|||||||
|
|
||||||
// BatchRead implements batching for multiqueue readers
|
// BatchRead implements batching for multiqueue readers
|
||||||
func (w *wgDeviceWrapper) BatchRead(bufs [][]byte, sizes []int) (int, error) {
|
func (w *wgDeviceWrapper) BatchRead(bufs [][]byte, sizes []int) (int, error) {
|
||||||
|
// The zero here is offset.
|
||||||
return w.dev.Read(bufs, sizes, 0)
|
return w.dev.Read(bufs, sizes, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -151,6 +151,8 @@ func (u *StdConn) ListenOut(r EncReader) {
|
|||||||
read = u.ReadSingle
|
read = u.ReadSingle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
udpBatchHist := metrics.GetOrRegisterHistogram("batch.udp_read_size", nil, metrics.NewUniformSample(1024))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, err := read(msgs)
|
n, err := read(msgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -158,6 +160,8 @@ func (u *StdConn) ListenOut(r EncReader) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
udpBatchHist.Update(int64(n))
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic
|
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic
|
||||||
if u.isV4 {
|
if u.isV4 {
|
||||||
|
|||||||
Reference in New Issue
Block a user