GSO again

This commit is contained in:
JackDoan
2026-04-17 10:25:05 -05:00
parent 6ee5e18d84
commit d0825514a0
31 changed files with 3278 additions and 164 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net/netip"
"sync"
"sync/atomic"
@@ -86,8 +85,12 @@ type Interface struct {
conntrackCacheTimeout time.Duration
writers []udp.Conn
readers []io.ReadWriteCloser
wg sync.WaitGroup
readers []overlay.Queue
// tunCoalescers is one tcpCoalescer per tun queue, wrapping readers[i].
// decryptToTun sends plaintext into the coalescer; listenOut calls its
// Flush at the end of each UDP recvmmsg batch.
tunCoalescers []*tcpCoalescer
wg sync.WaitGroup
// fatalErr holds the first unexpected reader error that caused shutdown.
// nil means "no fatal error" (yet)
@@ -184,7 +187,8 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
routines: c.routines,
version: c.version,
writers: make([]udp.Conn, c.routines),
readers: make([]io.ReadWriteCloser, c.routines),
readers: make([]overlay.Queue, c.routines),
tunCoalescers: make([]*tcpCoalescer, c.routines),
myVpnNetworks: cs.myVpnNetworks,
myVpnNetworksTable: cs.myVpnNetworksTable,
myVpnAddrs: cs.myVpnAddrs,
@@ -239,7 +243,7 @@ func (f *Interface) activate() error {
metrics.GetOrRegisterGauge("routines", nil).Update(int64(f.routines))
// Prepare n tun queues
var reader io.ReadWriteCloser = f.inside
var reader overlay.Queue = f.inside
for i := 0; i < f.routines; i++ {
if i > 0 {
reader, err = f.inside.NewMultiQueueReader()
@@ -248,6 +252,7 @@ func (f *Interface) activate() error {
}
}
f.readers[i] = reader
f.tunCoalescers[i] = newTCPCoalescer(reader)
}
f.wg.Add(1) // for us to wait on Close() to return
@@ -305,13 +310,28 @@ func (f *Interface) listenOut(i int) {
ctCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
lhh := f.lightHouse.NewRequestHandler()
plaintext := make([]byte, udp.MTU)
h := &header.H{}
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
// plaintexts is a ring of decrypt scratches, one per packet in a UDP
// recvmmsg batch. The coalescer borrows payload slices from here and
// requires they stay valid until Flush — so we rotate each packet and
// reset only in the batch-end flush callback.
var plaintexts [][]byte
idx := 0
coalescer := f.tunCoalescers[i]
err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
if idx >= len(plaintexts) {
plaintexts = append(plaintexts, make([]byte, udp.MTU))
}
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintexts[idx][:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l))
idx++
}, func() {
if err := coalescer.Flush(); err != nil {
f.l.WithError(err).Error("Failed to flush tun coalescer")
}
idx = 0
})
if err != nil && !f.closed.Load() {
@@ -322,16 +342,16 @@ func (f *Interface) listenOut(i int) {
f.l.Debugf("underlay reader %v is done", i)
}
func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
packet := make([]byte, mtu)
out := make([]byte, mtu)
func (f *Interface) listenIn(reader overlay.Queue, i int) {
rejectBuf := make([]byte, mtu)
batch := newSendBatch(sendBatchCap, udp.MTU+32)
fwPacket := &firewall.Packet{}
nb := make([]byte, 12, 12)
conntrackCache := firewall.NewConntrackCacheTicker(f.conntrackCacheTimeout)
for {
n, err := reader.Read(packet)
pkts, err := reader.Read()
if err != nil {
if !f.closed.Load() {
f.l.WithError(err).WithField("reader", i).Error("Error while reading outbound packet, closing")
@@ -340,12 +360,71 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
break
}
f.consumeInsidePacket(packet[:n], fwPacket, nb, out, i, conntrackCache.Get(f.l))
batch.Reset()
for _, pkt := range pkts {
if batch.Len() >= batch.Cap() {
f.flushBatch(batch, i)
batch.Reset()
}
f.consumeInsidePacket(pkt, fwPacket, nb, batch, rejectBuf, i, conntrackCache.Get(f.l))
}
if batch.Len() > 0 {
f.flushBatch(batch, i)
}
}
f.l.Debugf("overlay reader %v is done", i)
}
// flushBatch writes the accumulated send batch out through writer q.
// When the writer supports UDP GSO and the batch qualifies (single
// destination, uniform segment size — see batchSegmentable), the whole
// batch is handed to the kernel as one segmented superpacket; otherwise
// it falls back to a plain batched write. Write errors are logged rather
// than returned so one failed flush does not stop the reader loop.
func (f *Interface) flushBatch(batch *sendBatch, q int) {
	w := f.writers[q]
	if w.SupportsGSO() {
		if segSize, ok := batchSegmentable(batch); ok {
			// One segmented write replaces len(batch.bufs) syscalls.
			if err := w.WriteSegmented(batch.bufs, batch.dsts[0], segSize); err != nil {
				f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing GSO batch")
			}
			return
		}
	}
	if err := w.WriteBatch(batch.bufs, batch.dsts); err != nil {
		f.l.WithError(err).WithField("writer", q).Error("Failed to write outgoing batch")
	}
}
// batchSegmentable reports whether the batch can be sent as one UDP GSO
// superpacket: every packet targets the same destination, every packet
// before the last is exactly segSize bytes, and the last is non-empty and
// no larger than segSize. On success it returns the segment size (the
// length of the first packet). A batch whose first packet is empty never
// qualifies; a single-packet batch trivially does.
func batchSegmentable(b *sendBatch) (int, bool) {
	segSize := len(b.bufs[0])
	if segSize == 0 {
		return 0, false
	}
	dst := b.dsts[0]
	last := len(b.bufs) - 1
	for i := 1; i <= last; i++ {
		if b.dsts[i] != dst {
			return 0, false
		}
		n := len(b.bufs[i])
		if i == last {
			// Trailing segment may be short, but never empty or oversized.
			if n == 0 || n > segSize {
				return 0, false
			}
			continue
		}
		if n != segSize {
			return 0, false
		}
	}
	return segSize, true
}
func (f *Interface) RegisterConfigChangeCallbacks(c *config.C) {
c.RegisterReloadCallback(f.reloadFirewall)
c.RegisterReloadCallback(f.reloadSendRecvError)