From 1a83817cc28ee92b28741cd35160cbe23a18a29c Mon Sep 17 00:00:00 2001 From: JackDoan Date: Tue, 11 Nov 2025 22:14:19 -0600 Subject: [PATCH] multiqueue but it doesn't help --- interface.go | 4 +-- overlay/tun.go | 4 +-- overlay/tun_disabled.go | 4 +-- overlay/tun_linux.go | 71 ++++++++++++++++++-------------------- overlay/user.go | 9 ++--- overlay/vhostnet/device.go | 2 +- 6 files changed, 43 insertions(+), 51 deletions(-) diff --git a/interface.go b/interface.go index 6d6f819..4917429 100644 --- a/interface.go +++ b/interface.go @@ -301,7 +301,7 @@ func (f *Interface) listenOut(q int) { for i := 0; i < len(toSend); i += batch { x := min(len(toSend[i:]), batch) toSendThisTime := toSend[i : i+x] - _, err := f.readers[q].WriteMany(toSendThisTime) + _, err := f.readers[q].WriteMany(toSendThisTime, q) if err != nil { f.l.WithError(err).Error("Failed to write messages") } @@ -332,7 +332,7 @@ func (f *Interface) listenIn(reader overlay.TunDev, queueNum int) { for { - n, err := reader.ReadMany(packets) + n, err := reader.ReadMany(packets, queueNum) //todo!! if err != nil { if errors.Is(err, os.ErrClosed) && f.closed.Load() { diff --git a/overlay/tun.go b/overlay/tun.go index e5b1fd7..871e01d 100644 --- a/overlay/tun.go +++ b/overlay/tun.go @@ -16,8 +16,8 @@ const DefaultMTU = 1300 type TunDev interface { io.WriteCloser - ReadMany([]*packet.VirtIOPacket) (int, error) - WriteMany([][]byte) (int, error) + ReadMany(x []*packet.VirtIOPacket, q int) (int, error) + WriteMany(x [][]byte, q int) (int, error) } // TODO: We may be able to remove routines diff --git a/overlay/tun_disabled.go b/overlay/tun_disabled.go index 1adb062..55e6303 100644 --- a/overlay/tun_disabled.go +++ b/overlay/tun_disabled.go @@ -111,7 +111,7 @@ func (t *disabledTun) Write(b []byte) (int, error) { return len(b), nil } -func (t *disabledTun) WriteMany(b [][]byte) (int, error) { +func (t *disabledTun) WriteMany(b [][]byte, _ int) (int, error) { out := 0 for i := range b { x, err := t.Write(b[i]) @@ -123,7 +123,7 @@ func (t *disabledTun) WriteMany(b [][]byte) (int, error) { return out, nil } -func (t *disabledTun) ReadMany(b []*packet.VirtIOPacket) (int, error) { +func (t *disabledTun) ReadMany(b []*packet.VirtIOPacket, _ int) (int, error) { return t.Read(b[0].Payload) } diff --git a/overlay/tun_linux.go b/overlay/tun_linux.go index 1283f23..d8c81cd 100644 --- a/overlay/tun_linux.go +++ b/overlay/tun_linux.go @@ -10,7 +10,6 @@ import ( "os" "strings" "sync/atomic" - "syscall" "time" "unsafe" @@ -29,7 +28,7 @@ import ( type tun struct { file *os.File fd int - vdev *vhostnet.Device + vdev []*vhostnet.Device Device string vpnNetworks []netip.Prefix MaxMTU int @@ -108,7 +107,7 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu var req ifReq req.Flags = uint16(unix.IFF_TUN | unix.IFF_NO_PI | unix.IFF_TUN_EXCL | unix.IFF_VNET_HDR) if multiqueue { - //req.Flags |= unix.IFF_MULTI_QUEUE + req.Flags |= unix.IFF_MULTI_QUEUE } copy(req.Name[:], c.GetString("tun.dev", "")) if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil { @@ -135,17 +134,6 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu return nil, fmt.Errorf("set offloads: %w", err) } - //name := strings.Trim(c.GetString("tun.dev", ""), "\x00") - //tundev, err := tuntap.NewDevice( - // tuntap.WithName(name), - // tuntap.WithDeviceType(tuntap.DeviceTypeTUN), //todo wtf - // tuntap.WithVirtioNetHdr(true), //todo hmm - // tuntap.WithOffloads(unix.TUN_F_CSUM|unix.TUN_F_USO4|unix.TUN_F_USO6), //todo - //) - //if err != nil { - // return nil, err - //} - t, err := newTunGeneric(c, l, file, vpnNetworks) if err != nil { return nil, err @@ -160,7 +148,7 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu if err != nil { return nil, err } - t.vdev = vdev + t.vdev = []*vhostnet.Device{vdev} return t, nil } @@ -259,22 +247,29 @@ func (t *tun) reload(c *config.C, initial bool) error { } func (t *tun) NewMultiQueueReader() (TunDev, error) { - //fd, err := unix.Open("/dev/net/tun", os.O_RDWR, 0) - //if err != nil { - // return nil, err - //} - // - //var req ifReq - //req.Flags = uint16(unix.IFF_TUN | unix.IFF_NO_PI | unix.IFF_MULTI_QUEUE) - //copy(req.Name[:], t.Device) - //if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil { - // return nil, err - //} - // - //file := os.NewFile(uintptr(fd), "/dev/net/tun") + fd, err := unix.Open("/dev/net/tun", os.O_RDWR, 0) + if err != nil { + return nil, err + } - //return file, nil - return nil, syscall.ENOTSUP + var req ifReq + req.Flags = uint16(unix.IFF_TUN | unix.IFF_NO_PI | unix.IFF_MULTI_QUEUE) + copy(req.Name[:], t.Device) + if err = ioctl(uintptr(fd), uintptr(unix.TUNSETIFF), uintptr(unsafe.Pointer(&req))); err != nil { + return nil, err + } + + vdev, err := vhostnet.NewDevice( + vhostnet.WithBackendFD(fd), + vhostnet.WithQueueSize(8192), //todo config + ) + if err != nil { + return nil, err + } + + t.vdev = append(t.vdev, vdev) + + return t, nil } func (t *tun) RoutesFor(ip netip.Addr) routing.Gateways { @@ -694,8 +689,10 @@ func (t *tun) Close() error { close(t.routeChan) } - if t.vdev != nil { - _ = t.vdev.Close() + for _, v := range t.vdev { + if v != nil { + _ = v.Close() + } } if t.file != nil { @@ -709,8 +706,8 @@ func (t *tun) Close() error { return nil } -func (t *tun) ReadMany(p []*packet.VirtIOPacket) (int, error) { - n, err := t.vdev.ReceivePackets(p) //we are TXing +func (t *tun) ReadMany(p []*packet.VirtIOPacket, q int) (int, error) { + n, err := t.vdev[q].ReceivePackets(p) //we are TXing if err != nil { return 0, err } @@ -730,7 +727,7 @@ func (t *tun) Write(b []byte) (int, error) { NumBuffers: 0, } - err := t.vdev.TransmitPackets(hdr, [][]byte{b}) + err := t.vdev[0].TransmitPackets(hdr, [][]byte{b}) if err != nil { t.l.WithError(err).Error("Transmitting packet") return 0, err @@ -738,7 +735,7 @@ func (t *tun) Write(b []byte) (int, error) { return maximum, nil } -func (t *tun) WriteMany(b [][]byte) (int, error) { +func (t *tun) WriteMany(b [][]byte, q int) (int, error) { maximum := len(b) //we are RXing if maximum == 0 { return 0, nil @@ -753,7 +750,7 @@ func (t *tun) WriteMany(b [][]byte) (int, error) { NumBuffers: 0, } - err := t.vdev.TransmitPackets(hdr, b) + err := t.vdev[q].TransmitPackets(hdr, b) if err != nil { t.l.WithError(err).Error("Transmitting packet") return 0, err diff --git a/overlay/user.go b/overlay/user.go index 62cf786..992b74a 100644 --- a/overlay/user.go +++ b/overlay/user.go @@ -6,7 +6,6 @@ import ( "github.com/sirupsen/logrus" "github.com/slackhq/nebula/config" - "github.com/slackhq/nebula/overlay/virtqueue" "github.com/slackhq/nebula/packet" "github.com/slackhq/nebula/routing" ) @@ -68,11 +67,11 @@ func (d *UserDevice) Close() error { return nil } -func (d *UserDevice) ReadMany(b []*packet.VirtIOPacket) (int, error) { +func (d *UserDevice) ReadMany(b []*packet.VirtIOPacket, _ int) (int, error) { return d.Read(b[0].Payload) } -func (d *UserDevice) WriteMany(b [][]byte) (int, error) { +func (d *UserDevice) WriteMany(b [][]byte, _ int) (int, error) { out := 0 for i := range b { x, err := d.Write(b[i]) @@ -83,7 +82,3 @@ func (d *UserDevice) WriteMany(b [][]byte) (int, error) { } return out, nil } - -func (*UserDevice) GetQueues() []*virtqueue.SplitQueue { - return nil -} diff --git a/overlay/vhostnet/device.go b/overlay/vhostnet/device.go index 6dda434..b8a6aa0 100644 --- a/overlay/vhostnet/device.go +++ b/overlay/vhostnet/device.go @@ -77,7 +77,7 @@ func NewDevice(options ...Option) (*Device, error) { // Advertise the supported features. This isn't much for now. // TODO: Add feature options and implement proper feature negotiation. - getFeatures, err := vhost.GetFeatures(dev.controlFD) + getFeatures, err := vhost.GetFeatures(dev.controlFD) //0x1033D008000 but why if err != nil { return nil, fmt.Errorf("get features: %w", err) }