mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
fix
This commit is contained in:
@@ -346,8 +346,7 @@ func (f *Interface) listenOut(i int) {
|
|||||||
|
|
||||||
func (f *Interface) listenIn(reader tio.Queue, i int) {
|
func (f *Interface) listenIn(reader tio.Queue, i int) {
|
||||||
rejectBuf := make([]byte, mtu)
|
rejectBuf := make([]byte, mtu)
|
||||||
arenaSize := batch.SendBatchCap * (udp.MTU + 32)
|
sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, batch.NewArena(batch.DefaultSendBatchArenaCap))
|
||||||
sb := batch.NewSendBatch(f.writers[i], batch.SendBatchCap, arenaSize)
|
|
||||||
fwPacket := &firewall.Packet{}
|
fwPacket := &firewall.Packet{}
|
||||||
nb := make([]byte, 12, 12)
|
nb := make([]byte, 12, 12)
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,18 @@
|
|||||||
package batch
|
package batch
|
||||||
|
|
||||||
import "net/netip"
|
import (
|
||||||
|
"net/netip"
|
||||||
|
|
||||||
|
"github.com/slackhq/nebula/udp"
|
||||||
|
)
|
||||||
|
|
||||||
const SendBatchCap = 128
|
const SendBatchCap = 128
|
||||||
|
|
||||||
|
// DefaultSendBatchArenaCap is the recommended arena capacity for a
|
||||||
|
// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers
|
||||||
|
// the nebula header + AEAD tag tacked onto each plaintext segment.
|
||||||
|
const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32)
|
||||||
|
|
||||||
// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
|
// batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush.
|
||||||
type batchWriter interface {
|
type batchWriter interface {
|
||||||
WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
|
WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error
|
||||||
@@ -11,38 +20,29 @@ type batchWriter interface {
|
|||||||
|
|
||||||
// SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch.
|
// SendBatch accumulates encrypted UDP packets and flushes them via WriteBatch.
|
||||||
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
|
// One SendBatch is owned by each listenIn goroutine; no locking is needed.
|
||||||
// The backing arena grows on demand: when there isn't room for the next slot
|
// Slot bytes are borrowed from the injected Arena and remain valid until
|
||||||
// we allocate a fresh backing array. Already-committed slices keep referencing
|
// Flush, which Resets the arena.
|
||||||
// the old array and remain valid until Flush drops them.
|
|
||||||
type SendBatch struct {
|
type SendBatch struct {
|
||||||
out batchWriter
|
out batchWriter
|
||||||
bufs [][]byte
|
bufs [][]byte
|
||||||
dsts []netip.AddrPort
|
dsts []netip.AddrPort
|
||||||
ecns []byte
|
ecns []byte
|
||||||
backing []byte
|
arena *Arena
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSendBatch makes a SendBatch with batchCap slots and an arenaSize byte buffer for slices to back those slots
|
// NewSendBatch makes a SendBatch with batchCap slots backed by arena.
|
||||||
func NewSendBatch(out batchWriter, batchCap, arenaSize int) *SendBatch {
|
func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch {
|
||||||
return &SendBatch{
|
return &SendBatch{
|
||||||
out: out,
|
out: out,
|
||||||
bufs: make([][]byte, 0, batchCap),
|
bufs: make([][]byte, 0, batchCap),
|
||||||
dsts: make([]netip.AddrPort, 0, batchCap),
|
dsts: make([]netip.AddrPort, 0, batchCap),
|
||||||
ecns: make([]byte, 0, batchCap),
|
ecns: make([]byte, 0, batchCap),
|
||||||
backing: make([]byte, 0, arenaSize),
|
arena: arena,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *SendBatch) Reserve(sz int) []byte {
|
func (b *SendBatch) Reserve(sz int) []byte {
|
||||||
if len(b.backing)+sz > cap(b.backing) {
|
return b.arena.Reserve(sz)
|
||||||
// Grow: allocate a fresh backing. Already-committed slices still
|
|
||||||
// reference the old array and remain valid until Flush drops them.
|
|
||||||
newCap := max(cap(b.backing)*2, sz)
|
|
||||||
b.backing = make([]byte, 0, newCap)
|
|
||||||
}
|
|
||||||
start := len(b.backing)
|
|
||||||
b.backing = b.backing[:start+sz]
|
|
||||||
return b.backing[start : start+sz : start+sz]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) {
|
func (b *SendBatch) Commit(pkt []byte, dst netip.AddrPort, outerECN byte) {
|
||||||
@@ -60,6 +60,6 @@ func (b *SendBatch) Flush() error {
|
|||||||
b.bufs = b.bufs[:0]
|
b.bufs = b.bufs[:0]
|
||||||
b.dsts = b.dsts[:0]
|
b.dsts = b.dsts[:0]
|
||||||
b.ecns = b.ecns[:0]
|
b.ecns = b.ecns[:0]
|
||||||
b.backing = b.backing[:0]
|
b.arena.Reset()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns
|
|||||||
|
|
||||||
func TestSendBatchReserveCommitFlush(t *testing.T) {
|
func TestSendBatchReserveCommitFlush(t *testing.T) {
|
||||||
fw := &fakeBatchWriter{}
|
fw := &fakeBatchWriter{}
|
||||||
b := NewSendBatch(fw, 4, 32)
|
b := NewSendBatch(fw, 4, NewArena(32))
|
||||||
|
|
||||||
ap := netip.MustParseAddrPort("10.0.0.1:4242")
|
ap := netip.MustParseAddrPort("10.0.0.1:4242")
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
@@ -71,7 +71,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) {
|
|||||||
|
|
||||||
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
|
func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
|
||||||
fw := &fakeBatchWriter{}
|
fw := &fakeBatchWriter{}
|
||||||
b := NewSendBatch(fw, 3, 8)
|
b := NewSendBatch(fw, 3, NewArena(8))
|
||||||
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
@@ -93,7 +93,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) {
|
|||||||
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
|
func TestSendBatchGrowPreservesCommitted(t *testing.T) {
|
||||||
fw := &fakeBatchWriter{}
|
fw := &fakeBatchWriter{}
|
||||||
// Tiny initial backing forces a grow on the second Reserve.
|
// Tiny initial backing forces a grow on the second Reserve.
|
||||||
b := NewSendBatch(fw, 1, 4)
|
b := NewSendBatch(fw, 1, NewArena(4))
|
||||||
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
ap := netip.MustParseAddrPort("10.0.0.1:80")
|
||||||
|
|
||||||
s1 := b.Reserve(4)
|
s1 := b.Reserve(4)
|
||||||
|
|||||||
Reference in New Issue
Block a user