diff --git a/inside.go b/inside.go index de23713e..e8f0590b 100644 --- a/inside.go +++ b/inside.go @@ -11,11 +11,11 @@ import ( "github.com/slackhq/nebula/iputil" "github.com/slackhq/nebula/noiseutil" "github.com/slackhq/nebula/overlay/batch" - "github.com/slackhq/nebula/overlay/tio" "github.com/slackhq/nebula/routing" + "github.com/slackhq/nebula/wire" ) -func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int, localCache firewall.ConntrackCache) { +func (f *Interface) consumeInsidePacket(pkt wire.TunPacket, fwPacket *firewall.Packet, nb []byte, sendBatch *batch.SendBatch, rejectBuf []byte, q int, localCache firewall.ConntrackCache) { // borrowed: pkt.Bytes is owned by the originating tio.Queue and is // only valid until the next Read on that queue. If you must keep // the packet, use pkt.Clone() to detach it @@ -44,7 +44,7 @@ func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packe // routes packets from the Nebula addr to the Nebula addr through the Nebula // TUN device. if immediatelyForwardToSelf { - err := tio.SegmentSuperpacket(pkt, func(seg []byte) error { + err := pkt.PerSegment(func(seg []byte) error { _, werr := f.readers[q].Write(seg) return werr }) @@ -66,7 +66,7 @@ func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packe // borrowed: SegmentSuperpacket builds each segment in the kernel-supplied pkt // bytes underneath. cachePacket explicitly copies its argument (handshake_manager.go cachePacket), // so retaining segments past the loop is safe. 
- err := tio.SegmentSuperpacket(pkt, func(seg []byte) error { + err := pkt.PerSegment(func(seg []byte) error { hh.cachePacket(f.l, header.Message, 0, seg, f.sendMessageNow, f.cachedPacketMetrics) return nil }) @@ -138,10 +138,10 @@ func (f *Interface) sendInsideEncrypt(hostinfo *HostInfo, ci *ConnectionState, s // segment of a TSO/USO superpacket) into the caller's batch slot for // later sendmmsg flush. Segmentation is fused with encryption here so the // kernel-supplied superpacket bytes never get written into a separate -// scratch arena: SegmentSuperpacket builds each segment's plaintext in +// scratch arena: PerSegment builds each segment's plaintext in // segScratch[:segLen] in turn, and we encrypt directly into a fresh // SendBatch slot. -func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []byte, sendBatch batch.TxBatcher) { +func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt wire.TunPacket, nb []byte, sendBatch *batch.SendBatch) { ci := hostinfo.ConnectionState if ci.eKey == nil { return @@ -181,7 +181,7 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []b return } - err = tio.SegmentSuperpacket(pkt, func(seg []byte) error { + err = pkt.PerSegment(func(seg []byte) error { //relay header + header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305) + relay tag scratch := sendBatch.Reserve(header.Len + header.Len + len(seg) + 16 + 16) @@ -206,7 +206,7 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []b return } - err := tio.SegmentSuperpacket(pkt, func(seg []byte) error { + err := pkt.PerSegment(func(seg []byte) error { // header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305) scratch := sendBatch.Reserve(header.Len + len(seg) + 16) diff --git a/interface.go b/interface.go index 1b9f59ff..329c78e1 100644 --- a/interface.go +++ b/interface.go @@ -12,6 +12,7 @@ import ( "github.com/gaissmai/bart" 
"github.com/rcrowley/go-metrics" + "github.com/slackhq/nebula/util" "github.com/slackhq/nebula/wire" "github.com/slackhq/nebula/config" @@ -264,7 +265,7 @@ func (f *Interface) activate() error { } f.readers = f.inside.Readers() for i := range f.readers { - arena := batch.NewArena(max(f.batchSize, 1) * udp.MTU) + arena := util.NewArena(max(f.batchSize, 1) * udp.MTU) f.batchers[i] = batch.NewPassthrough(f.readers[i], f.batchSize, arena) } @@ -329,7 +330,7 @@ func (f *Interface) listenOut(i int) { listener := func(fromUdpAddr netip.AddrPort, payload []byte, meta udp.RxMeta) { plaintext := f.batchers[i].Reserve(len(payload)) - f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(), meta) + f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get()) } flusher := func() { @@ -352,10 +353,9 @@ func (f *Interface) listenIn(reader tio.Queue, q int) { packetMem := make([]byte, mtu+16) //MTU + some leading slack space for platforms that return "bonus info" // TODO get the amount of bonus info from the reader packets := make([]wire.TunPacket, 1) - out := make([]byte, mtu) rejectBuf := make([]byte, mtu) arenaSize := batch.SendBatchCap * (udp.MTU + 32) - sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, batch.NewArena(arenaSize)) + sb := batch.NewSendBatch(f.writers[q], batch.SendBatchCap, util.NewArena(arenaSize)) fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) @@ -372,7 +372,7 @@ func (f *Interface) listenIn(reader tio.Queue, q int) { } ctCache := conntrackCache.Get() - for i := range n{ + for i := range n { f.consumeInsidePacket(packets[i], fwPacket, nb, sb, rejectBuf, q, ctCache) } if err := sb.Flush(); err != nil { diff --git a/outside.go b/outside.go index cf079fd7..68111fe2 100644 --- a/outside.go +++ b/outside.go @@ -13,7 +13,6 @@ import ( "github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/header" - 
"github.com/slackhq/nebula/udp" "golang.org/x/net/ipv4" ) @@ -23,7 +22,7 @@ const ( var ErrOutOfWindow = errors.New("out of window packet") -func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache) { err := h.Parse(packet) if err != nil { // Hole punch packets are 0 or 1 byte big, so lets ignore printing those errors @@ -111,7 +110,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, // Relay packets are special if isMessageRelay { - f.handleOutsideRelayPacket(hostinfo, via, out, packet, h, fwPacket, lhf, nb, q, localCache, meta) + f.handleOutsideRelayPacket(hostinfo, via, out, packet, h, fwPacket, lhf, nb, q, localCache) return } @@ -135,7 +134,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, case header.Message: switch h.Subtype { case header.MessageNone: - f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, nb, q, localCache, meta) + f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, nb, q, localCache) default: hostinfo.logger(f.l).Error("IsValidSubType was true, but unexpected message subtype seen", "from", via, "header", h) return @@ -168,7 +167,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, } } -func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket 
*firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache) { // The entire body is sent as AD, not encrypted. // The packet consists of a 16-byte parsed Nebula header, Associated Data-protected payload, and a trailing 16-byte AEAD signature value. // The packet is guaranteed to be at least 16 bytes at this point, b/c it got past the h.Parse() call above. If it's @@ -211,7 +210,7 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, relay: relay, IsRelayed: true, } - f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, lhf, nb, q, localCache, meta) + f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, lhf, nb, q, localCache) case ForwardingType: // Find the target HostInfo relay object targetHI, targetRelay, err := f.hostMap.QueryVpnAddrsRelayFor(hostinfo.vpnAddrs, relay.PeerAddr) @@ -512,7 +511,7 @@ func (f *Interface) decrypt(hostinfo *HostInfo, mc uint64, out []byte, packet [] return out, nil } -func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, nb []byte, q int, localCache firewall.ConntrackCache) { err := newPacket(out, true, fwPacket) if err != nil { hostinfo.logger(f.l).Warn("Error while validating inbound packet", diff --git a/overlay/batch/batch.go b/overlay/batch/batch.go deleted file mode 100644 index d171d136..00000000 --- a/overlay/batch/batch.go +++ /dev/null @@ -1,28 +0,0 @@ -package batch - -import "net/netip" - -type RxBatcher interface { - // Reserve creates a pkt to borrow - Reserve(sz int) []byte - // Commit borrows pkt. The caller must keep pkt valid until the next Flush - Commit(pkt []byte) error - // Flush emits every queued packet in arrival order. 
Returns the - // first error observed; keeps draining so one bad packet doesn't hold up - // the rest. After Flush returns, borrowed payload slices may be recycled. - Flush() error -} - -type TxBatcher interface { - // Reserve creates a pkt to borrow - Reserve(sz int) []byte - // Commit borrows pkt and records its destination plus the 2-bit - // IP-level ECN codepoint to set on the outer (carrier) header. The - // caller must keep pkt valid until the next Flush. Pass 0 (Not-ECT) - // to leave the outer ECN field unset. - Commit(pkt []byte, dst netip.AddrPort, outerECN byte) - // Flush emits every queued packet via the underlying batch writer in - // arrival order. Returns an errors.Join of one or more errors. After Flush returns, - // borrowed payload slices may be recycled. - Flush() error -} diff --git a/overlay/batch/passthrough.go b/overlay/batch/passthrough.go index d2c99fe0..7ea77b83 100644 --- a/overlay/batch/passthrough.go +++ b/overlay/batch/passthrough.go @@ -3,24 +3,18 @@ package batch import ( "io" - "github.com/slackhq/nebula/udp" + "github.com/slackhq/nebula/util" ) // Passthrough is a RxBatcher that doesn't batch anything, it just accumulates and then sends packets. type Passthrough struct { out io.Writer slots [][]byte - arena *Arena + arena *util.Arena cursor int } -const passthroughBaseNumSlots = 128 - -// DefaultPassthroughArenaCap is the recommended arena capacity for a -// standalone Passthrough batcher: 128 slots × udp.MTU ≈ 1.1 MiB. 
-const DefaultPassthroughArenaCap = passthroughBaseNumSlots * udp.MTU - -func NewPassthrough(w io.Writer, slots int, arena *Arena) *Passthrough { +func NewPassthrough(w io.Writer, slots int, arena *util.Arena) *Passthrough { return &Passthrough{ out: w, slots: make([][]byte, 0, slots), diff --git a/overlay/batch/rx_batch.go b/overlay/batch/rx_batch.go new file mode 100644 index 00000000..a58a39f0 --- /dev/null +++ b/overlay/batch/rx_batch.go @@ -0,0 +1,12 @@ +package batch + +type RxBatcher interface { + // Reserve creates a pkt to borrow + Reserve(sz int) []byte + // Commit borrows pkt. The caller must keep pkt valid until the next Flush + Commit(pkt []byte) error + // Flush emits every queued packet in arrival order. Returns the + // first error observed; keeps draining so one bad packet doesn't hold up + // the rest. After Flush returns, borrowed payload slices may be recycled. + Flush() error +} diff --git a/overlay/batch/tx_batch.go b/overlay/batch/tx_batch.go index 599bc306..dcf928b9 100644 --- a/overlay/batch/tx_batch.go +++ b/overlay/batch/tx_batch.go @@ -3,16 +3,11 @@ package batch import ( "net/netip" - "github.com/slackhq/nebula/udp" + "github.com/slackhq/nebula/util" ) const SendBatchCap = 128 -// DefaultSendBatchArenaCap is the recommended arena capacity for a -// standalone SendBatch: 128 slots × (udp.MTU + 32) ≈ 1.1 MiB. The +32 covers -// the nebula header + AEAD tag tacked onto each plaintext segment. -const DefaultSendBatchArenaCap = SendBatchCap * (udp.MTU + 32) - // batchWriter is the minimal subset of udp.Conn needed by SendBatch to flush. type batchWriter interface { WriteBatch(bufs [][]byte, addrs []netip.AddrPort, outerECNs []byte) error @@ -27,11 +22,11 @@ type SendBatch struct { bufs [][]byte dsts []netip.AddrPort ecns []byte - arena *Arena + arena *util.Arena } // NewSendBatch makes a SendBatch with batchCap slots backed by arena. 
-func NewSendBatch(out batchWriter, batchCap int, arena *Arena) *SendBatch { +func NewSendBatch(out batchWriter, batchCap int, arena *util.Arena) *SendBatch { return &SendBatch{ out: out, bufs: make([][]byte, 0, batchCap), diff --git a/overlay/batch/tx_batch_test.go b/overlay/batch/tx_batch_test.go index 59b06e58..f8e8bd51 100644 --- a/overlay/batch/tx_batch_test.go +++ b/overlay/batch/tx_batch_test.go @@ -3,6 +3,8 @@ package batch import ( "net/netip" "testing" + + "github.com/slackhq/nebula/util" ) type fakeBatchWriter struct { @@ -27,7 +29,7 @@ func (w *fakeBatchWriter) WriteBatch(bufs [][]byte, addrs []netip.AddrPort, ecns func TestSendBatchReserveCommitFlush(t *testing.T) { fw := &fakeBatchWriter{} - b := NewSendBatch(fw, 4, NewArena(32)) + b := NewSendBatch(fw, 4, util.NewArena(32)) ap := netip.MustParseAddrPort("10.0.0.1:4242") for i := 0; i < 4; i++ { @@ -71,7 +73,7 @@ func TestSendBatchReserveCommitFlush(t *testing.T) { func TestSendBatchSlotsDoNotOverlap(t *testing.T) { fw := &fakeBatchWriter{} - b := NewSendBatch(fw, 3, NewArena(8)) + b := NewSendBatch(fw, 3, util.NewArena(8)) ap := netip.MustParseAddrPort("10.0.0.1:80") for i := 0; i < 3; i++ { @@ -93,7 +95,7 @@ func TestSendBatchSlotsDoNotOverlap(t *testing.T) { func TestSendBatchGrowPreservesCommitted(t *testing.T) { fw := &fakeBatchWriter{} // Tiny initial backing forces a grow on the second Reserve. - b := NewSendBatch(fw, 1, NewArena(4)) + b := NewSendBatch(fw, 1, util.NewArena(4)) ap := netip.MustParseAddrPort("10.0.0.1:80") s1 := b.Reserve(4) diff --git a/overlay/tio/segment.go b/overlay/tio/segment.go deleted file mode 100644 index 67648ad2..00000000 --- a/overlay/tio/segment.go +++ /dev/null @@ -1,12 +0,0 @@ -package tio - -import "fmt" - -// SegmentSuperpacket invokes fn once per segment of pkt. 
-// This is a stub implementation that does not actually support segmentation -func SegmentSuperpacket(pkt Packet, fn func(seg []byte) error) error { - if pkt.GSO.IsSuperpacket() { - return fmt.Errorf("tio: GSO superpacket on platform without segmentation support") - } - return fn(pkt.Bytes) -} diff --git a/wire/wire.go b/wire/wire.go index acc20dd8..9f580fd8 100644 --- a/wire/wire.go +++ b/wire/wire.go @@ -9,3 +9,9 @@ type TunPacket struct { // Fields in Meta should be as portable/platform-agnostic as possible. Meta struct{} } + +// PerSegment invokes fn once per segment of pkt. +// This is a stub implementation that does not actually support segmentation: pkt.Bytes is handed to fn as a single segment. NOTE(review): unlike the removed tio.SegmentSuperpacket, GSO superpackets are not rejected here — confirm callers can never pass a superpacket before relying on this path. +func (t *TunPacket) PerSegment(fn func(seg []byte) error) error { + return fn(t.Bytes) +}