Cleanup and note more work

This commit is contained in:
Nate Brown
2025-04-16 21:56:53 -05:00
committed by JackDoan
parent 1f01a5543b
commit e96e9e3cfa
5 changed files with 20 additions and 15 deletions

View File

@@ -256,14 +256,14 @@ func (f *Interface) activate() error {
func (f *Interface) run() (func(), error) { func (f *Interface) run() (func(), error) {
// Launch n queues to read packets from udp // Launch n queues to read packets from udp
for i := 0; i < f.routines; i++ { for i := 0; i < f.routines; i++ {
go f.listenOut(i)
f.wg.Add(1) f.wg.Add(1)
go f.listenOut(i)
} }
// Launch n queues to read packets from tun dev // Launch n queues to read packets from tun dev
for i := 0; i < f.routines; i++ { for i := 0; i < f.routines; i++ {
go f.listenIn(f.readers[i], i)
f.wg.Add(1) f.wg.Add(1)
go f.listenIn(f.readers[i], i)
} }
return f.wg.Wait, nil return f.wg.Wait, nil
@@ -285,15 +285,21 @@ func (f *Interface) listenOut(i int) {
fwPacket := &firewall.Packet{} fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12) nb := make([]byte, 12, 12)
li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l)) f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
}) })
f.l.Errorf("udp reader %v is done", i) if err != nil && !f.closed.Load() {
f.l.WithError(err).Error("Error while reading packet inbound packet, closing")
//TODO: Trigger Control to close
}
f.l.Debugf("underlay reader %v is done", i)
f.wg.Done() f.wg.Done()
} }
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
runtime.LockOSThread()
packet := make([]byte, mtu) packet := make([]byte, mtu)
out := make([]byte, mtu) out := make([]byte, mtu)
fwPacket := &firewall.Packet{} fwPacket := &firewall.Packet{}
@@ -305,8 +311,8 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
n, err := reader.Read(packet) n, err := reader.Read(packet)
if err != nil { if err != nil {
if !f.closed.Load() { if !f.closed.Load() {
//TODO: should we close? yes f.l.WithError(err).Error("Error while reading outbound packet, closing")
f.l.WithError(err).Error("Error while reading outbound packet") //TODO: Trigger Control to close
} }
break break
} }
@@ -314,7 +320,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l)) f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
} }
f.l.Errorf("tun reader %v is done", i) f.l.Debugf("overlay reader %v is done", i)
f.wg.Done() f.wg.Done()
} }

View File

@@ -16,7 +16,7 @@ type EncReader func(
type Conn interface { type Conn interface {
Rebind() error Rebind() error
LocalAddr() (netip.AddrPort, error) LocalAddr() (netip.AddrPort, error)
ListenOut(r EncReader) ListenOut(r EncReader) error
WriteTo(b []byte, addr netip.AddrPort) error WriteTo(b []byte, addr netip.AddrPort) error
ReloadConfig(c *config.C) ReloadConfig(c *config.C)
SupportsMultipleReaders() bool SupportsMultipleReaders() bool

View File

@@ -73,7 +73,7 @@ type rawMessage struct {
Len uint32 Len uint32
} }
func (u *GenericConn) ListenOut(r EncReader) { func (u *GenericConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU) buffer := make([]byte, MTU)
var lastRecvErr time.Time var lastRecvErr time.Time
@@ -82,9 +82,9 @@ func (u *GenericConn) ListenOut(r EncReader) {
// Just read one packet at a time // Just read one packet at a time
n, rua, err := u.ReadFromUDPAddrPort(buffer) n, rua, err := u.ReadFromUDPAddrPort(buffer)
if err != nil { if err != nil {
return err
if errors.Is(err, net.ErrClosed) { if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return err
return
} }
// Dampen unexpected message warns to once per minute // Dampen unexpected message warns to once per minute
if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute { if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute {

View File

@@ -122,7 +122,7 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
} }
} }
func (u *StdConn) ListenOut(r EncReader) { func (u *StdConn) ListenOut(r EncReader) error {
var ip netip.Addr var ip netip.Addr
msgs, buffers, names := u.PrepareRawMessages(u.batch) msgs, buffers, names := u.PrepareRawMessages(u.batch)
@@ -134,8 +134,7 @@ func (u *StdConn) ListenOut(r EncReader) {
for { for {
n, err := read(msgs) n, err := read(msgs)
if err != nil { if err != nil {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return err
return
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {

View File

@@ -140,7 +140,7 @@ func (u *RIOConn) bind(l *logrus.Logger, sa windows.Sockaddr) error {
return nil return nil
} }
func (u *RIOConn) ListenOut(r EncReader) { func (u *RIOConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU) buffer := make([]byte, MTU)
var lastRecvErr time.Time var lastRecvErr time.Time