This commit is contained in:
JackDoan
2025-11-07 14:26:35 -06:00
parent 3ec527e42c
commit ac5382928e
50 changed files with 4013 additions and 123 deletions

View File

@@ -14,22 +14,22 @@ import (
"github.com/rcrowley/go-metrics"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/packet"
"golang.org/x/sys/unix"
)
type StdConn struct {
sysFd int
isV4 bool
l *logrus.Logger
batch int
}
const iovMax = 128 //1024 //no unix constant for this? from limits.h
//todo I'd like this to be 1024 but we seem to hit errors around ~130?
func maybeIPV4(ip net.IP) (net.IP, bool) {
ip4 := ip.To4()
if ip4 != nil {
return ip4, true
}
return ip, false
type StdConn struct {
sysFd int
isV4 bool
l *logrus.Logger
batch int
enableGRO bool
msgs []rawMessage
iovs [][]iovec
}
func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
@@ -69,7 +69,20 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
return nil, fmt.Errorf("unable to bind to socket: %s", err)
}
return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch}, err
const batchSize = 8192
msgs := make([]rawMessage, 0, batchSize) //todo configure
iovs := make([][]iovec, batchSize)
for i := range iovs {
iovs[i] = make([]iovec, iovMax)
}
return &StdConn{
sysFd: fd,
isV4: ip.Is4(),
l: l,
batch: batch,
msgs: msgs,
iovs: iovs,
}, err
}
func (u *StdConn) SupportsMultipleReaders() bool {
@@ -123,9 +136,7 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
}
func (u *StdConn) ListenOut(r EncReader) {
var ip netip.Addr
msgs, buffers, names := u.PrepareRawMessages(u.batch)
msgs, packets := u.PrepareRawMessages(u.batch, u.isV4)
read := u.ReadMulti
if u.batch == 1 {
read = u.ReadSingle
@@ -139,13 +150,12 @@ func (u *StdConn) ListenOut(r EncReader) {
}
for i := 0; i < n; i++ {
// Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic
if u.isV4 {
ip, _ = netip.AddrFromSlice(names[i][4:8])
} else {
ip, _ = netip.AddrFromSlice(names[i][8:24])
}
r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len])
packets[i].Payload = packets[i].Payload[:msgs[i].Len]
packets[i].Update(getRawMessageControlLen(&msgs[i]))
}
r(packets[:n])
for i := 0; i < n; i++ { //todo reset this in prev loop, but this makes debug ez
msgs[i].Hdr.Controllen = uint64(unix.CmsgSpace(2))
}
}
}
@@ -198,6 +208,147 @@ func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
return u.writeTo6(b, ip)
}
func (u *StdConn) WriteToBatch(b []byte, ip netip.AddrPort) error {
if u.isV4 {
return u.writeTo4(b, ip)
}
return u.writeTo6(b, ip)
}
func (u *StdConn) Prep(pkt *packet.Packet, addr netip.AddrPort) error {
nl, err := u.encodeSockaddr(pkt.Name, addr)
if err != nil {
return err
}
pkt.Name = pkt.Name[:nl]
pkt.OutLen = len(pkt.Payload)
return nil
}
func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) {
if len(pkts) == 0 {
return 0, nil
}
u.msgs = u.msgs[:0]
//u.iovs = u.iovs[:0]
sent := 0
var mostRecentPkt *packet.Packet
mostRecentPktSize := 0
//segmenting := false
idx := 0
for _, pkt := range pkts {
if len(pkt.Payload) == 0 || pkt.OutLen == -1 {
sent++
continue
}
lastIdx := idx - 1
if mostRecentPkt != nil && pkt.CompatibleForSegmentationWith(mostRecentPkt, mostRecentPktSize) && u.msgs[lastIdx].Hdr.Iovlen < iovMax {
u.msgs[lastIdx].Hdr.Controllen = uint64(len(mostRecentPkt.Control))
u.msgs[lastIdx].Hdr.Control = &mostRecentPkt.Control[0]
u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Base = &pkt.Payload[0]
u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Len = uint64(len(pkt.Payload))
u.msgs[lastIdx].Hdr.Iovlen++
mostRecentPktSize += len(pkt.Payload)
mostRecentPkt.SetSegSizeForTX()
} else {
u.msgs = append(u.msgs, rawMessage{})
u.iovs[idx][0] = iovec{
Base: &pkt.Payload[0],
Len: uint64(len(pkt.Payload)),
}
msg := &u.msgs[idx]
iov := &u.iovs[idx][0]
idx++
msg.Hdr.Iov = iov
msg.Hdr.Iovlen = 1
setRawMessageControl(msg, nil)
msg.Hdr.Flags = 0
msg.Hdr.Name = &pkt.Name[0]
msg.Hdr.Namelen = uint32(len(pkt.Name))
mostRecentPkt = pkt
mostRecentPktSize = len(pkt.Payload)
}
}
if len(u.msgs) == 0 {
return sent, nil
}
offset := 0
for offset < len(u.msgs) {
n, _, errno := unix.Syscall6(
unix.SYS_SENDMMSG,
uintptr(u.sysFd),
uintptr(unsafe.Pointer(&u.msgs[offset])),
uintptr(len(u.msgs)-offset),
0,
0,
0,
)
if errno != 0 {
if errno == unix.EINTR {
continue
}
//for i := 0; i < len(u.msgs); i++ {
// for j := 0; j < int(u.msgs[i].Hdr.Iovlen); j++ {
// u.l.WithFields(logrus.Fields{
// "msg_index": i,
// "iov idx": j,
// "iov": fmt.Sprintf("%+v", u.iovs[i][j]),
// }).Warn("failed to send message")
// }
//
//}
u.l.WithFields(logrus.Fields{
"errno": errno,
"idx": idx,
"len": len(u.msgs),
"deets": fmt.Sprintf("%+v", u.msgs),
"lastIOV": fmt.Sprintf("%+v", u.iovs[len(u.msgs)-1][u.msgs[len(u.msgs)-1].Hdr.Iovlen-1]),
}).Error("failed to send message")
return sent + offset, &net.OpError{Op: "sendmmsg", Err: errno}
}
if n == 0 {
break
}
offset += int(n)
}
return sent + len(u.msgs), nil
}
func (u *StdConn) encodeSockaddr(dst []byte, addr netip.AddrPort) (uint32, error) {
if u.isV4 {
if !addr.Addr().Is4() {
return 0, fmt.Errorf("Listener is IPv4, but writing to IPv6 remote")
}
var sa unix.RawSockaddrInet4
sa.Family = unix.AF_INET
sa.Addr = addr.Addr().As4()
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&sa.Port))[:], addr.Port())
size := unix.SizeofSockaddrInet4
copy(dst[:size], (*(*[unix.SizeofSockaddrInet4]byte)(unsafe.Pointer(&sa)))[:])
return uint32(size), nil
}
var sa unix.RawSockaddrInet6
sa.Family = unix.AF_INET6
sa.Addr = addr.Addr().As16()
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&sa.Port))[:], addr.Port())
size := unix.SizeofSockaddrInet6
copy(dst[:size], (*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&sa)))[:])
return uint32(size), nil
}
func (u *StdConn) writeTo6(b []byte, ip netip.AddrPort) error {
var rsa unix.RawSockaddrInet6
rsa.Family = unix.AF_INET6
@@ -298,6 +449,27 @@ func (u *StdConn) ReloadConfig(c *config.C) {
u.l.WithError(err).Error("Failed to set listen.so_mark")
}
}
u.configureGRO(true)
}
func (u *StdConn) configureGRO(enable bool) {
if enable == u.enableGRO {
return
}
if enable {
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 1); err != nil {
u.l.WithError(err).Warn("Failed to enable UDP GRO")
return
}
u.enableGRO = true
u.l.Info("UDP GRO enabled")
} else {
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 0); err != nil && err != unix.ENOPROTOOPT {
u.l.WithError(err).Warn("Failed to disable UDP GRO")
}
u.enableGRO = false
}
}
func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {