From e5ce8966d6730affd57a3cf4937a14ba88a855f1 Mon Sep 17 00:00:00 2001 From: Andriyanov Nikita Date: Mon, 21 Apr 2025 20:44:33 +0300 Subject: [PATCH] add netlink options (#1326) * add netlink options * force use buffer * fix namings and add config examples * fix linter --- examples/config.yml | 4 ++++ overlay/tun_linux.go | 40 +++++++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/examples/config.yml b/examples/config.yml index d8e7e6e..eec4f1c 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -275,6 +275,10 @@ tun: # On linux only, set to true to manage unsafe routes directly on the system route table with gateway routes instead of # in nebula configuration files. Default false, not reloadable. #use_system_route_table: false + # Buffer size for reading routes updates. 0 means default system buffer size. (/proc/sys/net/core/rmem_default). + # If using massive routes updates, for example BGP, you may need to increase this value to avoid packet loss. + # SO_RCVBUFFORCE is used to avoid having to raise the system wide max + #use_system_route_table_buffer_size: 0 # Configure logging level logging: diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index 7d19c85..4c509ba 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -34,10 +34,11 @@ type tun struct { deviceIndex int ioctlFd uintptr - Routes atomic.Pointer[[]Route] - routeTree atomic.Pointer[bart.Table[routing.Gateways]] - routeChan chan struct{} - useSystemRoutes bool + Routes atomic.Pointer[[]Route] + routeTree atomic.Pointer[bart.Table[routing.Gateways]] + routeChan chan struct{} + useSystemRoutes bool + useSystemRoutesBufferSize int l *logrus.Logger } @@ -124,12 +125,13 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu func newTunGeneric(c *config.C, l *logrus.Logger, file *os.File, vpnNetworks []netip.Prefix) (*tun, error) { t := &tun{ - ReadWriteCloser: file, - fd: int(file.Fd()), - vpnNetworks: vpnNetworks, - TXQueueLen: c.GetInt("tun.tx_queue", 500), - useSystemRoutes: c.GetBool("tun.use_system_route_table", false), - l: l, + ReadWriteCloser: file, + fd: int(file.Fd()), + vpnNetworks: vpnNetworks, + TXQueueLen: c.GetInt("tun.tx_queue", 500), + useSystemRoutes: c.GetBool("tun.use_system_route_table", false), + useSystemRoutesBufferSize: c.GetInt("tun.use_system_route_table_buffer_size", 0), + l: l, } err := t.reload(c, true) @@ -531,7 +533,13 @@ func (t *tun) watchRoutes() { rch := make(chan netlink.RouteUpdate) doneChan := make(chan struct{}) - if err := netlink.RouteSubscribe(rch, doneChan); err != nil { + netlinkOptions := netlink.RouteSubscribeOptions{ + ReceiveBufferSize: t.useSystemRoutesBufferSize, + ReceiveBufferForceSize: t.useSystemRoutesBufferSize != 0, + ErrorCallback: func(e error) { t.l.WithError(e).Errorf("netlink error") }, + } + + if err := netlink.RouteSubscribeWithOptions(rch, doneChan, netlinkOptions); err != nil { t.l.WithError(err).Errorf("failed to subscribe to system route changes") return } @@ -541,8 +549,14 @@ func (t *tun) watchRoutes() { go func() { for { select { - case r := <-rch: - t.updateRoutes(r) + case r, ok := <-rch: + if ok { + t.updateRoutes(r) + } else { + // may be should do something here as + // netlink stops sending updates + return + } case <-doneChan: // netlink.RouteSubscriber will close the rch for us return