mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-09 00:43:57 +01:00
Cleanup and note more work
This commit is contained in:
parent
a46222a2fe
commit
d7c0b643f8
20
interface.go
20
interface.go
@ -245,14 +245,14 @@ func (f *Interface) activate() error {
|
||||
func (f *Interface) run() (func(), error) {
|
||||
// Launch n queues to read packets from udp
|
||||
for i := 0; i < f.routines; i++ {
|
||||
go f.listenOut(i)
|
||||
f.wg.Add(1)
|
||||
go f.listenOut(i)
|
||||
}
|
||||
|
||||
// Launch n queues to read packets from tun dev
|
||||
for i := 0; i < f.routines; i++ {
|
||||
go f.listenIn(f.readers[i], i)
|
||||
f.wg.Add(1)
|
||||
go f.listenIn(f.readers[i], i)
|
||||
}
|
||||
|
||||
return f.wg.Wait, nil
|
||||
@ -274,15 +274,21 @@ func (f *Interface) listenOut(i int) {
|
||||
fwPacket := &firewall.Packet{}
|
||||
nb := make([]byte, 12, 12)
|
||||
|
||||
li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
|
||||
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
|
||||
f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
|
||||
})
|
||||
|
||||
f.l.Errorf("udp reader %v is done", i)
|
||||
if err != nil && !f.closed.Load() {
|
||||
f.l.WithError(err).Error("Error while reading packet inbound packet, closing")
|
||||
//TODO: Trigger Control to close
|
||||
}
|
||||
|
||||
f.l.Debugf("underlay reader %v is done", i)
|
||||
f.wg.Done()
|
||||
}
|
||||
|
||||
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
||||
runtime.LockOSThread()
|
||||
packet := make([]byte, mtu)
|
||||
out := make([]byte, mtu)
|
||||
fwPacket := &firewall.Packet{}
|
||||
@ -294,8 +300,8 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
||||
n, err := reader.Read(packet)
|
||||
if err != nil {
|
||||
if !f.closed.Load() {
|
||||
//TODO: should we close? yes
|
||||
f.l.WithError(err).Error("Error while reading outbound packet")
|
||||
f.l.WithError(err).Error("Error while reading outbound packet, closing")
|
||||
//TODO: Trigger Control to close
|
||||
}
|
||||
break
|
||||
}
|
||||
@ -303,7 +309,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
||||
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
|
||||
}
|
||||
|
||||
f.l.Errorf("tun reader %v is done", i)
|
||||
f.l.Debugf("overlay reader %v is done", i)
|
||||
f.wg.Done()
|
||||
}
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@ type EncReader func(
|
||||
type Conn interface {
|
||||
Rebind() error
|
||||
LocalAddr() (netip.AddrPort, error)
|
||||
ListenOut(r EncReader)
|
||||
ListenOut(r EncReader) error
|
||||
WriteTo(b []byte, addr netip.AddrPort) error
|
||||
ReloadConfig(c *config.C)
|
||||
Close() error
|
||||
|
||||
@ -70,15 +70,14 @@ type rawMessage struct {
|
||||
Len uint32
|
||||
}
|
||||
|
||||
func (u *GenericConn) ListenOut(r EncReader) {
|
||||
func (u *GenericConn) ListenOut(r EncReader) error {
|
||||
buffer := make([]byte, MTU)
|
||||
|
||||
for {
|
||||
// Just read one packet at a time
|
||||
n, rua, err := u.ReadFromUDPAddrPort(buffer)
|
||||
if err != nil {
|
||||
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
|
||||
|
||||
@ -118,7 +118,7 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (u *StdConn) ListenOut(r EncReader) {
|
||||
func (u *StdConn) ListenOut(r EncReader) error {
|
||||
var ip netip.Addr
|
||||
|
||||
msgs, buffers, names := u.PrepareRawMessages(u.batch)
|
||||
@ -130,8 +130,7 @@ func (u *StdConn) ListenOut(r EncReader) {
|
||||
for {
|
||||
n, err := read(msgs)
|
||||
if err != nil {
|
||||
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
|
||||
@ -115,15 +115,14 @@ func (u *RIOConn) bind(sa windows.Sockaddr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *RIOConn) ListenOut(r EncReader) {
|
||||
func (u *RIOConn) ListenOut(r EncReader) error {
|
||||
buffer := make([]byte, MTU)
|
||||
|
||||
for {
|
||||
// Just read one packet at a time
|
||||
n, rua, err := u.receive(buffer)
|
||||
if err != nil {
|
||||
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
r(netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), buffer[:n])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user