more nonblocking

This commit is contained in:
Jay Wren
2025-11-11 14:22:40 -05:00
parent b68e504865
commit ef0a022375

View File

@@ -70,72 +70,55 @@ const (
virtioNetHdrLen = 10 // Size of virtio_net_hdr structure virtioNetHdrLen = 10 // Size of virtio_net_hdr structure
) )
// tunVirtioReader wraps a file descriptor that has IFF_VNET_HDR enabled // wgDeviceWrapper wraps a wireguard Device to implement io.ReadWriteCloser
// and strips the virtio header on reads, adds it on writes // This allows multiqueue readers to use the same wireguard Device batching as the main device
type tunVirtioReader struct { type wgDeviceWrapper struct {
f *os.File dev wgtun.Device
buf [virtioNetHdrLen + 65535]byte // Space for header + max packet buf []byte // Reusable buffer for single packet reads
} }
func (r *tunVirtioReader) Read(b []byte) (int, error) { func (w *wgDeviceWrapper) Read(b []byte) (int, error) {
// Read into our buffer which has space for the virtio header // Use wireguard Device's batch API for single packet
n, err := r.f.Read(r.buf[:]) bufs := [][]byte{b}
sizes := make([]int, 1)
n, err := w.dev.Read(bufs, sizes, 0)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if n == 0 {
// Strip the virtio header (first 10 bytes) return 0, io.EOF
if n < virtioNetHdrLen { }
return 0, fmt.Errorf("packet too short: %d bytes", n) return sizes[0], nil
} }
// Copy payload (after header) to destination func (w *wgDeviceWrapper) Write(b []byte) (int, error) {
copy(b, r.buf[virtioNetHdrLen:n]) // Allocate buffer with space for virtio header
return n - virtioNetHdrLen, nil buf := make([]byte, virtioNetHdrLen+len(b))
} copy(buf[virtioNetHdrLen:], b)
func (r *tunVirtioReader) Write(b []byte) (int, error) { bufs := [][]byte{buf}
// Zero out the virtio header (no offload from userspace write) n, err := w.dev.Write(bufs, virtioNetHdrLen)
for i := 0; i < virtioNetHdrLen; i++ {
r.buf[i] = 0
}
// Copy packet data after header
copy(r.buf[virtioNetHdrLen:], b)
// Write with header prepended
n, err := r.f.Write(r.buf[:virtioNetHdrLen+len(b)])
if err != nil { if err != nil {
return 0, err return 0, err
} }
if n == 0 {
// Return payload size (excluding header) return 0, io.ErrShortWrite
return n - virtioNetHdrLen, nil }
return len(b), nil
} }
func (r *tunVirtioReader) Close() error { func (w *wgDeviceWrapper) Close() error {
return r.f.Close() return w.dev.Close()
} }
// BatchRead reads multiple packets at once for performance // BatchRead implements batching for multiqueue readers
// This is not used for multiqueue readers as they use direct file I/O func (w *wgDeviceWrapper) BatchRead(bufs [][]byte, sizes []int) (int, error) {
// Returns number of packets read return w.dev.Read(bufs, sizes, 0)
func (r *tunVirtioReader) BatchRead(bufs [][]byte, sizes []int) (int, error) {
// For multiqueue file descriptors, we don't have the wireguard Device interface
// Fall back to single packet reads
// TODO: Could implement proper batching with unix.Recvmmsg
n, err := r.Read(bufs[0])
if err != nil {
return 0, err
}
sizes[0] = n
return 1, nil
} }
// BatchSize returns the batch size for multiqueue readers // BatchSize returns the optimal batch size
func (r *tunVirtioReader) BatchSize() int { func (w *wgDeviceWrapper) BatchSize() int {
// Multiqueue readers use single packet mode for now return w.dev.BatchSize()
return 1
} }
func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) { func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) {
@@ -343,10 +326,29 @@ func (t *tun) NewMultiQueueReader() (io.ReadWriteCloser, error) {
return nil, err return nil, err
} }
// Set nonblocking mode - CRITICAL for proper netpoller integration
if err = unix.SetNonblock(fd, true); err != nil {
unix.Close(fd)
return nil, err
}
// Get MTU from main device
mtu := t.MaxMTU
if mtu == 0 {
mtu = DefaultMTU
}
file := os.NewFile(uintptr(fd), "/dev/net/tun") file := os.NewFile(uintptr(fd), "/dev/net/tun")
// Wrap in virtio header handler // Create wireguard Device from the file descriptor (just like the main device)
return &tunVirtioReader{f: file}, nil wgDev, err := wgtun.CreateTUNFromFile(file, mtu)
if err != nil {
file.Close()
return nil, fmt.Errorf("failed to create multiqueue TUN device: %w", err)
}
// Return a wrapper that uses the wireguard Device for all I/O
return &wgDeviceWrapper{dev: wgDev}, nil
} }
func (t *tun) RoutesFor(ip netip.Addr) routing.Gateways { func (t *tun) RoutesFor(ip netip.Addr) routing.Gateways {