Remove more os.Exit calls and give a more reliable wait for stop function

This commit is contained in:
Nate Brown
2025-04-02 09:51:59 -05:00
committed by JackDoan
parent f77fe74192
commit 6592a07b51
13 changed files with 161 additions and 81 deletions

View File

@@ -16,7 +16,7 @@ type EncReader func(
type Conn interface {
Rebind() error
LocalAddr() (netip.AddrPort, error)
ListenOut(r EncReader)
ListenOut(r EncReader) error
WriteTo(b []byte, addr netip.AddrPort) error
ReloadConfig(c *config.C)
SupportsMultipleReaders() bool
@@ -31,8 +31,8 @@ func (NoopConn) Rebind() error {
func (NoopConn) LocalAddr() (netip.AddrPort, error) {
return netip.AddrPort{}, nil
}
func (NoopConn) ListenOut(_ EncReader) {
return
func (NoopConn) ListenOut(_ EncReader) error {
return nil
}
func (NoopConn) SupportsMultipleReaders() bool {
return false

View File

@@ -165,7 +165,7 @@ func NewUDPStatsEmitter(udpConns []Conn) func() {
return func() {}
}
func (u *StdConn) ListenOut(r EncReader) {
func (u *StdConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
for {
@@ -173,8 +173,7 @@ func (u *StdConn) ListenOut(r EncReader) {
n, rua, err := u.ReadFromUDPAddrPort(buffer)
if err != nil {
if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
u.l.WithError(err).Error("unexpected udp socket receive error")

View File

@@ -10,11 +10,9 @@ package udp
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"time"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config"
@@ -73,25 +71,14 @@ type rawMessage struct {
Len uint32
}
func (u *GenericConn) ListenOut(r EncReader) {
func (u *GenericConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
var lastRecvErr time.Time
for {
// Just read one packet at a time
n, rua, err := u.ReadFromUDPAddrPort(buffer)
if err != nil {
if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
}
// Dampen unexpected message warns to once per minute
if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute {
lastRecvErr = time.Now()
u.l.WithError(err).Warn("unexpected udp socket receive error")
}
continue
return err
}
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])

View File

@@ -171,7 +171,7 @@ func recvmmsg(fd uintptr, msgs []rawMessage) (int, bool, error) {
return int(n), true, nil
}
func (u *StdConn) listenOutSingle(r EncReader) {
func (u *StdConn) listenOutSingle(r EncReader) error {
var err error
var n int
var from netip.AddrPort
@@ -180,15 +180,14 @@ func (u *StdConn) listenOutSingle(r EncReader) {
for {
n, from, err = u.udpConn.ReadFromUDPAddrPort(buffer)
if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
from = netip.AddrPortFrom(from.Addr().Unmap(), from.Port())
r(from, buffer[:n])
}
}
func (u *StdConn) listenOutBatch(r EncReader) {
func (u *StdConn) listenOutBatch(r EncReader) error {
var ip netip.Addr
var n int
var operr error
@@ -205,12 +204,10 @@ func (u *StdConn) listenOutBatch(r EncReader) {
for {
err := u.rawConn.Read(reader)
if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
if operr != nil {
u.l.WithError(operr).Debug("operr: udp socket is closed, exiting read loop")
return
return operr
}
for i := 0; i < n; i++ {
@@ -225,14 +222,11 @@ func (u *StdConn) listenOutBatch(r EncReader) {
}
}
func (u *StdConn) ListenOut(r EncReader) {
func (u *StdConn) ListenOut(r EncReader) error {
if u.batch == 1 {
//save some ram by not calling PrepareRawMessages for fields we won't use
//we could also make this path more common by calling recvmmsg with msgs[:1],
//but that's still the recvmmsg syscall, which would be a change
u.listenOutSingle(r)
return u.listenOutSingle(r)
} else {
u.listenOutBatch(r)
return u.listenOutBatch(r)
}
}
@@ -290,7 +284,7 @@ func (u *StdConn) ReloadConfig(c *config.C) {
}
func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
var vallen uint32 = 4 * unix.SK_MEMINFO_VARS
const vallen uint32 = 4 * unix.SK_MEMINFO_VARS
if u.rawConn == nil {
return fmt.Errorf("no UDP connection")

View File

@@ -140,7 +140,7 @@ func (u *RIOConn) bind(l *logrus.Logger, sa windows.Sockaddr) error {
return nil
}
func (u *RIOConn) ListenOut(r EncReader) {
func (u *RIOConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
var lastRecvErr time.Time
@@ -151,8 +151,7 @@ func (u *RIOConn) ListenOut(r EncReader) {
if err != nil {
if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
// Dampen unexpected message warns to once per minute
if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute {

View File

@@ -6,6 +6,7 @@ package udp
import (
"io"
"net/netip"
"os"
"sync/atomic"
"github.com/sirupsen/logrus"
@@ -106,11 +107,11 @@ func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error {
return nil
}
func (u *TesterConn) ListenOut(r EncReader) {
func (u *TesterConn) ListenOut(r EncReader) error {
for {
p, ok := <-u.RxPackets
if !ok {
return
return os.ErrClosed
}
r(p.From, p.Data)
}