works well

This commit is contained in:
Ryan
2025-11-04 19:33:52 -05:00
parent aa44f4c7c9
commit 98f264cf14
3 changed files with 116 additions and 9 deletions

View File

@@ -8,6 +8,7 @@ import (
"net/netip"
"os"
"runtime"
"strings"
"sync/atomic"
"time"
@@ -399,6 +400,12 @@ func (f *Interface) listenInBatchLocked(raw io.ReadWriteCloser, reader overlay.B
return
}
if isVirtioHeadroomError(err) {
f.l.WithError(err).Warn("Batch reader fell back due to tun headroom issue")
f.listenInLegacyLocked(raw, i)
return
}
f.l.WithError(err).Error("Error while reading outbound packet batch")
os.Exit(2)
}
@@ -549,6 +556,7 @@ func (f *Interface) runTunWriteQueue(i int) {
if writer == nil {
return
}
requiredHeadroom := writer.BatchHeadroom()
batchCap := f.batches.batchSizeHint()
if batchCap <= 0 {
@@ -563,15 +571,27 @@ func (f *Interface) runTunWriteQueue(i int) {
if len(pending) == 0 {
return
}
if _, err := writer.WriteBatch(pending); err != nil {
f.l.WithError(err).
WithField("queue", i).
WithField("reason", reason).
Warn("Failed to write tun batch")
}
valid := pending[:0]
for idx := range pending {
if !f.ensurePacketHeadroom(&pending[idx], requiredHeadroom, i, reason) {
pending[idx] = nil
continue
}
if pending[idx] != nil {
pending[idx].Release()
valid = append(valid, pending[idx])
}
}
if len(valid) > 0 {
if _, err := writer.WriteBatch(valid); err != nil {
f.l.WithError(err).
WithField("queue", i).
WithField("reason", reason).
Warn("Failed to write tun batch")
for _, pkt := range valid {
if pkt != nil {
f.writePacketToTun(i, pkt)
}
}
}
}
pending = pending[:0]
@@ -605,7 +625,9 @@ func (f *Interface) runTunWriteQueue(i int) {
if pkt == nil {
continue
}
pending = append(pending, pkt)
if f.ensurePacketHeadroom(&pkt, requiredHeadroom, i, "queue") {
pending = append(pending, pkt)
}
if len(pending) >= cap(pending) {
flush("cap", false)
continue
@@ -811,6 +833,40 @@ func (f *Interface) writePacketToTun(q int, pkt *overlay.Packet) {
pkt.Release()
}
func (f *Interface) clonePacketWithHeadroom(pkt *overlay.Packet, required int) *overlay.Packet {
if pkt == nil {
return nil
}
payload := pkt.Payload()[:pkt.Len]
if len(payload) == 0 && required <= 0 {
return pkt
}
pool := f.batches.Pool()
if pool != nil {
if clone := pool.Get(); clone != nil {
if len(clone.Payload()) >= len(payload) {
clone.Len = copy(clone.Payload(), payload)
pkt.Release()
return clone
}
clone.Release()
}
}
if required < 0 {
required = 0
}
buf := make([]byte, required+len(payload))
n := copy(buf[required:], payload)
pkt.Release()
return &overlay.Packet{
Buf: buf,
Offset: required,
Len: n,
}
}
func (f *Interface) observeUDPQueueLen(i int) {
if f.batchUDPQueueGauge == nil {
return
@@ -832,6 +888,34 @@ func (f *Interface) currentBatchFlushInterval() time.Duration {
return 0
}
func (f *Interface) ensurePacketHeadroom(pkt **overlay.Packet, required int, queue int, reason string) bool {
p := *pkt
if p == nil {
return false
}
if required <= 0 || p.Offset >= required {
return true
}
clone := f.clonePacketWithHeadroom(p, required)
if clone == nil {
f.l.WithFields(logrus.Fields{
"queue": queue,
"reason": reason,
}).Warn("dropping packet lacking tun headroom")
return false
}
*pkt = clone
return true
}
func isVirtioHeadroomError(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "headroom") || strings.Contains(msg, "virtio")
}
func (f *Interface) effectiveGSOMaxSegments() int {
max := f.gsoMaxSegments
if max <= 0 {