mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
better and batched tun interface
This commit is contained in:
33
overlay/batch/batch.go
Normal file
33
overlay/batch/batch.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package batch
|
||||
|
||||
import "net/netip"
|
||||
|
||||
type RxBatcher interface {
|
||||
// Reserve creates a pkt to borrow
|
||||
Reserve(sz int) []byte
|
||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush
|
||||
Commit(pkt []byte) error
|
||||
// Flush emits every queued packet in arrival order. Returns the
|
||||
// first error observed; keeps draining so one bad packet doesn't hold up
|
||||
// the rest. After Flush returns, borrowed payload slices may be recycled.
|
||||
Flush() error
|
||||
}
|
||||
|
||||
type TxBatcher interface {
|
||||
// Next returns a zero-length slice with slotCap capacity over the next unused
|
||||
// slot's backing bytes. The caller writes into the returned slice and then
|
||||
// calls Commit with the final length and destination. Next returns nil when
|
||||
// the batch is full.
|
||||
Next() []byte
|
||||
// Commit records the slot just returned by Next as a packet of length n
|
||||
// destined for dst.
|
||||
Commit(n int, dst netip.AddrPort)
|
||||
// Reset clears committed slots; backing storage is retained for reuse.
|
||||
Reset()
|
||||
// Len returns the number of committed packets.
|
||||
Len() int
|
||||
// Cap returns the maximum number of slots in the batch.
|
||||
Cap() int
|
||||
// Get returns the buffers needed to send the batch
|
||||
Get() ([][]byte, []netip.AddrPort)
|
||||
}
|
||||
57
overlay/batch/passthrough.go
Normal file
57
overlay/batch/passthrough.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/slackhq/nebula/udp"
|
||||
)
|
||||
|
||||
// Passthrough is a RxBatcher that doesn't batch anything, it just accumulates and then sends packets.
|
||||
type Passthrough struct {
|
||||
out io.Writer
|
||||
slots [][]byte
|
||||
backing []byte
|
||||
cursor int
|
||||
}
|
||||
|
||||
func NewPassthrough(w io.Writer) *Passthrough {
|
||||
const baseNumSlots = 128
|
||||
return &Passthrough{
|
||||
out: w,
|
||||
slots: make([][]byte, 0, baseNumSlots),
|
||||
backing: make([]byte, 0, baseNumSlots*udp.MTU),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Passthrough) Reserve(sz int) []byte {
|
||||
if len(p.backing)+sz > cap(p.backing) {
|
||||
// Grow: allocate a fresh backing. Already-committed slices still
|
||||
// reference the old array and remain valid until Flush drops them.
|
||||
newCap := max(cap(p.backing)*2, sz)
|
||||
p.backing = make([]byte, 0, newCap)
|
||||
}
|
||||
start := len(p.backing)
|
||||
p.backing = p.backing[:start+sz]
|
||||
return p.backing[start : start+sz : start+sz] //return zero length, sz-cap slice
|
||||
}
|
||||
|
||||
func (p *Passthrough) Commit(pkt []byte) error {
|
||||
p.slots = append(p.slots, pkt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Passthrough) Flush() error {
|
||||
var firstErr error
|
||||
for _, s := range p.slots {
|
||||
_, err := p.out.Write(s)
|
||||
if err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
for i := range p.slots {
|
||||
p.slots[i] = nil
|
||||
}
|
||||
p.slots = p.slots[:0]
|
||||
p.backing = p.backing[:0]
|
||||
return firstErr
|
||||
}
|
||||
61
overlay/batch/tx_batch.go
Normal file
61
overlay/batch/tx_batch.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package batch
|
||||
|
||||
import "net/netip"
|
||||
|
||||
const SendBatchCap = 128
|
||||
|
||||
// SendBatch accumulates encrypted UDP packets for potential TX offloading.
|
||||
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
|
||||
// The backing storage holds up to batchCap packets of slotCap bytes each;
|
||||
// bufs and dsts are parallel slices of committed slots.
|
||||
type SendBatch struct {
|
||||
bufs [][]byte
|
||||
dsts []netip.AddrPort
|
||||
backing []byte
|
||||
slotCap int
|
||||
batchCap int
|
||||
nextSlot int
|
||||
}
|
||||
|
||||
func NewSendBatch(batchCap, slotCap int) *SendBatch {
|
||||
return &SendBatch{
|
||||
bufs: make([][]byte, 0, batchCap),
|
||||
dsts: make([]netip.AddrPort, 0, batchCap),
|
||||
backing: make([]byte, batchCap*slotCap),
|
||||
slotCap: slotCap,
|
||||
batchCap: batchCap,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *SendBatch) Next() []byte {
|
||||
if b.nextSlot >= b.batchCap {
|
||||
return nil
|
||||
}
|
||||
start := b.nextSlot * b.slotCap
|
||||
return b.backing[start : start : start+b.slotCap] //set len to 0 but cap to slotCap
|
||||
}
|
||||
|
||||
func (b *SendBatch) Commit(n int, dst netip.AddrPort) {
|
||||
start := b.nextSlot * b.slotCap
|
||||
b.bufs = append(b.bufs, b.backing[start:start+n])
|
||||
b.dsts = append(b.dsts, dst)
|
||||
b.nextSlot++
|
||||
}
|
||||
|
||||
func (b *SendBatch) Reset() {
|
||||
b.bufs = b.bufs[:0]
|
||||
b.dsts = b.dsts[:0]
|
||||
b.nextSlot = 0
|
||||
}
|
||||
|
||||
func (b *SendBatch) Len() int {
|
||||
return len(b.bufs)
|
||||
}
|
||||
|
||||
func (b *SendBatch) Cap() int {
|
||||
return b.batchCap
|
||||
}
|
||||
|
||||
func (b *SendBatch) Get() ([][]byte, []netip.AddrPort) {
|
||||
return b.bufs, b.dsts
|
||||
}
|
||||
69
overlay/batch/tx_batch_test.go
Normal file
69
overlay/batch/tx_batch_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSendBatchBookkeeping(t *testing.T) {
|
||||
b := NewSendBatch(4, 32)
|
||||
if b.Len() != 0 || b.Cap() != 4 {
|
||||
t.Fatalf("fresh batch: len=%d cap=%d", b.Len(), b.Cap())
|
||||
}
|
||||
|
||||
ap := netip.MustParseAddrPort("10.0.0.1:4242")
|
||||
for i := 0; i < 4; i++ {
|
||||
slot := b.Next()
|
||||
if slot == nil {
|
||||
t.Fatalf("slot %d: Next returned nil before cap", i)
|
||||
}
|
||||
if cap(slot) != 32 || len(slot) != 0 {
|
||||
t.Fatalf("slot %d: got len=%d cap=%d want len=0 cap=32", i, len(slot), cap(slot))
|
||||
}
|
||||
// Write a marker byte.
|
||||
slot = append(slot, byte(i), byte(i+1), byte(i+2))
|
||||
b.Commit(len(slot), ap)
|
||||
}
|
||||
if b.Next() != nil {
|
||||
t.Fatalf("Next should return nil when full")
|
||||
}
|
||||
if b.Len() != 4 {
|
||||
t.Fatalf("Len=%d want 4", b.Len())
|
||||
}
|
||||
for i, buf := range b.bufs {
|
||||
if len(buf) != 3 || buf[0] != byte(i) {
|
||||
t.Errorf("buf %d: %x", i, buf)
|
||||
}
|
||||
if b.dsts[i] != ap {
|
||||
t.Errorf("dst %d: got %v want %v", i, b.dsts[i], ap)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset returns empty and Next works again.
|
||||
b.Reset()
|
||||
if b.Len() != 0 {
|
||||
t.Fatalf("after Reset Len=%d want 0", b.Len())
|
||||
}
|
||||
slot := b.Next()
|
||||
if slot == nil || cap(slot) != 32 {
|
||||
t.Fatalf("after Reset Next nil or wrong cap: %v cap=%d", slot == nil, cap(slot))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
|
||||
b := NewSendBatch(3, 8)
|
||||
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
||||
|
||||
// Fill three slots, each with its own sentinel byte.
|
||||
for i := 0; i < 3; i++ {
|
||||
s := b.Next()
|
||||
s = append(s, byte(0xA0+i), byte(0xB0+i))
|
||||
b.Commit(len(s), ap)
|
||||
}
|
||||
|
||||
for i, buf := range b.bufs {
|
||||
if buf[0] != byte(0xA0+i) || buf[1] != byte(0xB0+i) {
|
||||
t.Errorf("slot %d corrupted: %x", i, buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user