This commit is contained in:
JackDoan
2026-04-23 13:12:24 -05:00
parent f76ac2e216
commit 90f2938f9c
5 changed files with 5 additions and 268 deletions

View File

@@ -50,74 +50,6 @@ func TestSendBatchBookkeeping(t *testing.T) {
} }
} }
// TestBatchSegmentable exercises the rules deciding whether a batch may be
// emitted as one UDP GSO superpacket: one destination, uniform sizes, and
// only the trailing packet allowed to run short.
func TestBatchSegmentable(t *testing.T) {
	ap := netip.MustParseAddrPort("10.0.0.1:4242")
	other := netip.MustParseAddrPort("10.0.0.2:4242")

	// build assembles a sendBatch whose i-th packet is sizes[i] bytes long
	// and committed to addrs[i].
	build := func(addrs []netip.AddrPort, sizes []int) *sendBatch {
		batch := newSendBatch(len(addrs), 64)
		for i, addr := range addrs {
			slot := batch.Next()
			for j := 0; j < sizes[i]; j++ {
				slot = append(slot, byte(j))
			}
			batch.Commit(len(slot), addr)
		}
		return batch
	}

	cases := []struct {
		name    string
		addrs   []netip.AddrPort
		sizes   []int
		wantSeg int    // expected segment size when wantOK
		wantOK  bool   // whether the batch should be accepted
		reject  string // failure message for unexpected acceptance
	}{
		{name: "uniform same dst", addrs: []netip.AddrPort{ap, ap, ap}, sizes: []int{10, 10, 10}, wantSeg: 10, wantOK: true},
		{name: "last segment short ok", addrs: []netip.AddrPort{ap, ap, ap}, sizes: []int{10, 10, 4}, wantSeg: 10, wantOK: true},
		{name: "mixed dst rejected", addrs: []netip.AddrPort{ap, other, ap}, sizes: []int{10, 10, 10}, reject: "expected rejection for mixed dst"},
		{name: "mid-batch short rejected", addrs: []netip.AddrPort{ap, ap, ap}, sizes: []int{10, 4, 10}, reject: "expected rejection for short mid-batch"},
		{name: "mid-batch longer rejected", addrs: []netip.AddrPort{ap, ap, ap}, sizes: []int{10, 11, 10}, reject: "expected rejection for longer mid-batch"},
		{name: "last longer rejected", addrs: []netip.AddrPort{ap, ap, ap}, sizes: []int{10, 10, 11}, reject: "expected rejection for longer last segment"},
		{name: "first zero rejected", addrs: []netip.AddrPort{ap, ap}, sizes: []int{0, 10}, reject: "expected rejection for zero first"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			seg, ok := batchSegmentable(build(tc.addrs, tc.sizes))
			if tc.wantOK {
				if !ok || seg != tc.wantSeg {
					t.Fatalf("got seg=%d ok=%v", seg, ok)
				}
				return
			}
			if ok {
				t.Fatal(tc.reject)
			}
		})
	}
}
func TestSendBatchSlotsDoNotOverlap(t *testing.T) { func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
b := newSendBatch(3, 8) b := newSendBatch(3, 8)
ap := netip.MustParseAddrPort("10.0.0.1:80") ap := netip.MustParseAddrPort("10.0.0.1:80")

View File

@@ -380,54 +380,11 @@ func (f *Interface) listenIn(reader tio.Queue, i int) {
} }
func (f *Interface) flushBatch(batch *sendBatch, q int) { func (f *Interface) flushBatch(batch *sendBatch, q int) {
//if len(batch.bufs) == 1 { if err := f.writers[q].WriteBatch(batch.bufs, batch.dsts); err != nil {
// if err := f.writers[q].WriteTo(batch.bufs[0], batch.dsts[0]); err != nil {
// f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing single-batch")
// }
// return
//}
w := f.writers[q]
if w.SupportsGSO() {
if segSize, ok := batchSegmentable(batch); ok {
if err := w.WriteSegmented(batch.bufs, batch.dsts[0], segSize); err != nil {
f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing GSO batch")
}
return
}
}
if err := w.WriteBatch(batch.bufs, batch.dsts); err != nil {
f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing batch") f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing batch")
} }
} }
// batchSegmentable reports whether a batch can be emitted as a single UDP GSO
// superpacket: all packets go to the same destination, and every packet
// except possibly the last has the same length. Returns the segment size on
// success. The single-packet case is handled in flushBatch before this runs.
func batchSegmentable(b *sendBatch) (int, bool) {
	segSize := len(b.bufs[0])
	if segSize == 0 {
		return 0, false
	}
	first := b.dsts[0]
	n := len(b.bufs)
	for i := 1; i < n; i++ {
		// Every packet must share the first packet's destination.
		if b.dsts[i] != first {
			return 0, false
		}
		sz := len(b.bufs[i])
		if i == n-1 {
			// The trailing packet may run short, but never empty or oversize.
			if sz == 0 || sz > segSize {
				return 0, false
			}
			continue
		}
		// Interior packets must match the segment size exactly.
		if sz != segSize {
			return 0, false
		}
	}
	return segSize, true
}
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) { func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
c.RegisterReloadCallback(f.reloadFirewall) c.RegisterReloadCallback(f.reloadFirewall)
c.RegisterReloadCallback(f.reloadSendRecvError) c.RegisterReloadCallback(f.reloadSendRecvError)

View File

@@ -12,7 +12,8 @@ import (
) )
// Space for segmented output. Worst case is many small segments, each paying // Space for segmented output. Worst case is many small segments, each paying
// an IP+TCP header. 128KiB comfortably covers the 64KiB payload ceiling. // an IP+TCP header. Should be a multiple of 64KiB.
// const tunSegBufSize = 0xffff * 8 TODO larger? config?
const tunSegBufSize = 131072 const tunSegBufSize = 131072
// tunSegBufCap is the total size we allocate for the per-reader segment // tunSegBufCap is the total size we allocate for the per-reader segment
@@ -24,7 +25,7 @@ const tunSegBufCap = tunSegBufSize * 2
// tunDrainCap caps how many packets a single Read will accumulate via // tunDrainCap caps how many packets a single Read will accumulate via
// the post-wake drain loop. Sized to soak up a burst of small ACKs while // the post-wake drain loop. Sized to soak up a burst of small ACKs while
// bounding how much work a single caller holds before handing off. // bounding how much work a single caller holds before handing off.
const tunDrainCap = 64 const tunDrainCap = 64 //256
// gsoInitialPayIovs is the starting capacity (in payload fragments) of // gsoInitialPayIovs is the starting capacity (in payload fragments) of
// Offload.gsoIovs. Sized to cover the default coalesce segment cap without // Offload.gsoIovs. Sized to cover the default coalesce segment cap without

View File

@@ -35,16 +35,6 @@ type Conn interface {
// WriteTo loop. Returns on the first error; callers may observe a // WriteTo loop. Returns on the first error; callers may observe a
// partial send if some packets went out before the error. // partial send if some packets went out before the error.
WriteBatch(bufs [][]byte, addrs []netip.AddrPort) error WriteBatch(bufs [][]byte, addrs []netip.AddrPort) error
// WriteSegmented sends bufs as a single UDP GSO sendmsg when the kernel
// supports it: all bufs go to the same addr, each must be exactly segSize
// bytes except the last which may be shorter. The kernel emits one
// datagram per buf on the wire. Backends / kernels without GSO support
// fall back to a per-packet WriteTo loop. Returns on the first error.
WriteSegmented(bufs [][]byte, addr netip.AddrPort, segSize int) error
// SupportsGSO reports whether WriteSegmented takes the single-syscall
// GSO path. Callers use this to decide at batch-assembly time whether
// the uniform-size / same-dst check is worth running.
SupportsGSO() bool
ReloadConfig(c *config.C) ReloadConfig(c *config.C)
SupportsMultipleReaders() bool SupportsMultipleReaders() bool
Close() error Close() error
@@ -70,12 +60,6 @@ func (NoopConn) WriteTo(_ []byte, _ netip.AddrPort) error {
func (NoopConn) WriteBatch(_ [][]byte, _ []netip.AddrPort) error { func (NoopConn) WriteBatch(_ [][]byte, _ []netip.AddrPort) error {
return nil return nil
} }
// WriteSegmented implements Conn for the no-op backend: the segmented batch
// is discarded and success is reported.
func (NoopConn) WriteSegmented(_ [][]byte, _ netip.AddrPort, _ int) error {
return nil
}
// SupportsGSO implements Conn; the no-op backend never offers a GSO path,
// so callers skip the uniform-size / same-dst batch check entirely.
func (NoopConn) SupportsGSO() bool {
return false
}
func (NoopConn) ReloadConfig(_ *config.C) { func (NoopConn) ReloadConfig(_ *config.C) {
return return
} }

View File

@@ -54,13 +54,6 @@ type StdConn struct {
// probed once at socket creation. When true, WriteSegmented takes a // probed once at socket creation. When true, WriteSegmented takes a
// single-syscall GSO path; otherwise it falls back to a WriteTo loop. // single-syscall GSO path; otherwise it falls back to a WriteTo loop.
gsoSupported bool gsoSupported bool
gsoMsg msghdr
gsoIovs []iovec
gsoName []byte // SizeofSockaddrInet6
gsoCmsg []byte // CmsgSpace(2)
gsoSent int
gsoErrno syscall.Errno
gsoFunc func(fd uintptr) bool
} }
func setReusePort(network, address string, c syscall.RawConn) error { func setReusePort(network, address string, c syscall.RawConn) error {
@@ -155,9 +148,7 @@ const maxGSOSegments = 64
// fits, avoiding EMSGSIZE on large TSO superpackets. // fits, avoiding EMSGSIZE on large TSO superpackets.
const maxGSOBytes = 65535 const maxGSOBytes = 65535
// prepareGSO probes UDP_SEGMENT support and, on success, sets up the // prepareGSO probes UDP_SEGMENT support
// reusable sendmsg scratch (iovecs, sockaddr, cmsg) plus the preallocated
// raw-write closure used to avoid heap allocations on the hot path.
func (u *StdConn) prepareGSO() { func (u *StdConn) prepareGSO() {
var probeErr error var probeErr error
if err := u.rawConn.Control(func(fd uintptr) { if err := u.rawConn.Control(func(fd uintptr) {
@@ -169,25 +160,6 @@ func (u *StdConn) prepareGSO() {
return return
} }
u.gsoSupported = true u.gsoSupported = true
u.gsoIovs = make([]iovec, maxGSOSegments)
u.gsoName = make([]byte, unix.SizeofSockaddrInet6)
u.gsoCmsg = make([]byte, unix.CmsgSpace(2))
// Wire up the static pieces of gsoMsg. Iovlen / Controllen / Namelen /
// cmsg contents get refreshed per call; Iov, Name, Control pointers are
// fixed because the scratch slices never move.
u.gsoMsg.Iov = &u.gsoIovs[0]
u.gsoMsg.Name = &u.gsoName[0]
u.gsoMsg.Control = &u.gsoCmsg[0]
// Prepopulate the cmsg header. Len/Level/Type are constant for our use;
// only the 2-byte gso_size payload changes per call.
cmsghdr := (*unix.Cmsghdr)(unsafe.Pointer(&u.gsoCmsg[0]))
cmsghdr.Level = unix.SOL_UDP
cmsghdr.Type = unix.UDP_SEGMENT
setCmsgLen(cmsghdr, unix.CmsgLen(2))
u.gsoFunc = u.sendmsgRawWriteGSO
} }
func (u *StdConn) SupportsMultipleReaders() bool { func (u *StdConn) SupportsMultipleReaders() bool {
@@ -516,115 +488,6 @@ func (u *StdConn) sendmmsgRawWrite(fd uintptr) bool {
return true return true
} }
// SupportsGSO reports whether the UDP_SEGMENT probe succeeded when the
// socket was created (see prepareGSO), i.e. whether WriteSegmented will take
// the single-sendmsg GSO path rather than the per-packet WriteTo fallback.
func (u *StdConn) SupportsGSO() bool {
return u.gsoSupported
}
// WriteSegmented sends bufs to addr as a UDP GSO superpacket. The kernel
// emits one datagram per iovec on the wire; all iovecs except the last must
// be exactly segSize bytes. Non-GSO kernels (and non-positive segSize) hit
// the per-packet WriteTo fallback.
// Called with len(bufs) >= 1. len(bufs) > maxGSOSegments is chunked.
func (u *StdConn) WriteSegmented(bufs [][]byte, addr netip.AddrPort, segSize int) error {
	if len(bufs) == 0 {
		return nil
	}
	// Per-packet fallback when GSO is unavailable, or when segSize <= 0 —
	// the latter would otherwise panic on the maxGSOBytes/segSize division
	// below. The fallback loop never inspects segSize, so it is safe here.
	if !u.gsoSupported || segSize <= 0 {
		for _, b := range bufs {
			if err := u.WriteTo(b, addr); err != nil {
				return err
			}
		}
		return nil
	}
	// Refresh the per-call pieces of the reusable sendmsg scratch: the
	// sockaddr bytes and the control-message length.
	nlen, err := writeSockaddr(u.gsoName, addr, u.isV4)
	if err != nil {
		return err
	}
	u.gsoMsg.Namelen = uint32(nlen)
	setMsgControllen(&u.gsoMsg, unix.CmsgSpace(2))
	// Cap the per-syscall fan-out by both segment count and total bytes.
	// Kernel rejects sendmsg with EMSGSIZE when segCount*segSize would
	// exceed sk_gso_max_size (typically 65536). For segSize > maxGSOBytes
	// we can't use GSO at all and must fall back per-packet.
	segsByBytes := maxGSOBytes / segSize
	if segsByBytes == 0 {
		for _, b := range bufs {
			if werr := u.WriteTo(b, addr); werr != nil {
				return werr
			}
		}
		return nil
	}
	maxChunk := maxGSOSegments
	if segsByBytes < maxChunk {
		maxChunk = segsByBytes
	}
	i := 0
	for i < len(bufs) {
		chunk := len(bufs) - i
		if chunk > maxChunk {
			chunk = maxChunk
		}
		// Point the preallocated iovec scratch at this chunk's buffers.
		for k := 0; k < chunk; k++ {
			b := bufs[i+k]
			if len(b) == 0 {
				u.gsoIovs[k].Base = nil
				setIovLen(&u.gsoIovs[k], 0)
			} else {
				u.gsoIovs[k].Base = &b[0]
				setIovLen(&u.gsoIovs[k], len(b))
			}
		}
		setMsgIovlen(&u.gsoMsg, chunk)
		// Only the 2-byte gso_size payload of the cmsg changes per call; the
		// header was prepopulated in prepareGSO.
		binary.NativeEndian.PutUint16(u.gsoCmsg[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize))
		if serr := u.sendmsgGSO(); serr != nil {
			// Fall back to a per-packet loop for the remainder of the
			// batch. Dropping the GSO call entirely is safer than
			// returning mid-superpacket and losing bytes.
			for k := 0; k < chunk; k++ {
				if werr := u.WriteTo(bufs[i+k], addr); werr != nil {
					return werr
				}
			}
		}
		i += chunk
	}
	return nil
}
// sendmsgRawWriteGSO is the preallocated rawConn.Write callback for the GSO
// path. Reads the prebuilt u.gsoMsg and writes u.gsoSent / u.gsoErrno.
// Returning false tells rawConn.Write the fd is not yet writable so the
// runtime should poll and call again; returning true means the syscall
// completed (successfully or with a hard errno, recorded for sendmsgGSO).
func (u *StdConn) sendmsgRawWriteGSO(fd uintptr) bool {
r1, _, errno := unix.Syscall(
unix.SYS_SENDMSG,
fd,
uintptr(unsafe.Pointer(&u.gsoMsg)),
0,
)
// Would-block is not a failure: ask the runtime poller to retry.
if errno == syscall.EAGAIN || errno == syscall.EWOULDBLOCK {
return false
}
u.gsoSent = int(r1)
u.gsoErrno = errno
return true
}
// sendmsgGSO drives the prepared GSO sendmsg through the raw connection
// (which invokes u.gsoFunc, i.e. sendmsgRawWriteGSO) and converts a nonzero
// captured errno into a *net.OpError.
func (u *StdConn) sendmsgGSO() error {
// Clear the per-call result fields the raw callback writes into.
u.gsoSent = 0
u.gsoErrno = 0
if err := u.rawConn.Write(u.gsoFunc); err != nil {
return err
}
if u.gsoErrno != 0 {
return &net.OpError{Op: "sendmsg", Err: u.gsoErrno}
}
return nil
}
func (u *StdConn) sendmmsg(n int) (int, error) { func (u *StdConn) sendmmsg(n int) (int, error) {
u.writeChunk = n u.writeChunk = n
u.writeSent = 0 u.writeSent = 0