diff --git a/udp/udp_linux.go b/udp/udp_linux.go index a90e96d3..5b5d4ea1 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -9,6 +9,7 @@ import ( "net" "net/netip" "syscall" + "time" "unsafe" "github.com/rcrowley/go-metrics" @@ -17,6 +18,8 @@ import ( "golang.org/x/sys/unix" ) +var readTimeout = unix.NsecToTimeval(int64(3 * time.Second)) + type StdConn struct { sysFd int isV4 bool @@ -47,6 +50,11 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in } } + // Set a read timeout + if err = unix.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &readTimeout); err != nil { + return nil, fmt.Errorf("unable to set SO_RCVTIMEO: %s", err) + } + var sa unix.Sockaddr if ip.Is4() { sa4 := &unix.SockaddrInet4{Port: port} @@ -154,6 +162,9 @@ func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) { ) if err != 0 { + if err == unix.EAGAIN || err == unix.EINTR { + continue + } return 0, &net.OpError{Op: "recvmsg", Err: err} } @@ -175,6 +186,9 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) { ) if err != 0 { + if err == unix.EAGAIN || err == unix.EINTR { + continue + } return 0, &net.OpError{Op: "recvmmsg", Err: err} } @@ -301,7 +315,6 @@ func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error { } func (u *StdConn) Close() error { - _ = syscall.Shutdown(u.sysFd, syscall.SHUT_RDWR) return syscall.Close(u.sysFd) }