not sure if switching to this epoll actually helped

This commit is contained in:
JackDoan
2025-11-08 13:47:37 -06:00
parent 987f45baf0
commit 1f043f84f3
2 changed files with 121 additions and 13 deletions

View File

@@ -0,0 +1,91 @@
package eventfd
import (
"encoding/binary"
"syscall"
"golang.org/x/sys/unix"
)
type EventFD struct {
fd int
buf [8]byte
}
func New() (EventFD, error) {
fd, err := unix.Eventfd(0, unix.EFD_NONBLOCK)
if err != nil {
return EventFD{}, err
}
return EventFD{
fd: fd,
buf: [8]byte{},
}, nil
}
func (e *EventFD) Kick() error {
binary.LittleEndian.PutUint64(e.buf[:], 1) //is this right???
_, err := syscall.Write(int(e.fd), e.buf[:])
return err
}
func (e *EventFD) Close() error {
if e.fd != 0 {
return unix.Close(e.fd)
}
return nil
}
func (e *EventFD) FD() int {
return e.fd
}
type Epoll struct {
fd int
buf [8]byte
events []syscall.EpollEvent
}
func NewEpoll() (Epoll, error) {
fd, err := unix.EpollCreate1(0)
if err != nil {
return Epoll{}, err
}
return Epoll{
fd: fd,
buf: [8]byte{},
events: make([]syscall.EpollEvent, 1),
}, nil
}
func (ep *Epoll) AddEvent(fdToAdd int) error {
event := syscall.EpollEvent{
Events: syscall.EPOLLIN,
Fd: int32(fdToAdd),
}
return syscall.EpollCtl(ep.fd, syscall.EPOLL_CTL_ADD, fdToAdd, &event)
}
func (ep *Epoll) Block() (int, error) {
n, err := syscall.EpollWait(ep.fd, ep.events, -1)
if err != nil {
//goland:noinspection GoDirectComparisonOfErrors
if err == syscall.EINTR {
return 0, nil //??
}
return -1, err
}
return n, nil
}
func (ep *Epoll) Clear() error {
_, err := syscall.Read(int(ep.events[0].Fd), ep.buf[:])
return err
}
func (ep *Epoll) Close() error {
if ep.fd != 0 {
return unix.Close(ep.fd)
}
return nil
}

View File

@@ -7,8 +7,8 @@ import (
"os" "os"
"sync" "sync"
"github.com/slackhq/nebula/overlay/eventfd"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/eventfd"
) )
// SplitQueue is a virtqueue that consists of several parts, where each part is // SplitQueue is a virtqueue that consists of several parts, where each part is
@@ -25,10 +25,10 @@ type SplitQueue struct {
// kickEventFD is used to signal the device when descriptor chains were // kickEventFD is used to signal the device when descriptor chains were
// added to the available ring. // added to the available ring.
kickEventFD eventfd.Eventfd kickEventFD eventfd.EventFD
// callEventFD is used by the device to signal when it has used descriptor // callEventFD is used by the device to signal when it has used descriptor
// chains and put them in the used ring. // chains and put them in the used ring.
callEventFD eventfd.Eventfd callEventFD eventfd.EventFD
// usedChains is a chanel that receives [UsedElement]s for descriptor chains // usedChains is a chanel that receives [UsedElement]s for descriptor chains
// that were used by the device. // that were used by the device.
@@ -115,11 +115,11 @@ func NewSplitQueue(queueSize int) (_ *SplitQueue, err error) {
sq.availableRing = newAvailableRing(queueSize, sq.buf[availableRingStart:availableRingEnd]) sq.availableRing = newAvailableRing(queueSize, sq.buf[availableRingStart:availableRingEnd])
sq.usedRing = newUsedRing(queueSize, sq.buf[usedRingStart:usedRingEnd]) sq.usedRing = newUsedRing(queueSize, sq.buf[usedRingStart:usedRingEnd])
sq.kickEventFD, err = eventfd.Create() sq.kickEventFD, err = eventfd.New()
if err != nil { if err != nil {
return nil, fmt.Errorf("create kick event file descriptor: %w", err) return nil, fmt.Errorf("create kick event file descriptor: %w", err)
} }
sq.callEventFD, err = eventfd.Create() sq.callEventFD, err = eventfd.New()
if err != nil { if err != nil {
return nil, fmt.Errorf("create call event file descriptor: %w", err) return nil, fmt.Errorf("create call event file descriptor: %w", err)
} }
@@ -198,8 +198,18 @@ func (sq *SplitQueue) UsedDescriptorChains() chan UsedElement {
func (sq *SplitQueue) startConsumeUsedRing() func() error { func (sq *SplitQueue) startConsumeUsedRing() func() error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
done := make(chan error) done := make(chan error)
ep, err := eventfd.NewEpoll()
if err != nil {
panic(err)
}
err = ep.AddEvent(sq.callEventFD.FD())
if err != nil {
panic(err)
}
go func() { go func() {
done <- sq.consumeUsedRing(ctx) done <- sq.consumeUsedRing(ctx, &ep)
}() }()
return func() error { return func() error {
cancel() cancel()
@@ -208,7 +218,7 @@ func (sq *SplitQueue) startConsumeUsedRing() func() error {
// descriptor, so it will never notice the context being canceled. // descriptor, so it will never notice the context being canceled.
// To resolve this, we can just produce a fake-signal ourselves to wake // To resolve this, we can just produce a fake-signal ourselves to wake
// it up. // it up.
if err := sq.callEventFD.Notify(); err != nil { if err := sq.callEventFD.Kick(); err != nil {
return fmt.Errorf("wake up goroutine: %w", err) return fmt.Errorf("wake up goroutine: %w", err)
} }
@@ -227,18 +237,25 @@ func (sq *SplitQueue) startConsumeUsedRing() func() error {
// consumeUsedRing runs in a goroutine, waits for the device to signal that it // consumeUsedRing runs in a goroutine, waits for the device to signal that it
// has used descriptor chains and puts all new [UsedElement]s into the channel // has used descriptor chains and puts all new [UsedElement]s into the channel
// for them. // for them.
func (sq *SplitQueue) consumeUsedRing(ctx context.Context) error { func (sq *SplitQueue) consumeUsedRing(ctx context.Context, epoll *eventfd.Epoll) error {
var n int
var err error
for ctx.Err() == nil { for ctx.Err() == nil {
// Wait for a signal from the device. // Wait for a signal from the device.
if err := sq.callEventFD.Wait(); err != nil { if n, err = epoll.Block(); err != nil {
return fmt.Errorf("wait: %w", err) return fmt.Errorf("wait: %w", err)
} }
if n > 0 {
_ = epoll.Clear() //???
// Process all new used elements. // Process all new used elements.
for _, usedElement := range sq.usedRing.take() { for _, usedElement := range sq.usedRing.take() {
sq.usedChains <- usedElement sq.usedChains <- usedElement
} }
} }
}
return nil return nil
} }
@@ -321,7 +338,7 @@ func (sq *SplitQueue) OfferDescriptorChain(outBuffers [][]byte, numInBuffers int
sq.availableRing.offer([]uint16{head}) sq.availableRing.offer([]uint16{head})
// Notify the device to make it process the updated available ring. // Notify the device to make it process the updated available ring.
if err := sq.kickEventFD.Notify(); err != nil { if err := sq.kickEventFD.Kick(); err != nil {
return head, fmt.Errorf("notify device: %w", err) return head, fmt.Errorf("notify device: %w", err)
} }