batch udp packet sending

This commit is contained in:
Jay Wren
2026-02-03 16:56:21 -05:00
parent 42bee7cf17
commit 15333f9fed
9 changed files with 133 additions and 1 deletions

View File

@@ -69,7 +69,6 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache) dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache)
if dropReason == nil { if dropReason == nil {
f.sendNoMetrics(header.Message, 0, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, packet, nb, out, q) f.sendNoMetrics(header.Message, 0, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, packet, nb, out, q)
} else { } else {
f.rejectInside(packet, out, q) f.rejectInside(packet, out, q)
if f.l.Level >= logrus.DebugLevel { if f.l.Level >= logrus.DebugLevel {

View File

@@ -13,11 +13,18 @@ type EncReader func(
payload []byte, payload []byte,
) )
// BatchPacket represents a single packet in a batch write operation
type BatchPacket struct {
Payload []byte
Addr netip.AddrPort
}
type Conn interface { type Conn interface {
Rebind() error Rebind() error
LocalAddr() (netip.AddrPort, error) LocalAddr() (netip.AddrPort, error)
ListenOut(r EncReader) ListenOut(r EncReader)
WriteTo(b []byte, addr netip.AddrPort) error WriteTo(b []byte, addr netip.AddrPort) error
WriteBatch(pkts []BatchPacket) (int, error)
ReloadConfig(c *config.C) ReloadConfig(c *config.C)
SupportsMultipleReaders() bool SupportsMultipleReaders() bool
Close() error Close() error
@@ -40,6 +47,9 @@ func (NoopConn) SupportsMultipleReaders() bool {
func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error { func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error {
return nil return nil
} }
func (NoopConn) WriteBatch(pkts []BatchPacket) (int, error) {
return len(pkts), nil
}
func (NoopConn) ReloadConfig(_ *config.C) { func (NoopConn) ReloadConfig(_ *config.C) {
return return
} }

View File

@@ -202,3 +202,12 @@ func (u *StdConn) Rebind() error {
return nil return nil
} }
func (u *StdConn) WriteBatch(pkts []BatchPacket) (int, error) {
for i := range pkts {
if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {
return i, err
}
}
return len(pkts), nil
}

View File

@@ -101,3 +101,12 @@ func (u *GenericConn) ListenOut(r EncReader) {
func (u *GenericConn) SupportsMultipleReaders() bool { func (u *GenericConn) SupportsMultipleReaders() bool {
return false return false
} }
func (u *GenericConn) WriteBatch(pkts []BatchPacket) (int, error) {
for i := range pkts {
if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {
return i, err
}
}
return len(pkts), nil
}

View File

@@ -313,6 +313,69 @@ func (u *StdConn) Close() error {
return syscall.Close(u.sysFd) return syscall.Close(u.sysFd)
} }
func (u *StdConn) WriteBatch(pkts []BatchPacket) (int, error) {
if len(pkts) == 0 {
return 0, nil
}
msgs := make([]rawMessage, len(pkts))
iovecs := make([]iovec, len(pkts))
var names4 []unix.RawSockaddrInet4
var names6 []unix.RawSockaddrInet6
if u.isV4 {
names4 = make([]unix.RawSockaddrInet4, len(pkts))
} else {
names6 = make([]unix.RawSockaddrInet6, len(pkts))
}
for i := range pkts {
setIovecBase(&iovecs[i], &pkts[i].Payload[0])
setIovecLen(&iovecs[i], len(pkts[i].Payload))
msgs[i].Hdr.Iov = &iovecs[i]
setMsghdrIovlen(&msgs[i].Hdr, 1)
if u.isV4 {
names4[i].Family = unix.AF_INET
names4[i].Addr = pkts[i].Addr.Addr().As4()
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&names4[i].Port))[:], pkts[i].Addr.Port())
msgs[i].Hdr.Name = (*byte)(unsafe.Pointer(&names4[i]))
msgs[i].Hdr.Namelen = unix.SizeofSockaddrInet4
} else {
names6[i].Family = unix.AF_INET6
names6[i].Addr = pkts[i].Addr.Addr().As16()
binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&names6[i].Port))[:], pkts[i].Addr.Port())
msgs[i].Hdr.Name = (*byte)(unsafe.Pointer(&names6[i]))
msgs[i].Hdr.Namelen = unix.SizeofSockaddrInet6
}
}
var sent int
for sent < len(msgs) {
n, _, errno := unix.Syscall6(
unix.SYS_SENDMMSG,
uintptr(u.sysFd),
uintptr(unsafe.Pointer(&msgs[sent])),
uintptr(len(msgs)-sent),
0,
0,
0,
)
if errno == unix.EINTR {
continue
}
if errno != 0 {
return sent, &net.OpError{Op: "sendmmsg", Err: errno}
}
sent += int(n)
}
return sent, nil
}
func NewUDPStatsEmitter(udpConns []Conn) func() { func NewUDPStatsEmitter(udpConns []Conn) func() {
// Check if our kernel supports SO_MEMINFO before registering the gauges // Check if our kernel supports SO_MEMINFO before registering the gauges
var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge

View File

@@ -52,3 +52,15 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names return msgs, buffers, names
} }
func setIovecBase(iov *iovec, base *byte) {
iov.Base = base
}
func setIovecLen(iov *iovec, l int) {
iov.Len = uint32(l)
}
func setMsghdrIovlen(hdr *msghdr, l int) {
hdr.Iovlen = uint32(l)
}

View File

@@ -55,3 +55,15 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names return msgs, buffers, names
} }
func setIovecBase(iov *iovec, base *byte) {
iov.Base = base
}
func setIovecLen(iov *iovec, l int) {
iov.Len = uint64(l)
}
func setMsghdrIovlen(hdr *msghdr, l int) {
hdr.Iovlen = uint64(l)
}

View File

@@ -338,6 +338,15 @@ func (u *RIOConn) Rebind() error {
func (u *RIOConn) ReloadConfig(*config.C) {} func (u *RIOConn) ReloadConfig(*config.C) {}
func (u *RIOConn) WriteBatch(pkts []BatchPacket) (int, error) {
for i := range pkts {
if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {
return i, err
}
}
return len(pkts), nil
}
func (u *RIOConn) Close() error { func (u *RIOConn) Close() error {
if !u.isOpen.CompareAndSwap(true, false) { if !u.isOpen.CompareAndSwap(true, false) {
return nil return nil

View File

@@ -142,3 +142,12 @@ func (u *TesterConn) Close() error {
} }
return nil return nil
} }
func (u *TesterConn) WriteBatch(pkts []BatchPacket) (int, error) {
for i := range pkts {
if err := u.WriteTo(pkts[i].Payload, pkts[i].Addr); err != nil {
return i, err
}
}
return len(pkts), nil
}