diff --git a/overlay/eventfd/eventfd.go b/overlay/eventfd/eventfd.go new file mode 100644 index 0000000..cf3dd0d --- /dev/null +++ b/overlay/eventfd/eventfd.go @@ -0,0 +1,91 @@ +package eventfd + +import ( + "encoding/binary" + "syscall" + + "golang.org/x/sys/unix" +) + +type EventFD struct { + fd int + buf [8]byte +} + +func New() (EventFD, error) { + fd, err := unix.Eventfd(0, unix.EFD_NONBLOCK) + if err != nil { + return EventFD{}, err + } + return EventFD{ + fd: fd, + buf: [8]byte{}, + }, nil +} + +func (e *EventFD) Kick() error { + binary.LittleEndian.PutUint64(e.buf[:], 1) //is this right??? + _, err := syscall.Write(int(e.fd), e.buf[:]) + return err +} + +func (e *EventFD) Close() error { + if e.fd != 0 { + return unix.Close(e.fd) + } + return nil +} + +func (e *EventFD) FD() int { + return e.fd +} + +type Epoll struct { + fd int + buf [8]byte + events []syscall.EpollEvent +} + +func NewEpoll() (Epoll, error) { + fd, err := unix.EpollCreate1(0) + if err != nil { + return Epoll{}, err + } + return Epoll{ + fd: fd, + buf: [8]byte{}, + events: make([]syscall.EpollEvent, 1), + }, nil +} + +func (ep *Epoll) AddEvent(fdToAdd int) error { + event := syscall.EpollEvent{ + Events: syscall.EPOLLIN, + Fd: int32(fdToAdd), + } + return syscall.EpollCtl(ep.fd, syscall.EPOLL_CTL_ADD, fdToAdd, &event) +} + +func (ep *Epoll) Block() (int, error) { + n, err := syscall.EpollWait(ep.fd, ep.events, -1) + if err != nil { + //goland:noinspection GoDirectComparisonOfErrors + if err == syscall.EINTR { + return 0, nil //?? + } + return -1, err + } + return n, nil +} + +func (ep *Epoll) Clear() error { + _, err := syscall.Read(int(ep.events[0].Fd), ep.buf[:]) + return err +} + +func (ep *Epoll) Close() error { + if ep.fd != 0 { + return unix.Close(ep.fd) + } + return nil +} diff --git a/overlay/virtqueue/split_virtqueue.go b/overlay/virtqueue/split_virtqueue.go index ffcf78a..a88dff9 100644 --- a/overlay/virtqueue/split_virtqueue.go +++ b/overlay/virtqueue/split_virtqueue.go @@ -7,8 +7,8 @@ import ( "os" "sync" + "github.com/slackhq/nebula/overlay/eventfd" "golang.org/x/sys/unix" - "gvisor.dev/gvisor/pkg/eventfd" ) // SplitQueue is a virtqueue that consists of several parts, where each part is @@ -25,10 +25,10 @@ type SplitQueue struct { // kickEventFD is used to signal the device when descriptor chains were // added to the available ring. - kickEventFD eventfd.Eventfd + kickEventFD eventfd.EventFD // callEventFD is used by the device to signal when it has used descriptor // chains and put them in the used ring. - callEventFD eventfd.Eventfd + callEventFD eventfd.EventFD // usedChains is a chanel that receives [UsedElement]s for descriptor chains // that were used by the device. @@ -115,11 +115,11 @@ func NewSplitQueue(queueSize int) (_ *SplitQueue, err error) { sq.availableRing = newAvailableRing(queueSize, sq.buf[availableRingStart:availableRingEnd]) sq.usedRing = newUsedRing(queueSize, sq.buf[usedRingStart:usedRingEnd]) - sq.kickEventFD, err = eventfd.Create() + sq.kickEventFD, err = eventfd.New() if err != nil { return nil, fmt.Errorf("create kick event file descriptor: %w", err) } - sq.callEventFD, err = eventfd.Create() + sq.callEventFD, err = eventfd.New() if err != nil { return nil, fmt.Errorf("create call event file descriptor: %w", err) } @@ -198,8 +198,18 @@ func (sq *SplitQueue) UsedDescriptorChains() chan UsedElement { func (sq *SplitQueue) startConsumeUsedRing() func() error { ctx, cancel := context.WithCancel(context.Background()) done := make(chan error) + + ep, err := eventfd.NewEpoll() + if err != nil { + panic(err) + } + err = ep.AddEvent(sq.callEventFD.FD()) + if err != nil { + panic(err) + } + go func() { - done <- sq.consumeUsedRing(ctx) + done <- sq.consumeUsedRing(ctx, &ep) }() return func() error { cancel() @@ -208,7 +218,7 @@ func (sq *SplitQueue) startConsumeUsedRing() func() error { // descriptor, so it will never notice the context being canceled. // To resolve this, we can just produce a fake-signal ourselves to wake // it up. - if err := sq.callEventFD.Notify(); err != nil { + if err := sq.callEventFD.Kick(); err != nil { return fmt.Errorf("wake up goroutine: %w", err) } @@ -227,16 +237,23 @@ func (sq *SplitQueue) startConsumeUsedRing() func() error { // consumeUsedRing runs in a goroutine, waits for the device to signal that it // has used descriptor chains and puts all new [UsedElement]s into the channel // for them. -func (sq *SplitQueue) consumeUsedRing(ctx context.Context) error { +func (sq *SplitQueue) consumeUsedRing(ctx context.Context, epoll *eventfd.Epoll) error { + var n int + var err error for ctx.Err() == nil { + // Wait for a signal from the device. - if err := sq.callEventFD.Wait(); err != nil { + if n, err = epoll.Block(); err != nil { return fmt.Errorf("wait: %w", err) } - // Process all new used elements. - for _, usedElement := range sq.usedRing.take() { - sq.usedChains <- usedElement + if n > 0 { + _ = epoll.Clear() //??? + + // Process all new used elements. + for _, usedElement := range sq.usedRing.take() { + sq.usedChains <- usedElement + } } } @@ -321,7 +338,7 @@ func (sq *SplitQueue) OfferDescriptorChain(outBuffers [][]byte, numInBuffers int sq.availableRing.offer([]uint16{head}) // Notify the device to make it process the updated available ring. - if err := sq.kickEventFD.Notify(); err != nil { + if err := sq.kickEventFD.Kick(); err != nil { return head, fmt.Errorf("notify device: %w", err) }