mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
cruft fix tests haha yep faster checksum speed haha save pennies fix typo! checkpt GSO again
375 lines
12 KiB
Go
375 lines
12 KiB
Go
package tio
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"unsafe"
|
|
|
|
"golang.org/x/sys/unix"
|
|
)
|
|
|
|
// Space for segmented output. Worst case is many small segments, each paying
|
|
// an IP+TCP header. Should be a multiple of 64KiB.
|
|
// const tunSegBufSize = 0xffff * 8 TODO larger? config?
|
|
const tunSegBufSize = 131072
|
|
|
|
// tunSegBufCap is the total size we allocate for the per-reader segment
|
|
// buffer. It is sized as one worst-case TSO superpacket (tunSegBufSize) plus
|
|
// the same again as drain headroom so a Read wake can accumulate
|
|
// additional packets after an initial big read without overflowing.
|
|
const tunSegBufCap = tunSegBufSize * 2
|
|
|
|
// tunDrainCap caps how many packets a single Read will accumulate via
|
|
// the post-wake drain loop. Sized to soak up a burst of small ACKs while
|
|
// bounding how much work a single caller holds before handing off.
|
|
const tunDrainCap = 64 //256
|
|
|
|
// gsoInitialPayIovs is the starting capacity (in payload fragments) of
|
|
// Offload.gsoIovs. Sized to cover the default coalesce segment cap without
|
|
// any reallocations.
|
|
const gsoInitialPayIovs = 66
|
|
|
|
// validVnetHdr is the 10-byte virtio_net_hdr we prepend to every non-GSO TUN
|
|
// write. Only flag set is VIRTIO_NET_HDR_F_DATA_VALID, which marks the skb
|
|
// CHECKSUM_UNNECESSARY so the receiving network stack skips L4 checksum
|
|
// verification. All packets that reach the plain Write / WriteReject paths
|
|
// already carry a valid L4 checksum (either supplied by a remote peer whose
|
|
// ciphertext we AEAD-authenticated, or produced by finishChecksum during TSO
|
|
// segmentation, or built locally by CreateRejectPacket), so trusting them is
|
|
// safe.
|
|
var validVnetHdr = [virtioNetHdrLen]byte{unix.VIRTIO_NET_HDR_F_DATA_VALID}
|
|
|
|
// Offload wraps a TUN file descriptor with poll-based reads. The FD provided will be changed to non-blocking.
|
|
// A shared eventfd allows Close to wake all readers blocked in poll.
|
|
type Offload struct {
|
|
fd int
|
|
shutdownFd int
|
|
readPoll [2]unix.PollFd
|
|
writePoll [2]unix.PollFd
|
|
closed atomic.Bool
|
|
readBuf []byte // scratch for a single raw read (virtio hdr + superpacket)
|
|
segBuf []byte // backing store for segmented output
|
|
segOff int // cursor into segBuf for the current Read drain
|
|
pending [][]byte // segments returned from the most recent Read
|
|
writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to validVnetHdr
|
|
// rejectIovs is a second preallocated iovec scratch used exclusively by
|
|
// WriteReject (reject + self-forward from the inside path). It mirrors
|
|
// writeIovs but lets listenIn goroutines emit reject packets without
|
|
// racing with the listenOut coalescer that owns writeIovs.
|
|
rejectIovs [2]unix.Iovec
|
|
|
|
// gsoHdrBuf is a per-queue 10-byte scratch for the virtio_net_hdr emitted
|
|
// by WriteGSO. Separate from validVnetHdr so a concurrent non-GSO Write on
|
|
// another queue never observes a half-written header.
|
|
gsoHdrBuf [virtioNetHdrLen]byte
|
|
// gsoIovs is the writev iovec scratch for WriteGSO. Sized to hold the
|
|
// virtio header + IP/TCP header + up to gsoInitialPayIovs payload
|
|
// fragments; grown on demand if a coalescer pushes more.
|
|
gsoIovs []unix.Iovec
|
|
}
|
|
|
|
func newOffload(fd int, shutdownFd int) (*Offload, error) {
|
|
if err := unix.SetNonblock(fd, true); err != nil {
|
|
return nil, fmt.Errorf("failed to set tun fd non-blocking: %w", err)
|
|
}
|
|
|
|
out := &Offload{
|
|
fd: fd,
|
|
shutdownFd: shutdownFd,
|
|
closed: atomic.Bool{},
|
|
readBuf: make([]byte, tunReadBufSize),
|
|
readPoll: [2]unix.PollFd{
|
|
{Fd: int32(fd), Events: unix.POLLIN},
|
|
{Fd: int32(shutdownFd), Events: unix.POLLIN},
|
|
},
|
|
writePoll: [2]unix.PollFd{
|
|
{Fd: int32(fd), Events: unix.POLLOUT},
|
|
{Fd: int32(shutdownFd), Events: unix.POLLIN},
|
|
},
|
|
|
|
segBuf: make([]byte, tunSegBufCap),
|
|
gsoIovs: make([]unix.Iovec, 2, 2+gsoInitialPayIovs),
|
|
}
|
|
|
|
out.writeIovs[0].Base = &validVnetHdr[0]
|
|
out.writeIovs[0].SetLen(virtioNetHdrLen)
|
|
out.rejectIovs[0].Base = &validVnetHdr[0]
|
|
out.rejectIovs[0].SetLen(virtioNetHdrLen)
|
|
out.gsoIovs[0].Base = &out.gsoHdrBuf[0]
|
|
out.gsoIovs[0].SetLen(virtioNetHdrLen)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (r *Offload) blockOnRead() error {
|
|
const problemFlags = unix.POLLHUP | unix.POLLNVAL | unix.POLLERR
|
|
var err error
|
|
for {
|
|
_, err = unix.Poll(r.readPoll[:], -1)
|
|
if err != unix.EINTR {
|
|
break
|
|
}
|
|
}
|
|
//always reset these!
|
|
tunEvents := r.readPoll[0].Revents
|
|
shutdownEvents := r.readPoll[1].Revents
|
|
r.readPoll[0].Revents = 0
|
|
r.readPoll[1].Revents = 0
|
|
//do the err check before trusting the potentially bogus bits we just got
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if shutdownEvents&(unix.POLLIN|problemFlags) != 0 {
|
|
return os.ErrClosed
|
|
} else if tunEvents&problemFlags != 0 {
|
|
return os.ErrClosed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Offload) blockOnWrite() error {
|
|
const problemFlags = unix.POLLHUP | unix.POLLNVAL | unix.POLLERR
|
|
var err error
|
|
for {
|
|
_, err = unix.Poll(r.writePoll[:], -1)
|
|
if err != unix.EINTR {
|
|
break
|
|
}
|
|
}
|
|
//always reset these!
|
|
tunEvents := r.writePoll[0].Revents
|
|
shutdownEvents := r.writePoll[1].Revents
|
|
r.writePoll[0].Revents = 0
|
|
r.writePoll[1].Revents = 0
|
|
//do the err check before trusting the potentially bogus bits we just got
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if shutdownEvents&(unix.POLLIN|problemFlags) != 0 {
|
|
return os.ErrClosed
|
|
} else if tunEvents&problemFlags != 0 {
|
|
return os.ErrClosed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Offload) readRaw(buf []byte) (int, error) {
|
|
for {
|
|
if n, err := unix.Read(r.fd, buf); err == nil {
|
|
return n, nil
|
|
} else if err == unix.EAGAIN {
|
|
if err = r.blockOnRead(); err != nil {
|
|
return 0, err
|
|
}
|
|
continue
|
|
} else if err == unix.EINTR {
|
|
continue
|
|
} else if err == unix.EBADF {
|
|
return 0, os.ErrClosed
|
|
} else {
|
|
return 0, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Read reads one or more superpackets from the tun and returns the
|
|
// resulting packets. The first read blocks via poll; once the fd is known
|
|
// readable we drain additional packets non-blocking until the kernel queue
|
|
// is empty (EAGAIN), we've collected tunDrainCap packets, or we're out of
|
|
// segBuf headroom. This amortizes the poll wake over bursts of small
|
|
// packets (e.g. TCP ACKs). Slices point into the Offload's internal buffers
|
|
// and are only valid until the next Read or Close on this Queue.
|
|
func (r *Offload) Read() ([][]byte, error) {
|
|
r.pending = r.pending[:0]
|
|
r.segOff = 0
|
|
|
|
// Initial (blocking) read. Retry on decode errors so a single bad
|
|
// packet does not stall the reader.
|
|
for {
|
|
n, err := r.readRaw(r.readBuf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := r.decodeRead(n); err != nil {
|
|
// Drop and read again — a bad packet should not kill the reader.
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
// Drain: non-blocking reads until the kernel queue is empty, the drain
|
|
// cap is reached, or segBuf no longer has room for another worst-case
|
|
// superpacket.
|
|
for len(r.pending) < tunDrainCap && tunSegBufCap-r.segOff >= tunSegBufSize {
|
|
n, err := unix.Read(r.fd, r.readBuf)
|
|
if err != nil {
|
|
// EAGAIN / EINTR / anything else: stop draining. We already
|
|
// have a valid batch from the first read.
|
|
break
|
|
}
|
|
if n <= 0 {
|
|
break
|
|
}
|
|
if err := r.decodeRead(n); err != nil {
|
|
// Drop this packet and stop the drain; we'd rather hand off
|
|
// what we have than keep spinning here.
|
|
break
|
|
}
|
|
}
|
|
|
|
return r.pending, nil
|
|
}
|
|
|
|
// decodeRead decodes the virtio header plus payload in r.readBuf[:n], appends
|
|
// the segments to r.pending, and advances r.segOff by the total scratch used.
|
|
// Caller must have already ensured r.vnetHdr is true.
|
|
func (r *Offload) decodeRead(n int) error {
|
|
if n < virtioNetHdrLen {
|
|
return fmt.Errorf("short tun read: %d < %d", n, virtioNetHdrLen)
|
|
}
|
|
var hdr VirtioNetHdr
|
|
hdr.decode(r.readBuf[:virtioNetHdrLen])
|
|
before := len(r.pending)
|
|
if err := segmentInto(r.readBuf[virtioNetHdrLen:n], hdr, &r.pending, r.segBuf[r.segOff:]); err != nil {
|
|
return err
|
|
}
|
|
for k := before; k < len(r.pending); k++ {
|
|
r.segOff += len(r.pending[k])
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Offload) Write(buf []byte) (int, error) {
|
|
return r.writeWithScratch(buf, &r.writeIovs)
|
|
}
|
|
|
|
// WriteReject emits a packet using a dedicated iovec scratch (rejectIovs)
|
|
// distinct from the one used by the coalescer's Write path. This avoids a
|
|
// data race between the inside (listenIn) goroutine emitting reject or
|
|
// self-forward packets and the outside (listenOut) goroutine flushing TCP
|
|
// coalescer passthroughs on the same Offload.
|
|
func (r *Offload) WriteReject(buf []byte) (int, error) {
|
|
return r.writeWithScratch(buf, &r.rejectIovs)
|
|
}
|
|
|
|
func (r *Offload) writeWithScratch(buf []byte, iovs *[2]unix.Iovec) (int, error) {
|
|
if len(buf) == 0 {
|
|
return 0, nil
|
|
}
|
|
// Point the payload iovec at the caller's buffer. iovs[0] is pre-wired
|
|
// to validVnetHdr during Offload construction so we don't rebuild it here.
|
|
iovs[1].Base = &buf[0]
|
|
iovs[1].SetLen(len(buf))
|
|
iovPtr := unsafe.Pointer(&iovs[0])
|
|
return r.rawWrite(iovPtr, 2)
|
|
}
|
|
|
|
func (r *Offload) rawWrite(iovs unsafe.Pointer, iovcnt int) (int, error) {
|
|
for {
|
|
n, _, errno := syscall.Syscall(unix.SYS_WRITEV, uintptr(r.fd), uintptr(iovs), uintptr(iovcnt))
|
|
if errno == 0 {
|
|
if int(n) < virtioNetHdrLen {
|
|
return 0, io.ErrShortWrite
|
|
}
|
|
return int(n) - virtioNetHdrLen, nil
|
|
}
|
|
if errno == unix.EAGAIN {
|
|
if err := r.blockOnWrite(); err != nil {
|
|
return 0, err
|
|
}
|
|
continue
|
|
}
|
|
if errno == unix.EINTR {
|
|
continue
|
|
}
|
|
if errno == unix.EBADF {
|
|
return 0, os.ErrClosed
|
|
}
|
|
return 0, errno
|
|
}
|
|
}
|
|
|
|
// GSOSupported reports whether this queue was opened with IFF_VNET_HDR and
|
|
// can accept WriteGSO. When false, callers should fall back to per-segment
|
|
// Write calls.
|
|
func (r *Offload) GSOSupported() bool { return true }
|
|
|
|
// WriteGSO emits a TCP TSO superpacket in a single writev. hdr is the
|
|
// IPv4/IPv6 + TCP header prefix (already finalized — total length, IP csum,
|
|
// and TCP pseudo-header partial set by the caller). pays are payload
|
|
// fragments whose concatenation forms the full coalesced payload; each
|
|
// slice is read-only and must stay valid until return. gsoSize is the MSS;
|
|
// every segment except possibly the last is exactly gsoSize bytes.
|
|
// csumStart is the byte offset where the TCP header begins within hdr.
|
|
func (r *Offload) WriteGSO(hdr []byte, pays [][]byte, gsoSize uint16, isV6 bool, csumStart uint16) error {
|
|
if len(hdr) == 0 || len(pays) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Build the virtio_net_hdr. When pays total to <= gsoSize the kernel
|
|
// would produce a single segment; keep NEEDS_CSUM semantics but skip
|
|
// the GSO type so the kernel doesn't spuriously mark this as TSO.
|
|
vhdr := VirtioNetHdr{
|
|
Flags: unix.VIRTIO_NET_HDR_F_NEEDS_CSUM,
|
|
HdrLen: uint16(len(hdr)),
|
|
GSOSize: gsoSize,
|
|
CsumStart: csumStart,
|
|
CsumOffset: 16, // TCP checksum field lives 16 bytes into the TCP header
|
|
}
|
|
var totalPay int
|
|
for _, p := range pays {
|
|
totalPay += len(p)
|
|
}
|
|
if totalPay > int(gsoSize) {
|
|
if isV6 {
|
|
vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV6
|
|
} else {
|
|
vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_TCPV4
|
|
}
|
|
} else {
|
|
vhdr.GSOType = unix.VIRTIO_NET_HDR_GSO_NONE
|
|
vhdr.GSOSize = 0
|
|
}
|
|
vhdr.encode(r.gsoHdrBuf[:])
|
|
|
|
// Build the iovec array: [virtio_hdr, hdr, pays...]. r.gsoIovs[0] is
|
|
// wired to gsoHdrBuf at construction and never changes.
|
|
need := 2 + len(pays)
|
|
if cap(r.gsoIovs) < need {
|
|
grown := make([]unix.Iovec, need)
|
|
grown[0] = r.gsoIovs[0]
|
|
r.gsoIovs = grown
|
|
} else {
|
|
r.gsoIovs = r.gsoIovs[:need]
|
|
}
|
|
r.gsoIovs[1].Base = &hdr[0]
|
|
r.gsoIovs[1].SetLen(len(hdr))
|
|
for i, p := range pays {
|
|
r.gsoIovs[2+i].Base = &p[0]
|
|
r.gsoIovs[2+i].SetLen(len(p))
|
|
}
|
|
|
|
iovPtr := unsafe.Pointer(&r.gsoIovs[0])
|
|
iovCnt := len(r.gsoIovs)
|
|
_, err := r.rawWrite(iovPtr, iovCnt)
|
|
return err
|
|
}
|
|
|
|
func (r *Offload) Close() error {
|
|
if r.closed.Swap(true) {
|
|
return nil
|
|
}
|
|
|
|
//shutdownFd is owned by the container, so we should not close it
|
|
var err error
|
|
if r.fd >= 0 {
|
|
err = unix.Close(r.fd)
|
|
r.fd = -1
|
|
}
|
|
|
|
return err
|
|
}
|