This commit is contained in:
JackDoan
2025-11-08 12:10:29 -06:00
parent e0f93c9d4b
commit edff19a05b

View File

@@ -47,6 +47,7 @@ type SplitQueue struct {
// offerMutex is used to synchronize calls to
// [SplitQueue.OfferDescriptorChain].
offerMutex sync.Mutex
pageSize int
}
// NewSplitQueue allocates a new [SplitQueue] in memory. The given queue size
@@ -58,7 +59,8 @@ func NewSplitQueue(queueSize int) (_ *SplitQueue, err error) {
}
sq := SplitQueue{
size: queueSize,
size: queueSize,
pageSize: os.Getpagesize(),
}
// Clean up a partially initialized queue when something fails.
@@ -267,9 +269,10 @@ func (sq *SplitQueue) consumeUsedRing(ctx context.Context) error {
func (sq *SplitQueue) OfferDescriptorChain(outBuffers [][]byte, numInBuffers int, waitFree bool) (uint16, error) {
sq.ensureInitialized()
// TODO change this
// Each descriptor can only hold a whole memory page, so split large out
// buffers into multiple smaller ones.
outBuffers = splitBuffers(outBuffers, os.Getpagesize())
outBuffers = splitBuffers(outBuffers, sq.pageSize)
// Synchronize the offering of descriptor chains. While the descriptor table
// and available ring are synchronized on their own as well, this does not
@@ -411,7 +414,17 @@ func align(index, alignment int) int {
// splitBuffers processes a list of buffers and splits each buffer that is
// larger than the size limit into multiple smaller buffers.
// If none of the buffers are too big though, do nothing, to avoid allocation for now
func splitBuffers(buffers [][]byte, sizeLimit int) [][]byte {
for i := range buffers {
if len(buffers[i]) > sizeLimit {
return reallySplitBuffers(buffers, sizeLimit)
}
}
return buffers
}
func reallySplitBuffers(buffers [][]byte, sizeLimit int) [][]byte {
result := make([][]byte, 0, len(buffers))
for _, buffer := range buffers {
for added := 0; added < len(buffer); added += sizeLimit {