From d7c0b643f833134b128823df10be03cc2312813b Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Wed, 16 Apr 2025 21:56:53 -0500 Subject: [PATCH] Cleanup and note more work --- interface.go | 20 +++++++++++++------- udp/conn.go | 2 +- udp/udp_generic.go | 5 ++--- udp/udp_linux.go | 5 ++--- udp/udp_rio_windows.go | 5 ++--- 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/interface.go b/interface.go index 79e642e..5d4b327 100644 --- a/interface.go +++ b/interface.go @@ -245,14 +245,14 @@ func (f *Interface) activate() error { func (f *Interface) run() (func(), error) { // Launch n queues to read packets from udp for i := 0; i < f.routines; i++ { - go f.listenOut(i) f.wg.Add(1) + go f.listenOut(i) } // Launch n queues to read packets from tun dev for i := 0; i < f.routines; i++ { - go f.listenIn(f.readers[i], i) f.wg.Add(1) + go f.listenIn(f.readers[i], i) } return f.wg.Wait, nil @@ -274,15 +274,21 @@ func (f *Interface) listenOut(i int) { fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) - li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { + err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l)) }) - f.l.Errorf("udp reader %v is done", i) + if err != nil && !f.closed.Load() { + f.l.WithError(err).Error("Error while reading packet inbound packet, closing") + //TODO: Trigger Control to close + } + + f.l.Debugf("underlay reader %v is done", i) f.wg.Done() } func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { + runtime.LockOSThread() packet := make([]byte, mtu) out := make([]byte, mtu) fwPacket := &firewall.Packet{} @@ -294,8 +300,8 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { n, err := reader.Read(packet) if err != nil { if !f.closed.Load() { - //TODO: should we close? yes - f.l.WithError(err).Error("Error while reading outbound packet") + f.l.WithError(err).Error("Error while reading outbound packet, closing") + //TODO: Trigger Control to close } break } @@ -303,7 +309,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) { f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l)) } - f.l.Errorf("tun reader %v is done", i) + f.l.Debugf("overlay reader %v is done", i) f.wg.Done() } diff --git a/udp/conn.go b/udp/conn.go index 895b0df..27fcd22 100644 --- a/udp/conn.go +++ b/udp/conn.go @@ -16,7 +16,7 @@ type EncReader func( type Conn interface { Rebind() error LocalAddr() (netip.AddrPort, error) - ListenOut(r EncReader) + ListenOut(r EncReader) error WriteTo(b []byte, addr netip.AddrPort) error ReloadConfig(c *config.C) Close() error diff --git a/udp/udp_generic.go b/udp/udp_generic.go index 06a4d53..f05fb1a 100644 --- a/udp/udp_generic.go +++ b/udp/udp_generic.go @@ -70,15 +70,14 @@ type rawMessage struct { Len uint32 } -func (u *GenericConn) ListenOut(r EncReader) { +func (u *GenericConn) ListenOut(r EncReader) error { buffer := make([]byte, MTU) for { // Just read one packet at a time n, rua, err := u.ReadFromUDPAddrPort(buffer) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + return err } r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n]) diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 4bfcc3b..e3df48f 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -118,7 +118,7 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) { } } -func (u *StdConn) ListenOut(r EncReader) { +func (u *StdConn) ListenOut(r EncReader) error { var ip netip.Addr msgs, buffers, names := u.PrepareRawMessages(u.batch) @@ -130,8 +130,7 @@ func (u *StdConn) ListenOut(r EncReader) { for { n, err := read(msgs) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + return err } for i := 0; i < n; i++ { diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index 585b642..c8d420f 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -115,15 +115,14 @@ func (u *RIOConn) bind(sa windows.Sockaddr) error { return nil } -func (u *RIOConn) ListenOut(r EncReader) { +func (u *RIOConn) ListenOut(r EncReader) error { buffer := make([]byte, MTU) for { // Just read one packet at a time n, rua, err := u.receive(buffer) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + return err } r(netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), buffer[:n])