//go:build !android && !e2e_testing // +build !android,!e2e_testing package udp import ( "context" "encoding/binary" "fmt" "net" "net/netip" "syscall" "unsafe" "github.com/rcrowley/go-metrics" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/config" "golang.org/x/sys/unix" ) type StdConn struct { udpConn *net.UDPConn rawConn syscall.RawConn isV4 bool l *logrus.Logger batch int } func setReusePort(network, address string, c syscall.RawConn) error { var opErr error err := c.Control(func(fd uintptr) { opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) //CloseOnExec already set by the runtime }) if err != nil { return err } return opErr } func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) { listen := netip.AddrPortFrom(ip, uint16(port)) lc := net.ListenConfig{} if multi { lc.Control = setReusePort } //this context is only used during the bind operation, you can't cancel it to kill the socket pc, err := lc.ListenPacket(context.Background(), "udp", listen.String()) if err != nil { return nil, fmt.Errorf("unable to open socket: %s", err) } udpConn := pc.(*net.UDPConn) rawConn, err := udpConn.SyscallConn() if err != nil { _ = udpConn.Close() return nil, err } return &StdConn{ udpConn: udpConn, rawConn: rawConn, isV4: ip.Is4(), l: l, batch: batch, }, err } func (u *StdConn) SupportsMultipleReaders() bool { return true } func (u *StdConn) Rebind() error { return nil } func (u *StdConn) getSockOptInt(opt int) (int, error) { if u.rawConn == nil { return 0, fmt.Errorf("no UDP connection") } var out int var opErr error err := u.rawConn.Control(func(fd uintptr) { out, opErr = unix.GetsockoptInt(int(fd), unix.SOL_SOCKET, opt) }) if err != nil { return 0, err } return out, opErr } func (u *StdConn) setSockOptInt(opt int, n int) error { if u.rawConn == nil { return fmt.Errorf("no UDP connection") } var opErr error err := u.rawConn.Control(func(fd uintptr) { opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, opt, n) }) if err != nil { return err } return opErr } func (u *StdConn) SetRecvBuffer(n int) error { return u.setSockOptInt(unix.SO_RCVBUFFORCE, n) } func (u *StdConn) SetSendBuffer(n int) error { return u.setSockOptInt(unix.SO_SNDBUFFORCE, n) } func (u *StdConn) SetSoMark(mark int) error { return u.setSockOptInt(unix.SO_MARK, mark) } func (u *StdConn) GetRecvBuffer() (int, error) { return u.getSockOptInt(unix.SO_RCVBUF) } func (u *StdConn) GetSendBuffer() (int, error) { return u.getSockOptInt(unix.SO_SNDBUF) } func (u *StdConn) GetSoMark() (int, error) { return u.getSockOptInt(unix.SO_MARK) } func (u *StdConn) LocalAddr() (netip.AddrPort, error) { addr := u.udpConn.LocalAddr() return netip.ParseAddrPort(addr.String()) } func recvmmsg(fd uintptr, msgs []rawMessage) (int, bool, error) { var errno syscall.Errno n, _, errno := unix.Syscall6( unix.SYS_RECVMMSG, fd, uintptr(unsafe.Pointer(&msgs[0])), uintptr(len(msgs)), unix.MSG_WAITFORONE, 0, 0, ) if errno == syscall.EAGAIN || errno == syscall.EWOULDBLOCK { // No data available, block for I/O and try again. return int(n), false, nil } if errno != 0 { return int(n), true, &net.OpError{Op: "recvmmsg", Err: errno} } return int(n), true, nil } func (u *StdConn) listenOutSingle(r EncReader) { var err error var n int var from netip.AddrPort buffer := make([]byte, MTU) for { n, from, err = u.udpConn.ReadFromUDPAddrPort(buffer) if err != nil { u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return } from = netip.AddrPortFrom(from.Addr().Unmap(), from.Port()) r(from, buffer[:n]) } } func (u *StdConn) listenOutBatch(r EncReader) { var ip netip.Addr var n int var operr error msgs, buffers, names := u.PrepareRawMessages(u.batch) //reader needs to capture variables from this function, since it's used as a lambda with rawConn.Read //defining it outside the loop so it gets re-used reader := func(fd uintptr) (done bool) { n, done, operr = recvmmsg(fd, msgs) return done } for { err := u.rawConn.Read(reader) if err != nil { u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return } if operr != nil { u.l.WithError(err).Debug("operr: udp socket is closed, exiting read loop") return } for i := 0; i < n; i++ { // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic if u.isV4 { ip, _ = netip.AddrFromSlice(names[i][4:8]) } else { ip, _ = netip.AddrFromSlice(names[i][8:24]) } r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len]) } } } func (u *StdConn) ListenOut(r EncReader) { if u.batch == 1 { //save some ram by not calling PrepareRawMessages for fields we won't use //we could also make this path more common by calling recvmmsg with msgs[:1], //but that's still the recvmmsg syscall, which would be a change u.listenOutSingle(r) } else { u.listenOutBatch(r) } } func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error { _, err := u.udpConn.WriteToUDPAddrPort(b, ip) return err } func (u *StdConn) ReloadConfig(c *config.C) { b := c.GetInt("listen.read_buffer", 0) if b > 0 { err := u.SetRecvBuffer(b) if err == nil { s, err := u.GetRecvBuffer() if err == nil { u.l.WithField("size", s).Info("listen.read_buffer was set") } else { u.l.WithError(err).Warn("Failed to get listen.read_buffer") } } else { u.l.WithError(err).Error("Failed to set listen.read_buffer") } } b = c.GetInt("listen.write_buffer", 0) if b > 0 { err := u.SetSendBuffer(b) if err == nil { s, err := u.GetSendBuffer() if err == nil { u.l.WithField("size", s).Info("listen.write_buffer was set") } else { u.l.WithError(err).Warn("Failed to get listen.write_buffer") } } else { u.l.WithError(err).Error("Failed to set listen.write_buffer") } } b = c.GetInt("listen.so_mark", 0) s, err := u.GetSoMark() if b > 0 || (err == nil && s != 0) { err := u.SetSoMark(b) if err == nil { s, err := u.GetSoMark() if err == nil { u.l.WithField("mark", s).Info("listen.so_mark was set") } else { u.l.WithError(err).Warn("Failed to get listen.so_mark") } } else { u.l.WithError(err).Error("Failed to set listen.so_mark") } } } func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error { var vallen uint32 = 4 * unix.SK_MEMINFO_VARS if u.rawConn == nil { return fmt.Errorf("no UDP connection") } var opErr error err := u.rawConn.Control(func(fd uintptr) { _, _, opErr = unix.Syscall6(unix.SYS_GETSOCKOPT, fd, uintptr(unix.SOL_SOCKET), uintptr(unix.SO_MEMINFO), uintptr(unsafe.Pointer(meminfo)), uintptr(unsafe.Pointer(&vallen)), 0) }) if err != nil { return err } return opErr } func (u *StdConn) Close() error { if u.udpConn != nil { return u.udpConn.Close() } return nil } func NewUDPStatsEmitter(udpConns []Conn) func() { // Check if our kernel supports SO_MEMINFO before registering the gauges var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge var meminfo [unix.SK_MEMINFO_VARS]uint32 if err := udpConns[0].(*StdConn).getMemInfo(&meminfo); err == nil { udpGauges = make([][unix.SK_MEMINFO_VARS]metrics.Gauge, len(udpConns)) for i := range udpConns { udpGauges[i] = [unix.SK_MEMINFO_VARS]metrics.Gauge{ metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rmem_alloc", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rcvbuf", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_alloc", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.sndbuf", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.fwd_alloc", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_queued", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.optmem", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.backlog", i), nil), metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.drops", i), nil), } } } return func() { for i, gauges := range udpGauges { if err := udpConns[i].(*StdConn).getMemInfo(&meminfo); err == nil { for j := 0; j < unix.SK_MEMINFO_VARS; j++ { gauges[j].Update(int64(meminfo[j])) } } } } }