mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-23 17:04:25 +01:00
move things I'm gclog-ing to the bottom
This commit is contained in:
@@ -174,6 +174,110 @@ func (dev *Device) monitorTransmitQueue() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// refillReceiveQueue offers as many new device-writable buffers to the device
|
||||||
|
// as the queue can fit. The device will then use these to write received
|
||||||
|
// packets.
|
||||||
|
func (dev *Device) refillReceiveQueue() error {
|
||||||
|
for {
|
||||||
|
_, err := dev.receiveQueue.OfferDescriptorChain(nil, 1, false)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, virtqueue.ErrNotEnoughFreeDescriptors) {
|
||||||
|
// Queue is full, job is done.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("offer descriptor chain: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close cleans up the vhost networking device within the kernel and releases
|
||||||
|
// all resources used for it.
|
||||||
|
// The implementation will try to release as many resources as possible and
|
||||||
|
// collect potential errors before returning them.
|
||||||
|
func (dev *Device) Close() error {
|
||||||
|
dev.initialized = false
|
||||||
|
|
||||||
|
// Closing the control file descriptor will unregister all queues from the
|
||||||
|
// kernel.
|
||||||
|
if dev.controlFD >= 0 {
|
||||||
|
if err := unix.Close(dev.controlFD); err != nil {
|
||||||
|
// Return an error and do not continue, because the memory used for
|
||||||
|
// the queues should not be released before they were unregistered
|
||||||
|
// from the kernel.
|
||||||
|
return fmt.Errorf("close control file descriptor: %w", err)
|
||||||
|
}
|
||||||
|
dev.controlFD = -1
|
||||||
|
}
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
|
||||||
|
if dev.receiveQueue != nil {
|
||||||
|
if err := dev.receiveQueue.Close(); err == nil {
|
||||||
|
dev.receiveQueue = nil
|
||||||
|
} else {
|
||||||
|
errs = append(errs, fmt.Errorf("close receive queue: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dev.transmitQueue != nil {
|
||||||
|
if err := dev.transmitQueue.Close(); err == nil {
|
||||||
|
dev.transmitQueue = nil
|
||||||
|
} else {
|
||||||
|
errs = append(errs, fmt.Errorf("close transmit queue: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) == 0 {
|
||||||
|
// Everything was cleaned up. No need to run the finalizer anymore.
|
||||||
|
runtime.SetFinalizer(dev, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureInitialized is used as a guard to prevent methods to be called on an
|
||||||
|
// uninitialized instance.
|
||||||
|
func (dev *Device) ensureInitialized() {
|
||||||
|
if !dev.initialized {
|
||||||
|
panic("device is not initialized")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createQueue creates a new virtqueue and registers it with the vhost device
|
||||||
|
// using the given index.
|
||||||
|
func createQueue(controlFD int, queueIndex int, queueSize int) (*virtqueue.SplitQueue, error) {
|
||||||
|
var (
|
||||||
|
queue *virtqueue.SplitQueue
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if queue, err = virtqueue.NewSplitQueue(queueSize); err != nil {
|
||||||
|
return nil, fmt.Errorf("create virtqueue: %w", err)
|
||||||
|
}
|
||||||
|
if err = vhost.RegisterQueue(controlFD, uint32(queueIndex), queue); err != nil {
|
||||||
|
return nil, fmt.Errorf("register virtqueue with index %d: %w", queueIndex, err)
|
||||||
|
}
|
||||||
|
return queue, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncateBuffers returns a new list of buffers whose combined length matches
|
||||||
|
// exactly the specified length. When the specified length exceeds the length of
|
||||||
|
// the buffers, this is an error. When it is smaller, the buffer list will be
|
||||||
|
// truncated accordingly.
|
||||||
|
func truncateBuffers(buffers [][]byte, length int) (out [][]byte) {
|
||||||
|
for _, buffer := range buffers {
|
||||||
|
if length < len(buffer) {
|
||||||
|
out = append(out, buffer[:length])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out = append(out, buffer)
|
||||||
|
length -= len(buffer)
|
||||||
|
}
|
||||||
|
if length > 0 {
|
||||||
|
panic("length exceeds the combined length of all buffers")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// TransmitPacket writes the given packet into the transmit queue of this
|
// TransmitPacket writes the given packet into the transmit queue of this
|
||||||
// device. The packet will be prepended with the [virtio.NetHdr].
|
// device. The packet will be prepended with the [virtio.NetHdr].
|
||||||
//
|
//
|
||||||
@@ -318,107 +422,3 @@ func (dev *Device) ReceivePacket() (virtio.NetHdr, []byte, error) {
|
|||||||
|
|
||||||
// TODO: Make above methods cancelable by taking a context.Context argument?
|
// TODO: Make above methods cancelable by taking a context.Context argument?
|
||||||
// TODO: Implement zero-copy variants to transmit and receive packets?
|
// TODO: Implement zero-copy variants to transmit and receive packets?
|
||||||
|
|
||||||
// refillReceiveQueue offers as many new device-writable buffers to the device
|
|
||||||
// as the queue can fit. The device will then use these to write received
|
|
||||||
// packets.
|
|
||||||
func (dev *Device) refillReceiveQueue() error {
|
|
||||||
for {
|
|
||||||
_, err := dev.receiveQueue.OfferDescriptorChain(nil, 1, false)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, virtqueue.ErrNotEnoughFreeDescriptors) {
|
|
||||||
// Queue is full, job is done.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return fmt.Errorf("offer descriptor chain: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close cleans up the vhost networking device within the kernel and releases
|
|
||||||
// all resources used for it.
|
|
||||||
// The implementation will try to release as many resources as possible and
|
|
||||||
// collect potential errors before returning them.
|
|
||||||
func (dev *Device) Close() error {
|
|
||||||
dev.initialized = false
|
|
||||||
|
|
||||||
// Closing the control file descriptor will unregister all queues from the
|
|
||||||
// kernel.
|
|
||||||
if dev.controlFD >= 0 {
|
|
||||||
if err := unix.Close(dev.controlFD); err != nil {
|
|
||||||
// Return an error and do not continue, because the memory used for
|
|
||||||
// the queues should not be released before they were unregistered
|
|
||||||
// from the kernel.
|
|
||||||
return fmt.Errorf("close control file descriptor: %w", err)
|
|
||||||
}
|
|
||||||
dev.controlFD = -1
|
|
||||||
}
|
|
||||||
|
|
||||||
var errs []error
|
|
||||||
|
|
||||||
if dev.receiveQueue != nil {
|
|
||||||
if err := dev.receiveQueue.Close(); err == nil {
|
|
||||||
dev.receiveQueue = nil
|
|
||||||
} else {
|
|
||||||
errs = append(errs, fmt.Errorf("close receive queue: %w", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if dev.transmitQueue != nil {
|
|
||||||
if err := dev.transmitQueue.Close(); err == nil {
|
|
||||||
dev.transmitQueue = nil
|
|
||||||
} else {
|
|
||||||
errs = append(errs, fmt.Errorf("close transmit queue: %w", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errs) == 0 {
|
|
||||||
// Everything was cleaned up. No need to run the finalizer anymore.
|
|
||||||
runtime.SetFinalizer(dev, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
return errors.Join(errs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensureInitialized is used as a guard to prevent methods to be called on an
|
|
||||||
// uninitialized instance.
|
|
||||||
func (dev *Device) ensureInitialized() {
|
|
||||||
if !dev.initialized {
|
|
||||||
panic("device is not initialized")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// createQueue creates a new virtqueue and registers it with the vhost device
|
|
||||||
// using the given index.
|
|
||||||
func createQueue(controlFD int, queueIndex int, queueSize int) (*virtqueue.SplitQueue, error) {
|
|
||||||
var (
|
|
||||||
queue *virtqueue.SplitQueue
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
if queue, err = virtqueue.NewSplitQueue(queueSize); err != nil {
|
|
||||||
return nil, fmt.Errorf("create virtqueue: %w", err)
|
|
||||||
}
|
|
||||||
if err = vhost.RegisterQueue(controlFD, uint32(queueIndex), queue); err != nil {
|
|
||||||
return nil, fmt.Errorf("register virtqueue with index %d: %w", queueIndex, err)
|
|
||||||
}
|
|
||||||
return queue, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// truncateBuffers returns a new list of buffers whose combined length matches
|
|
||||||
// exactly the specified length. When the specified length exceeds the length of
|
|
||||||
// the buffers, this is an error. When it is smaller, the buffer list will be
|
|
||||||
// truncated accordingly.
|
|
||||||
func truncateBuffers(buffers [][]byte, length int) (out [][]byte) {
|
|
||||||
for _, buffer := range buffers {
|
|
||||||
if length < len(buffer) {
|
|
||||||
out = append(out, buffer[:length])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
out = append(out, buffer)
|
|
||||||
length -= len(buffer)
|
|
||||||
}
|
|
||||||
if length > 0 {
|
|
||||||
panic("length exceeds the combined length of all buffers")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user