Fix e2e tests writing after the tester tun is closed causing a panic (#1681)
Some checks failed
gofmt / Run gofmt (push) Failing after 3s
smoke-extra / Run extra smoke tests (push) Failing after 3s
smoke / Run multi node smoke test (push) Failing after 3s
Build and test / Build all and test on ubuntu-linux (push) Failing after 2s
Build and test / Build and test on linux with boringcrypto (push) Failing after 3s
Build and test / Build and test on linux with pkcs11 (push) Failing after 2s
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled

This commit is contained in:
Nate Brown
2026-04-22 17:18:06 -05:00
committed by GitHub
parent 2a1cc62001
commit 5f00ab4b74

View File

@@ -7,7 +7,7 @@ import (
"io" "io"
"net/netip" "net/netip"
"os" "os"
"sync/atomic" "sync"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config" "github.com/slackhq/nebula/config"
@@ -37,8 +37,16 @@ type TesterConn struct {
RxPackets chan *Packet // Packets to receive into nebula RxPackets chan *Packet // Packets to receive into nebula
TxPackets chan *Packet // Packets transmitted outside by nebula TxPackets chan *Packet // Packets transmitted outside by nebula
closed atomic.Bool // done is closed exactly once by Close. Senders select on it so they
l *logrus.Logger // never race with a channel close; readers exit when it fires. The
// packet channels are intentionally never closed - that was the source
// of `send on closed channel` panics when a WriteTo/Send from another
// goroutine passed the close check and reached the send just after
// Close ran.
done chan struct{}
closeOnce sync.Once
l *logrus.Logger
} }
func NewListener(l *logrus.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn, error) { func NewListener(l *logrus.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn, error) {
@@ -46,6 +54,7 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn
Addr: netip.AddrPortFrom(ip, uint16(port)), Addr: netip.AddrPortFrom(ip, uint16(port)),
RxPackets: make(chan *Packet, 10), RxPackets: make(chan *Packet, 10),
TxPackets: make(chan *Packet, 10), TxPackets: make(chan *Packet, 10),
done: make(chan struct{}),
l: l, l: l,
}, nil }, nil
} }
@@ -54,10 +63,6 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, _ bool, _ int) (Conn
// this is an encrypted packet or a handshake message in most cases // this is an encrypted packet or a handshake message in most cases
// packets were transmitted from another nebula node, you can send them with Tun.Send // packets were transmitted from another nebula node, you can send them with Tun.Send
func (u *TesterConn) Send(packet *Packet) { func (u *TesterConn) Send(packet *Packet) {
if u.closed.Load() {
return
}
h := &header.H{} h := &header.H{}
if err := h.Parse(packet.Data); err != nil { if err := h.Parse(packet.Data); err != nil {
panic(err) panic(err)
@@ -68,7 +73,10 @@ func (u *TesterConn) Send(packet *Packet) {
WithField("dataLen", len(packet.Data)). WithField("dataLen", len(packet.Data)).
Debug("UDP receiving injected packet") Debug("UDP receiving injected packet")
} }
u.RxPackets <- packet select {
case <-u.done:
case u.RxPackets <- packet:
}
} }
// Get will pull a UdpPacket from the transmit queue // Get will pull a UdpPacket from the transmit queue
@@ -76,7 +84,12 @@ func (u *TesterConn) Send(packet *Packet) {
// packets were ingested from the tun side (in most cases), you can send them with Tun.Send // packets were ingested from the tun side (in most cases), you can send them with Tun.Send
func (u *TesterConn) Get(block bool) *Packet { func (u *TesterConn) Get(block bool) *Packet {
if block { if block {
return <-u.TxPackets select {
case <-u.done:
return nil
case p := <-u.TxPackets:
return p
}
} }
select { select {
@@ -92,10 +105,6 @@ func (u *TesterConn) Get(block bool) *Packet {
//********************************************************************************************************************// //********************************************************************************************************************//
func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error { func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error {
if u.closed.Load() {
return io.ErrClosedPipe
}
p := &Packet{ p := &Packet{
Data: make([]byte, len(b), len(b)), Data: make([]byte, len(b), len(b)),
From: u.Addr, From: u.Addr,
@@ -103,17 +112,22 @@ func (u *TesterConn) WriteTo(b []byte, addr netip.AddrPort) error {
} }
copy(p.Data, b) copy(p.Data, b)
u.TxPackets <- p select {
return nil case <-u.done:
return io.ErrClosedPipe
case u.TxPackets <- p:
return nil
}
} }
func (u *TesterConn) ListenOut(r EncReader) error { func (u *TesterConn) ListenOut(r EncReader) error {
for { for {
p, ok := <-u.RxPackets select {
if !ok { case <-u.done:
return os.ErrClosed return os.ErrClosed
case p := <-u.RxPackets:
r(p.From, p.Data)
} }
r(p.From, p.Data)
} }
} }
@@ -137,9 +151,8 @@ func (u *TesterConn) Rebind() error {
} }
func (u *TesterConn) Close() error { func (u *TesterConn) Close() error {
if u.closed.CompareAndSwap(false, true) { u.closeOnce.Do(func() {
close(u.RxPackets) close(u.done)
close(u.TxPackets) })
}
return nil return nil
} }