diff --git a/cmd/nebula/main.go b/cmd/nebula/main.go index 5cf0a02..6572a6b 100644 --- a/cmd/nebula/main.go +++ b/cmd/nebula/main.go @@ -3,6 +3,9 @@ package main import ( "flag" "fmt" + "log" + "net/http" + _ "net/http/pprof" "os" "github.com/sirupsen/logrus" @@ -58,6 +61,10 @@ func main() { os.Exit(1) } + go func() { + log.Println(http.ListenAndServe("0.0.0.0:6060", nil)) + }() + if !*configTest { ctrl.Start() notifyReady(l) diff --git a/overlay/vhostnet/device.go b/overlay/vhostnet/device.go index 17aa11d..cb0dc6e 100644 --- a/overlay/vhostnet/device.go +++ b/overlay/vhostnet/device.go @@ -32,8 +32,6 @@ type Device struct { ReceiveQueue *virtqueue.SplitQueue TransmitQueue *virtqueue.SplitQueue - - extraRx []virtqueue.UsedElement } // NewDevice initializes a new vhost networking device within the @@ -279,7 +277,6 @@ func (dev *Device) TransmitPackets(vnethdr virtio.NetHdr, packets [][]byte) erro } } - //todo blocking here suxxxx // Wait for the packet to have been transmitted. for i := range chainIndexes { @@ -297,7 +294,7 @@ func (dev *Device) TransmitPackets(vnethdr virtio.NetHdr, packets [][]byte) erro // processChains processes as many chains as needed to create one packet. The number of processed chains is returned. func (dev *Device) processChains(pkt *packet.VirtIOPacket, chains []virtqueue.UsedElement) (int, error) { //read first element to see how many descriptors we need: - pkt.Payload = pkt.Payload[:cap(pkt.Payload)] + pkt.Reset() n, err := dev.ReceiveQueue.GetDescriptorChainContents(uint16(chains[0].DescriptorIndex), pkt.Payload, int(chains[0].Length)) //todo if err != nil { return 0, err @@ -321,7 +318,7 @@ func (dev *Device) processChains(pkt *packet.VirtIOPacket, chains []virtqueue.Us } //shift the buffer out of out: - copy(pkt.Payload, pkt.Payload[virtio.NetHdrSize:]) + pkt.Payload = pkt.Payload[virtio.NetHdrSize:] cursor := n - virtio.NetHdrSize diff --git a/overlay/virtqueue/available_ring.go b/overlay/virtqueue/available_ring.go index a73afa2..abe540b 100644 --- a/overlay/virtqueue/available_ring.go +++ b/overlay/virtqueue/available_ring.go @@ -47,8 +47,6 @@ type AvailableRing struct { // avoid issues in case a device may try to access it, contrary to the // virtio specification. usedEvent *uint16 - - //mu sync.Mutex } // newAvailableRing creates an available ring that uses the given underlying diff --git a/overlay/virtqueue/descriptor_table.go b/overlay/virtqueue/descriptor_table.go index 7a29c9a..9638a4f 100644 --- a/overlay/virtqueue/descriptor_table.go +++ b/overlay/virtqueue/descriptor_table.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "math" - "sync" "unsafe" "golang.org/x/sys/unix" @@ -54,8 +53,6 @@ type DescriptorTable struct { bufferBase uintptr bufferSize int itemSize int - - mu sync.Mutex } // newDescriptorTable creates a descriptor table that uses the given underlying @@ -126,9 +123,6 @@ func (dt *DescriptorTable) initializeDescriptors() error { return fmt.Errorf("allocate buffer memory for descriptors: %w", err) } - dt.mu.Lock() - defer dt.mu.Unlock() - // Store the base for cleanup later dt.bufferBase = uintptr(basePtr) dt.bufferSize = totalSize @@ -155,9 +149,6 @@ func (dt *DescriptorTable) initializeDescriptors() error { // collect potential errors before returning them. // The descriptor table should no longer be used after calling this. func (dt *DescriptorTable) releaseBuffers() error { - dt.mu.Lock() - defer dt.mu.Unlock() - for i := range dt.descriptors { descriptor := &dt.descriptors[i] descriptor.address = 0 @@ -209,9 +200,6 @@ func (dt *DescriptorTable) createDescriptorChain(outBuffers [][]byte, numInBuffe return 0, ErrDescriptorChainEmpty } - dt.mu.Lock() - defer dt.mu.Unlock() - // Do we still have enough free descriptors? if numDesc > dt.freeNum { return 0, ErrNotEnoughFreeDescriptors @@ -309,9 +297,6 @@ func (dt *DescriptorTable) getDescriptorChain(head uint16) (outBuffers, inBuffer return nil, nil, fmt.Errorf("%w: index out of range", ErrInvalidDescriptorChain) } - dt.mu.Lock() - defer dt.mu.Unlock() - // Iterate over the chain. The iteration is limited to the queue size to // avoid ending up in an endless loop when things go very wrong. next := head @@ -354,9 +339,6 @@ func (dt *DescriptorTable) getDescriptorChainContents(head uint16, out []byte, m return 0, fmt.Errorf("%w: index out of range", ErrInvalidDescriptorChain) } - dt.mu.Lock() - defer dt.mu.Unlock() - // Iterate over the chain. The iteration is limited to the queue size to // avoid ending up in an endless loop when things go very wrong. @@ -431,9 +413,6 @@ func (dt *DescriptorTable) freeDescriptorChain(head uint16) error { return fmt.Errorf("%w: index out of range", ErrInvalidDescriptorChain) } - dt.mu.Lock() - defer dt.mu.Unlock() - // Iterate over the chain. The iteration is limited to the queue size to // avoid ending up in an endless loop when things go very wrong. next := head diff --git a/overlay/virtqueue/split_virtqueue.go b/overlay/virtqueue/split_virtqueue.go index dec9a2b..6bb9d10 100644 --- a/overlay/virtqueue/split_virtqueue.go +++ b/overlay/virtqueue/split_virtqueue.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "os" - "sync" "syscall" "github.com/slackhq/nebula/overlay/eventfd" @@ -35,11 +34,8 @@ type SplitQueue struct { // used buffer notifications. It blocks until the goroutine ended. stop func() error - // offerMutex is used to synchronize calls to - // [SplitQueue.OfferDescriptorChain]. - offerMutex sync.Mutex - pageSize int - itemSize int + pageSize int + itemSize int epoll eventfd.Epoll more int @@ -140,25 +136,21 @@ func NewSplitQueue(queueSize int) (_ *SplitQueue, err error) { // Size returns the size of this queue, which is the number of entries/buffers // this queue can hold. func (sq *SplitQueue) Size() int { - sq.ensureInitialized() return sq.size } // DescriptorTable returns the [DescriptorTable] behind this queue. func (sq *SplitQueue) DescriptorTable() *DescriptorTable { - sq.ensureInitialized() return sq.descriptorTable } // AvailableRing returns the [AvailableRing] behind this queue. func (sq *SplitQueue) AvailableRing() *AvailableRing { - sq.ensureInitialized() return sq.availableRing } // UsedRing returns the [UsedRing] behind this queue. func (sq *SplitQueue) UsedRing() *UsedRing { - sq.ensureInitialized() return sq.usedRing } @@ -166,7 +158,6 @@ func (sq *SplitQueue) UsedRing() *UsedRing { // The returned file descriptor should be used with great care to not interfere // with this implementation. func (sq *SplitQueue) KickEventFD() int { - sq.ensureInitialized() return sq.kickEventFD.FD() } @@ -174,7 +165,6 @@ func (sq *SplitQueue) KickEventFD() int { // The returned file descriptor should be used with great care to not interfere // with this implementation. func (sq *SplitQueue) CallEventFD() int { - sq.ensureInitialized() return sq.callEventFD.FD() } @@ -276,15 +266,6 @@ func (sq *SplitQueue) BlockAndGetHeadsCapped(ctx context.Context, maxToTake int) // and any further calls to [SplitQueue.OfferDescriptorChain] will stall. func (sq *SplitQueue) OfferInDescriptorChains(numInBuffers int) (uint16, error) { - sq.ensureInitialized() - // Synchronize the offering of descriptor chains. While the descriptor table - // and available ring are synchronized on their own as well, this does not - // protect us from interleaved calls which could cause reordering. - // By locking here, we can ensure that all descriptor chains are made - // available to the device in the same order as this method was called. - sq.offerMutex.Lock() - defer sq.offerMutex.Unlock() - // Create a descriptor chain for the given buffers. var ( head uint16 @@ -317,21 +298,11 @@ func (sq *SplitQueue) OfferInDescriptorChains(numInBuffers int) (uint16, error) } func (sq *SplitQueue) OfferOutDescriptorChains(prepend []byte, outBuffers [][]byte) ([]uint16, error) { - sq.ensureInitialized() - // TODO change this // Each descriptor can only hold a whole memory page, so split large out // buffers into multiple smaller ones. outBuffers = splitBuffers(outBuffers, sq.pageSize) - // Synchronize the offering of descriptor chains. While the descriptor table - // and available ring are synchronized on their own as well, this does not - // protect us from interleaved calls which could cause reordering. - // By locking here, we can ensure that all descriptor chains are made - // available to the device in the same order as this method was called. - sq.offerMutex.Lock() - defer sq.offerMutex.Unlock() - chains := make([]uint16, len(outBuffers)) // Create a descriptor chain for the given buffers. @@ -384,12 +355,10 @@ func (sq *SplitQueue) OfferOutDescriptorChains(prepend []byte, outBuffers [][]by // longer using them. They must not be accessed after // [SplitQueue.FreeDescriptorChain] has been called. func (sq *SplitQueue) GetDescriptorChain(head uint16) (outBuffers, inBuffers [][]byte, err error) { - sq.ensureInitialized() return sq.descriptorTable.getDescriptorChain(head) } func (sq *SplitQueue) GetDescriptorChainContents(head uint16, out []byte, maxLen int) (int, error) { - sq.ensureInitialized() return sq.descriptorTable.getDescriptorChainContents(head, out, maxLen) } @@ -412,17 +381,6 @@ func (sq *SplitQueue) FreeDescriptorChain(head uint16) error { } func (sq *SplitQueue) RecycleDescriptorChains(chains []UsedElement) error { - sq.ensureInitialized() - - //todo I don't think we need this here? - // Synchronize the offering of descriptor chains. While the descriptor table - // and available ring are synchronized on their own as well, this does not - // protect us from interleaved calls which could cause reordering. - // By locking here, we can ensure that all descriptor chains are made - // available to the device in the same order as this method was called. - //sq.offerMutex.Lock() - //defer sq.offerMutex.Unlock() - //todo not doing this may break eventually? //not called under lock //if err := sq.descriptorTable.freeDescriptorChain(head); err != nil { diff --git a/packet/virtio.go b/packet/virtio.go index 18c5bce..0598ef5 100644 --- a/packet/virtio.go +++ b/packet/virtio.go @@ -6,11 +6,17 @@ import ( type VirtIOPacket struct { Payload []byte + buf []byte Header virtio.NetHdr } func NewVIO() *VirtIOPacket { out := new(VirtIOPacket) out.Payload = make([]byte, Size) + out.buf = out.Payload return out } + +func (v *VirtIOPacket) Reset() { + v.Payload = v.buf[:Size] +}