mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-15 20:37:36 +02:00
Remove more os.Exit calls and give a more reliable wait for stop function (attempt 3) (#1661)
This commit is contained in:
95
interface.go
95
interface.go
@@ -6,7 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/netip"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -87,6 +87,13 @@ type Interface struct {
|
||||
|
||||
writers []udp.Conn
|
||||
readers []io.ReadWriteCloser
|
||||
wg sync.WaitGroup
|
||||
|
||||
// fatalErr holds the first unexpected reader error that caused shutdown.
|
||||
// nil means "no fatal error" (yet)
|
||||
fatalErr atomic.Pointer[error]
|
||||
// triggerShutdown is a function that will be run exactly once, when onFatal swaps something non-nil into fatalErr
|
||||
triggerShutdown func()
|
||||
|
||||
metricHandshakes metrics.Histogram
|
||||
messageMetrics *MessageMetrics
|
||||
@@ -209,7 +216,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
|
||||
// activate creates the interface on the host. After the interface is created, any
|
||||
// other services that want to bind listeners to its IP may do so successfully. However,
|
||||
// the interface isn't going to process anything until run() is called.
|
||||
func (f *Interface) activate() {
|
||||
func (f *Interface) activate() error {
|
||||
// actually turn on tun dev
|
||||
|
||||
addr, err := f.outside.LocalAddr()
|
||||
@@ -237,27 +244,54 @@ func (f *Interface) activate() {
|
||||
if i > 0 {
|
||||
reader, err = f.inside.NewMultiQueueReader()
|
||||
if err != nil {
|
||||
f.l.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
f.readers[i] = reader
|
||||
}
|
||||
|
||||
if err := f.inside.Activate(); err != nil {
|
||||
f.wg.Add(1) // for us to wait on Close() to return
|
||||
if err = f.inside.Activate(); err != nil {
|
||||
f.wg.Done()
|
||||
f.inside.Close()
|
||||
f.l.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Interface) run() {
|
||||
func (f *Interface) run() (func() error, error) {
|
||||
// Launch n queues to read packets from udp
|
||||
for i := 0; i < f.routines; i++ {
|
||||
go f.listenOut(i)
|
||||
f.wg.Go(func() {
|
||||
f.listenOut(i)
|
||||
})
|
||||
}
|
||||
|
||||
// Launch n queues to read packets from tun dev
|
||||
for i := 0; i < f.routines; i++ {
|
||||
go f.listenIn(f.readers[i], i)
|
||||
f.wg.Go(func() {
|
||||
f.listenIn(f.readers[i], i)
|
||||
})
|
||||
}
|
||||
|
||||
return func() error {
|
||||
f.wg.Wait()
|
||||
if e := f.fatalErr.Load(); e != nil {
|
||||
return *e
|
||||
}
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// onFatal stores the first fatal reader error, and calls triggerShutdown if it was the first one
|
||||
func (f *Interface) onFatal(err error) {
|
||||
swapped := f.fatalErr.CompareAndSwap(nil, &err)
|
||||
if !swapped {
|
||||
return
|
||||
}
|
||||
if f.triggerShutdown != nil {
|
||||
f.triggerShutdown()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,9 +310,16 @@ func (f *Interface) listenOut(i int) {
|
||||
fwPacket := &firewall.Packet{}
|
||||
nb := make([]byte, 12, 12)
|
||||
|
||||
li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
|
||||
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
|
||||
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
|
||||
})
|
||||
|
||||
if err != nil && !f.closed.Load() {
|
||||
f.l.WithError(err).Error("Error while reading inbound packet, closing")
|
||||
f.onFatal(err)
|
||||
}
|
||||
|
||||
f.l.Debugf("underlay reader %v is done", i)
|
||||
}
|
||||
|
||||
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
||||
@@ -292,17 +333,17 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
||||
for {
|
||||
n, err := reader.Read(packet)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrClosed) && f.closed.Load() {
|
||||
return
|
||||
if !f.closed.Load() {
|
||||
f.l.WithError(err).WithField("reader", i).Error("Error while reading outbound packet, closing")
|
||||
f.onFatal(err)
|
||||
}
|
||||
|
||||
f.l.WithError(err).Error("Error while reading outbound packet")
|
||||
// This only seems to happen when something fatal happens to the fd, so exit.
|
||||
os.Exit(2)
|
||||
break
|
||||
}
|
||||
|
||||
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
|
||||
}
|
||||
|
||||
f.l.Debugf("overlay reader %v is done", i)
|
||||
}
|
||||
|
||||
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
|
||||
@@ -477,23 +518,23 @@ func (f *Interface) GetCertState() *CertState {
|
||||
}
|
||||
|
||||
func (f *Interface) Close() error {
|
||||
var errs []error
|
||||
f.closed.Store(true)
|
||||
|
||||
for _, u := range f.writers {
|
||||
// Release the udp readers
|
||||
for i, u := range f.writers {
|
||||
err := u.Close()
|
||||
if err != nil {
|
||||
f.l.WithError(err).Error("Error while closing udp socket")
|
||||
}
|
||||
}
|
||||
for i, r := range f.readers {
|
||||
if i == 0 {
|
||||
continue // f.readers[0] is f.inside, which we want to save for last
|
||||
}
|
||||
if err := r.Close(); err != nil {
|
||||
f.l.WithError(err).Error("Error while closing tun reader")
|
||||
f.l.WithError(err).WithField("writer", i).Error("Error while closing udp socket")
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Release the tun device
|
||||
return f.inside.Close()
|
||||
// Release the tun device (closing the tun also closes all readers)
|
||||
closeErr := f.inside.Close()
|
||||
if closeErr != nil {
|
||||
errs = append(errs, closeErr)
|
||||
}
|
||||
f.wg.Done()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user