fix interfaces

This commit is contained in:
JackDoan
2026-04-24 15:18:28 -05:00
parent b155f4b7e1
commit 1d84b81032
4 changed files with 62 additions and 45 deletions

View File

@@ -260,7 +260,7 @@ func (f *Interface) activate() error {
} }
f.readers = f.inside.Readers() f.readers = f.inside.Readers()
for i := range f.readers { for i := range f.readers {
f.batchers[i] = batch.NewPassthrough(f.readers[i]) f.batchers[i] = batch.NewTCPCoalescer(f.readers[i])
} }
f.wg.Add(1) // for us to wait on Close() to return f.wg.Add(1) // for us to wait on Close() to return

View File

@@ -1,4 +1,4 @@
package coalesce package batch
import ( import (
"bytes" "bytes"
@@ -84,6 +84,8 @@ type TCPCoalescer struct {
// when a non-admissible packet for that flow arrives, or in Flush. // when a non-admissible packet for that flow arrives, or in Flush.
openSlots map[flowKey]*coalesceSlot openSlots map[flowKey]*coalesceSlot
pool []*coalesceSlot // free list for reuse pool []*coalesceSlot // free list for reuse
backing []byte
} }
func NewTCPCoalescer(w io.Writer) *TCPCoalescer { func NewTCPCoalescer(w io.Writer) *TCPCoalescer {
@@ -92,6 +94,7 @@ func NewTCPCoalescer(w io.Writer) *TCPCoalescer {
slots: make([]*coalesceSlot, 0, initialSlots), slots: make([]*coalesceSlot, 0, initialSlots),
openSlots: make(map[flowKey]*coalesceSlot, initialSlots), openSlots: make(map[flowKey]*coalesceSlot, initialSlots),
pool: make([]*coalesceSlot, 0, initialSlots), pool: make([]*coalesceSlot, 0, initialSlots),
backing: make([]byte, 0, initialSlots*65535),
} }
if gw, ok := w.(tio.GSOWriter); ok && gw.GSOSupported() { if gw, ok := w.(tio.GSOWriter); ok && gw.GSOSupported() {
c.gsoW = gw c.gsoW = gw
@@ -194,10 +197,22 @@ func (p parsedTCP) coalesceable() bool {
return p.payLen > 0 return p.payLen > 0
} }
// Add borrows pkt. The caller must keep pkt valid until the next Flush, func (c *TCPCoalescer) Reserve(sz int) []byte {
if len(c.backing)+sz > cap(c.backing) {
// Grow: allocate a fresh backing. Already-committed slices still
// reference the old array and remain valid until Flush drops them.
newCap := max(cap(c.backing)*2, sz)
c.backing = make([]byte, 0, newCap)
}
start := len(c.backing)
c.backing = c.backing[:start+sz]
return c.backing[start : start+sz : start+sz] // length-sz, capacity-sz slice: full-slice expression caps it so appends can't clobber neighboring reservations
}
// Commit borrows pkt. The caller must keep pkt valid until the next Flush,
// whether or not the packet was coalesced — passthrough (non-admissible) // whether or not the packet was coalesced — passthrough (non-admissible)
// packets are queued and written at Flush time, not synchronously. // packets are queued and written at Flush time, not synchronously.
func (c *TCPCoalescer) Add(pkt []byte) error { func (c *TCPCoalescer) Commit(pkt []byte) error {
if c.gsoW == nil { if c.gsoW == nil {
c.addPassthrough(pkt) c.addPassthrough(pkt)
return nil return nil
@@ -258,6 +273,8 @@ func (c *TCPCoalescer) Flush() error {
for k := range c.openSlots { for k := range c.openSlots {
delete(c.openSlots, k) delete(c.openSlots, k)
} }
c.backing = c.backing[:0]
return first return first
} }

View File

@@ -1,4 +1,4 @@
package coalesce package batch
import ( import (
"encoding/binary" "encoding/binary"
@@ -116,7 +116,7 @@ func TestCoalescerPassthroughWhenGSOUnavailable(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: false} w := &fakeTunWriter{gsoEnabled: false}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pkt := buildTCPv4(1000, tcpAck, []byte("hello")) pkt := buildTCPv4(1000, tcpAck, []byte("hello"))
if err := c.Add(pkt); err != nil { if err := c.Commit(pkt); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// No sync write — passthrough is deferred to Flush. // No sync write — passthrough is deferred to Flush.
@@ -140,7 +140,7 @@ func TestCoalescerNonTCPPassthrough(t *testing.T) {
pkt[9] = 1 pkt[9] = 1
copy(pkt[12:16], []byte{10, 0, 0, 1}) copy(pkt[12:16], []byte{10, 0, 0, 1})
copy(pkt[16:20], []byte{10, 0, 0, 2}) copy(pkt[16:20], []byte{10, 0, 0, 2})
if err := c.Add(pkt); err != nil { if err := c.Commit(pkt); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -155,7 +155,7 @@ func TestCoalescerSeedThenFlushAlone(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000)) pkt := buildTCPv4(1000, tcpAck, make([]byte, 1000))
if err := c.Add(pkt); err != nil { if err := c.Commit(pkt); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(w.writes) != 0 || len(w.gsoWrites) != 0 { if len(w.writes) != 0 || len(w.gsoWrites) != 0 {
@@ -182,13 +182,13 @@ func TestCoalescerCoalescesAdjacentACKs(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pay := make([]byte, 1200) pay := make([]byte, 1200)
if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(2200, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(2200, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(3400, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(3400, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -222,10 +222,10 @@ func TestCoalescerRejectsSeqGap(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pay := make([]byte, 1200) pay := make([]byte, 1200)
if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(3000, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(3000, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -241,13 +241,13 @@ func TestCoalescerRejectsFlagMismatch(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pay := make([]byte, 1200) pay := make([]byte, 1200)
if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// SYN|ACK is non-admissible. Must flush matching flow's slot (gso) // SYN|ACK is non-admissible. Must flush matching flow's slot (gso)
// and then plain-write the SYN packet itself. // and then plain-write the SYN packet itself.
syn := buildTCPv4(2200, tcpSyn|tcpAck, pay) syn := buildTCPv4(2200, tcpSyn|tcpAck, pay)
if err := c.Add(syn); err != nil { if err := c.Commit(syn); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -262,7 +262,7 @@ func TestCoalescerRejectsFIN(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x")) fin := buildTCPv4(1000, tcpAck|tcpFin, []byte("x"))
if err := c.Add(fin); err != nil { if err := c.Commit(fin); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -279,15 +279,15 @@ func TestCoalescerShortLastSegmentClosesChain(t *testing.T) {
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
full := make([]byte, 1200) full := make([]byte, 1200)
half := make([]byte, 500) half := make([]byte, 500)
if err := c.Add(buildTCPv4(1000, tcpAck, full)); err != nil { if err := c.Commit(buildTCPv4(1000, tcpAck, full)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(2200, tcpAck, half)); err != nil { if err := c.Commit(buildTCPv4(2200, tcpAck, half)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Chain now closed; next packet seeds a new slot on the same flow // Chain now closed; next packet seeds a new slot on the same flow
// after flushing the old one. // after flushing the old one.
if err := c.Add(buildTCPv4(2700, tcpAck, full)); err != nil { if err := c.Commit(buildTCPv4(2700, tcpAck, full)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -313,13 +313,13 @@ func TestCoalescerPSHFinalizesChain(t *testing.T) {
w := &fakeTunWriter{gsoEnabled: true} w := &fakeTunWriter{gsoEnabled: true}
c := NewTCPCoalescer(w) c := NewTCPCoalescer(w)
pay := make([]byte, 1200) pay := make([]byte, 1200)
if err := c.Add(buildTCPv4(1000, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(1000, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(2200, tcpAckPsh, pay)); err != nil { if err := c.Commit(buildTCPv4(2200, tcpAckPsh, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4(3400, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(3400, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -341,10 +341,10 @@ func TestCoalescerRejectsDifferentFlow(t *testing.T) {
p1 := buildTCPv4(1000, tcpAck, pay) p1 := buildTCPv4(1000, tcpAck, pay)
p2 := buildTCPv4(2200, tcpAck, pay) p2 := buildTCPv4(2200, tcpAck, pay)
binary.BigEndian.PutUint16(p2[20:22], 9999) binary.BigEndian.PutUint16(p2[20:22], 9999)
if err := c.Add(p1); err != nil { if err := c.Commit(p1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(p2); err != nil { if err := c.Commit(p2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -364,7 +364,7 @@ func TestCoalescerRejectsIPOptions(t *testing.T) {
// Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add // Bump IHL to 6 to simulate 4 bytes of IP options. Don't actually add
// bytes — parser should bail before it matters. // bytes — parser should bail before it matters.
pkt[0] = 0x46 pkt[0] = 0x46
if err := c.Add(pkt); err != nil { if err := c.Commit(pkt); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -382,7 +382,7 @@ func TestCoalescerCapBySegments(t *testing.T) {
pay := make([]byte, 512) pay := make([]byte, 512)
seq := uint32(1000) seq := uint32(1000)
for i := 0; i < tcpCoalesceMaxSegs+5; i++ { for i := 0; i < tcpCoalesceMaxSegs+5; i++ {
if err := c.Add(buildTCPv4(seq, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4(seq, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
seq += uint32(len(pay)) seq += uint32(len(pay))
@@ -406,22 +406,22 @@ func TestCoalescerMultipleFlowsInSameBatch(t *testing.T) {
pay := make([]byte, 1200) pay := make([]byte, 1200)
// Flow A: sport 1000. Flow B: sport 3000. // Flow A: sport 1000. Flow B: sport 3000.
if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(1000, 2000, 2500, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 2500, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {
@@ -463,7 +463,7 @@ func TestCoalescerPreservesArrivalOrder(t *testing.T) {
// Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on // Sequence: coalesceable TCP, ICMP (passthrough), coalesceable TCP on
// a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y). // a different flow. Expected emit order: gso(X), plain(ICMP), gso(Y).
pay := make([]byte, 1200) pay := make([]byte, 1200)
if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
icmp := make([]byte, 28) icmp := make([]byte, 28)
@@ -472,10 +472,10 @@ func TestCoalescerPreservesArrivalOrder(t *testing.T) {
icmp[9] = 1 icmp[9] = 1
copy(icmp[12:16], []byte{10, 0, 0, 1}) copy(icmp[12:16], []byte{10, 0, 0, 1})
copy(icmp[16:20], []byte{10, 0, 0, 3}) copy(icmp[16:20], []byte{10, 0, 0, 3})
if err := c.Add(icmp); err != nil { if err := c.Commit(icmp); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Nothing should have hit the writer synchronously. // Nothing should have hit the writer synchronously.
@@ -529,26 +529,26 @@ func TestCoalescerInterleavedFlowsPreserveOrdering(t *testing.T) {
pay := make([]byte, 1200) pay := make([]byte, 1200)
// Flow A two segments. // Flow A two segments.
if err := c.Add(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 100, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(1000, 2000, 1300, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Flow B two segments. // Flow B two segments.
if err := c.Add(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 500, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Add(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 1700, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Flow A SYN (non-admissible) — must flush only flow A's slot. // Flow A SYN (non-admissible) — must flush only flow A's slot.
syn := buildTCPv4Ports(1000, 2000, 9999, tcpSyn|tcpAck, pay) syn := buildTCPv4Ports(1000, 2000, 9999, tcpSyn|tcpAck, pay)
if err := c.Add(syn); err != nil { if err := c.Commit(syn); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Flow B continues — should still be coalesced with its seed. // Flow B continues — should still be coalesced with its seed.
if err := c.Add(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil { if err := c.Commit(buildTCPv4Ports(3000, 2000, 2900, tcpAck, pay)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Flush(); err != nil { if err := c.Flush(); err != nil {

View File

@@ -35,7 +35,7 @@ const gsoInitialPayIovs = 66
// validVnetHdr is the 10-byte virtio_net_hdr we prepend to every non-GSO TUN // validVnetHdr is the 10-byte virtio_net_hdr we prepend to every non-GSO TUN
// write. Only flag set is VIRTIO_NET_HDR_F_DATA_VALID, which marks the skb // write. Only flag set is VIRTIO_NET_HDR_F_DATA_VALID, which marks the skb
// CHECKSUM_UNNECESSARY so the receiving network stack skips L4 checksum // CHECKSUM_UNNECESSARY so the receiving network stack skips L4 checksum
// verification. All packets that reach the plain Write / WriteReject paths // verification. All packets that reach the plain Write / WriteFromSelf paths
// already carry a valid L4 checksum (either supplied by a remote peer whose // already carry a valid L4 checksum (either supplied by a remote peer whose
// ciphertext we AEAD-authenticated, or produced by finishChecksum during TSO // ciphertext we AEAD-authenticated, or produced by finishChecksum during TSO
// segmentation, or built locally by CreateRejectPacket), so trusting them is // segmentation, or built locally by CreateRejectPacket), so trusting them is
@@ -56,7 +56,7 @@ type Offload struct {
pending [][]byte // segments returned from the most recent Read pending [][]byte // segments returned from the most recent Read
writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr
// rejectIovs is a second preallocated iovec scratch used exclusively by // rejectIovs is a second preallocated iovec scratch used exclusively by
// WriteReject (reject + self-forward from the inside path). It mirrors // WriteFromSelf (reject + self-forward from the inside path). It mirrors
// writeIovs but lets listenIn goroutines emit reject packets without // writeIovs but lets listenIn goroutines emit reject packets without
// racing with the listenOut coalescer that owns writeIovs. // racing with the listenOut coalescer that owns writeIovs.
rejectIovs [2]unix.Iovec rejectIovs [2]unix.Iovec
@@ -246,12 +246,12 @@ func (r *Offload) Write(buf []byte) (int, error) {
return r.writeWithScratch(buf, &r.writeIovs) return r.writeWithScratch(buf, &r.writeIovs)
} }
// WriteReject emits a packet using a dedicated iovec scratch (rejectIovs) // WriteFromSelf emits a packet using a dedicated iovec scratch (rejectIovs)
// distinct from the one used by the coalescer's Write path. This avoids a // distinct from the one used by the coalescer's Write path. This avoids a
// data race between the inside (listenIn) goroutine emitting reject or // data race between the inside (listenIn) goroutine emitting reject or
// self-forward packets and the outside (listenOut) goroutine flushing TCP // self-forward packets and the outside (listenOut) goroutine flushing TCP
// coalescer passthroughs on the same Offload. // coalescer passthroughs on the same Offload.
func (r *Offload) WriteReject(buf []byte) (int, error) { func (r *Offload) WriteFromSelf(buf []byte) (int, error) {
return r.writeWithScratch(buf, &r.rejectIovs) return r.writeWithScratch(buf, &r.rejectIovs)
} }