potential for bug

This commit is contained in:
JackDoan
2026-04-20 11:09:29 -05:00
parent 0f27b81f19
commit f34e8fe0e6
14 changed files with 77 additions and 6 deletions

View File

@@ -33,7 +33,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
// routes packets from the Nebula addr to the Nebula addr through the Nebula // routes packets from the Nebula addr to the Nebula addr through the Nebula
// TUN device. // TUN device.
if immediatelyForwardToSelf { if immediatelyForwardToSelf {
_, err := f.readers[q].Write(packet) _, err := f.readers[q].WriteReject(packet)
if err != nil { if err != nil {
f.l.WithError(err).Error("Failed to forward to tun") f.l.WithError(err).Error("Failed to forward to tun")
} }
@@ -148,7 +148,7 @@ func (f *Interface) rejectInside(packet []byte, out []byte, q int) {
return return
} }
_, err := f.readers[q].Write(out) _, err := f.readers[q].WriteReject(out)
if err != nil { if err != nil {
f.l.WithError(err).Error("Failed to write to tun") f.l.WithError(err).Error("Failed to write to tun")
} }

View File

@@ -19,6 +19,12 @@ const defaultBatchBufSize = 65535
type Queue interface { type Queue interface {
io.ReadWriteCloser io.ReadWriteCloser
ReadBatch() ([][]byte, error) ReadBatch() ([][]byte, error)
// WriteReject writes a single packet that originated from the inside
// path (reject replies or self-forward) using scratch state distinct
// from Write, so it can run concurrently with Write on the same Queue
// without a data race. On backends without a shared-scratch Write, a
// trivial delegation to Write is acceptable.
WriteReject(p []byte) (int, error)
} }
type Device interface { type Device interface {

View File

@@ -37,6 +37,10 @@ func (NoopTun) Write([]byte) (int, error) {
return 0, nil return 0, nil
} }
func (NoopTun) WriteReject(p []byte) (int, error) {
return 0, nil
}
func (NoopTun) SupportsMultiqueue() bool { func (NoopTun) SupportsMultiqueue() bool {
return false return false
} }

View File

@@ -41,6 +41,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) { func newTunFromFd(c *config.C, l *logrus.Logger, deviceFd int, vpnNetworks []netip.Prefix) (*tun, error) {
// XXX Android returns an fd in non-blocking mode which is necessary for shutdown to work properly. // XXX Android returns an fd in non-blocking mode which is necessary for shutdown to work properly.
// Be sure not to call file.Fd() as it will set the fd to blocking mode. // Be sure not to call file.Fd() as it will set the fd to blocking mode.

View File

@@ -527,6 +527,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
// Write is only valid for single threaded use // Write is only valid for single threaded use
func (t *tun) Write(from []byte) (int, error) { func (t *tun) Write(from []byte) (int, error) {
buf := t.out buf := t.out

View File

@@ -120,6 +120,10 @@ func (t *disabledTun) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
func (t *disabledTun) WriteReject(b []byte) (int, error) {
return t.Write(b)
}
func (t *disabledTun) SupportsMultiqueue() bool { func (t *disabledTun) SupportsMultiqueue() bool {
return true return true
} }

View File

@@ -110,6 +110,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
func (t *tun) Read(to []byte) (int, error) { func (t *tun) Read(to []byte) (int, error) {
// use readv() to read from the tunnel device, to eliminate the need for copying the buffer // use readv() to read from the tunnel device, to eliminate the need for copying the buffer
if t.devFd < 0 { if t.devFd < 0 {

View File

@@ -43,6 +43,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, error) { func newTun(_ *config.C, _ *logrus.Logger, _ []netip.Prefix, _ bool) (*tun, error) {
return nil, fmt.Errorf("newTun not supported in iOS") return nil, fmt.Errorf("newTun not supported in iOS")
} }

View File

@@ -47,7 +47,12 @@ type tunFile struct {
segOff int // cursor into segBuf for the current ReadBatch drain segOff int // cursor into segBuf for the current ReadBatch drain
pending [][]byte // segments waiting to be drained by Read pending [][]byte // segments waiting to be drained by Read
pendingIdx int pendingIdx int
writeIovs [2]unix.Iovec // preallocated iovecs for vnetHdr writes; iovs[0] is fixed to zeroVnetHdr writeIovs [2]unix.Iovec // preallocated iovecs for Write (coalescer passthrough); iovs[0] is fixed to zeroVnetHdr
// rejectIovs is a second preallocated iovec scratch used exclusively by
// WriteReject (reject + self-forward from the inside path). It mirrors
// writeIovs but lets listenIn goroutines emit reject packets without
// racing with the listenOut coalescer that owns writeIovs.
rejectIovs [2]unix.Iovec
// gsoHdrBuf is a per-queue 10-byte scratch for the virtio_net_hdr emitted // gsoHdrBuf is a per-queue 10-byte scratch for the virtio_net_hdr emitted
// by WriteGSO. Separate from zeroVnetHdr so a concurrent non-GSO Write on // by WriteGSO. Separate from zeroVnetHdr so a concurrent non-GSO Write on
@@ -92,6 +97,8 @@ func (r *tunFile) newFriend(fd int) (*tunFile, error) {
out.segBuf = make([]byte, tunSegBufCap) out.segBuf = make([]byte, tunSegBufCap)
out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].Base = &zeroVnetHdr[0]
out.writeIovs[0].SetLen(virtioNetHdrLen) out.writeIovs[0].SetLen(virtioNetHdrLen)
out.rejectIovs[0].Base = &zeroVnetHdr[0]
out.rejectIovs[0].SetLen(virtioNetHdrLen)
out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs) out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs)
out.gsoIovs[0].Base = &out.gsoHdrBuf[0] out.gsoIovs[0].Base = &out.gsoHdrBuf[0]
out.gsoIovs[0].SetLen(virtioNetHdrLen) out.gsoIovs[0].SetLen(virtioNetHdrLen)
@@ -128,6 +135,8 @@ func newTunFd(fd int, vnetHdr bool) (*tunFile, error) {
out.segBuf = make([]byte, tunSegBufCap) out.segBuf = make([]byte, tunSegBufCap)
out.writeIovs[0].Base = &zeroVnetHdr[0] out.writeIovs[0].Base = &zeroVnetHdr[0]
out.writeIovs[0].SetLen(virtioNetHdrLen) out.writeIovs[0].SetLen(virtioNetHdrLen)
out.rejectIovs[0].Base = &zeroVnetHdr[0]
out.rejectIovs[0].SetLen(virtioNetHdrLen)
out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs) out.gsoIovs = make([]unix.Iovec, 2, 2+gsoInitialPayIovs)
out.gsoIovs[0].Base = &out.gsoHdrBuf[0] out.gsoIovs[0].Base = &out.gsoHdrBuf[0]
out.gsoIovs[0].SetLen(virtioNetHdrLen) out.gsoIovs[0].SetLen(virtioNetHdrLen)
@@ -297,6 +306,19 @@ func (r *tunFile) Read(buf []byte) (int, error) {
} }
func (r *tunFile) Write(buf []byte) (int, error) { func (r *tunFile) Write(buf []byte) (int, error) {
return r.writeWithScratch(buf, &r.writeIovs)
}
// WriteReject emits a packet using a dedicated iovec scratch (rejectIovs)
// distinct from the one used by the coalescer's Write path. This avoids a
// data race between the inside (listenIn) goroutine emitting reject or
// self-forward packets and the outside (listenOut) goroutine flushing TCP
// coalescer passthroughs on the same tunFile.
func (r *tunFile) WriteReject(buf []byte) (int, error) {
return r.writeWithScratch(buf, &r.rejectIovs)
}
func (r *tunFile) writeWithScratch(buf []byte, iovs *[2]unix.Iovec) (int, error) {
if !r.vnetHdr { if !r.vnetHdr {
for { for {
if n, err := unix.Write(r.fd, buf); err == nil { if n, err := unix.Write(r.fd, buf); err == nil {
@@ -319,9 +341,9 @@ func (r *tunFile) Write(buf []byte) (int, error) {
} }
// Point the payload iovec at the caller's buffer. iovs[0] is pre-wired // Point the payload iovec at the caller's buffer. iovs[0] is pre-wired
// to zeroVnetHdr during tunFile construction so we don't rebuild it here. // to zeroVnetHdr during tunFile construction so we don't rebuild it here.
r.writeIovs[1].Base = &buf[0] iovs[1].Base = &buf[0]
r.writeIovs[1].SetLen(len(buf)) iovs[1].SetLen(len(buf))
iovPtr := uintptr(unsafe.Pointer(&r.writeIovs[0])) iovPtr := uintptr(unsafe.Pointer(&iovs[0]))
// The TUN fd is non-blocking (set in newTunFd / newFriend), so writev // The TUN fd is non-blocking (set in newTunFd / newFriend), so writev
// either completes promptly or returns EAGAIN — it cannot park the // either completes promptly or returns EAGAIN — it cannot park the
// goroutine inside the kernel. That lets us use syscall.RawSyscall and // goroutine inside the kernel. That lets us use syscall.RawSyscall and

View File

@@ -82,6 +82,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`) var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`)
func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun, error) { func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun, error) {

View File

@@ -75,6 +75,10 @@ func (t *tun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *tun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`) var deviceNameRE = regexp.MustCompile(`^tun[0-9]+$`)
func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun, error) { func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*tun, error) {

View File

@@ -130,6 +130,10 @@ func (t *TestTun) Write(b []byte) (n int, err error) {
return len(b), nil return len(b), nil
} }
func (t *TestTun) WriteReject(b []byte) (int, error) {
return t.Write(b)
}
func (t *TestTun) Close() error { func (t *TestTun) Close() error {
if t.closed.CompareAndSwap(false, true) { if t.closed.CompareAndSwap(false, true) {
close(t.rxPackets) close(t.rxPackets)

View File

@@ -52,6 +52,10 @@ func (t *winTun) ReadBatch() ([][]byte, error) {
return t.batchRet[:], nil return t.batchRet[:], nil
} }
func (t *winTun) WriteReject(p []byte) (int, error) {
return t.Write(p)
}
func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (Device, error) { func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (Device, error) {
return nil, fmt.Errorf("newTunFromFd not supported in Windows") return nil, fmt.Errorf("newTunFromFd not supported in Windows")
} }

View File

@@ -79,6 +79,9 @@ func (d *UserDevice) Read(p []byte) (n int, err error) {
func (d *UserDevice) Write(p []byte) (n int, err error) { func (d *UserDevice) Write(p []byte) (n int, err error) {
return d.inboundWriter.Write(p) return d.inboundWriter.Write(p)
} }
func (d *UserDevice) WriteReject(p []byte) (n int, err error) {
return d.Write(p)
}
func (d *UserDevice) Close() error { func (d *UserDevice) Close() error {
d.inboundWriter.Close() d.inboundWriter.Close()
d.outboundWriter.Close() d.outboundWriter.Close()