From 0375aff45129017fb271f83deb76dd2eb9ca2632 Mon Sep 17 00:00:00 2001 From: JackDoan Date: Thu, 7 May 2026 11:04:16 -0500 Subject: [PATCH] checkpt, try to parse packets only once --- firewall/packet.go | 8 ++++++++ interface.go | 3 ++- outside.go | 19 ++++++++++++------- overlay/batch/batch.go | 9 ++++++++- overlay/batch/passthrough.go | 7 +++++++ 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/firewall/packet.go b/firewall/packet.go index 2cbfb5ea..ea7162fe 100644 --- a/firewall/packet.go +++ b/firewall/packet.go @@ -19,6 +19,14 @@ const ( PortFragment = -1 // Special value for matching `port: fragment` ) +type TransportTuple struct { + FirstAddr [16]byte + SecondAddr [16]byte + FirstPort uint16 + SecondPort uint16 + IsV6 bool +} + type Packet struct { LocalAddr netip.Addr RemoteAddr netip.Addr diff --git a/interface.go b/interface.go index 3d7176d2..2bd1fc37 100644 --- a/interface.go +++ b/interface.go @@ -348,11 +348,12 @@ func (f *Interface) listenOut(i int) { lhh := f.lightHouse.NewRequestHandler() h := &header.H{} fwPacket := &firewall.Packet{} + parsedRx := &batch.RxParsed{} nb := make([]byte, 12, 12) listener := func(fromUdpAddr netip.AddrPort, payload []byte, meta udp.RxMeta) { plaintext := f.batchers[i].Reserve(len(payload)) - f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(), meta) + f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, parsedRx, lhh, nb, i, ctCache.Get(), meta) } flusher := func() { diff --git a/outside.go b/outside.go index 45a405b3..05dc7a13 100644 --- a/outside.go +++ b/outside.go @@ -13,6 +13,7 @@ import ( "github.com/slackhq/nebula/firewall" "github.com/slackhq/nebula/header" + "github.com/slackhq/nebula/overlay/batch" "github.com/slackhq/nebula/udp" "golang.org/x/net/ipv4" ) @@ -23,7 +24,7 @@ const ( var ErrOutOfWindow = errors.New("out of window packet") -func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { err := h.Parse(packet) if err != nil { // Hole punch packets are 0 or 1 byte big, so lets ignore printing those errors @@ -136,7 +137,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, case header.Message: switch h.Subtype { case header.MessageNone: - f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, nb, q, localCache, meta) + f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, parsedRx, nb, q, localCache, meta) default: hostinfo.logger(f.l).Error("IsValidSubType was true, but unexpected message subtype seen", "from", via, "header", h) return @@ -169,7 +170,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, } } -func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { // The entire body is sent as AD, not encrypted. // The packet consists of a 16-byte parsed Nebula header, Associated Data-protected payload, and a trailing 16-byte AEAD signature value. // The packet is guaranteed to be at least 16 bytes at this point, b/c it got past the h.Parse() call above. If it's @@ -212,7 +213,8 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, relay: relay, IsRelayed: true, } - f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, lhf, nb, q, localCache, meta) + f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, parsedRx, lhf, nb, q, localCache, meta) + return case ForwardingType: // Find the target HostInfo relay object targetHI, targetRelay, err := f.hostMap.QueryVpnAddrsRelayFor(hostinfo.vpnAddrs, relay.PeerAddr) @@ -560,7 +562,7 @@ func applyOuterECN(pkt []byte, outerECN byte, hostinfo *HostInfo, l *slog.Logger } } -func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { +func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) { // RFC 6040 normal-mode combine: fold any outer CE mark stamped by the // underlay into the inner header before firewall + TUN write. Other // outer codepoints are advisory only — we keep the inner unchanged. @@ -568,7 +570,10 @@ func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, p applyOuterECN(out, meta.OuterECN, hostinfo, f.l) } - err := newPacket(out, true, fwPacket) + // Single IP+L4 walk feeds both the firewall (via fwPacket) and the + // batcher (via parsedRx). Replaces newPacket — the batcher's CommitInbound + // uses parsedRx instead of re-walking the headers. + err := batch.ParseInbound(out, fwPacket, parsedRx) if err != nil { hostinfo.logger(f.l).Warn("Error while validating inbound packet", "error", err, @@ -591,7 +596,7 @@ func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, p return } - err = f.batchers[q].Commit(out) + err = f.batchers[q].CommitInbound(out, parsedRx) if err != nil { f.l.Error("Failed to write to tun", "error", err) } diff --git a/overlay/batch/batch.go b/overlay/batch/batch.go index d171d136..ac7102c0 100644 --- a/overlay/batch/batch.go +++ b/overlay/batch/batch.go @@ -5,8 +5,15 @@ import "net/netip" type RxBatcher interface { // Reserve creates a pkt to borrow Reserve(sz int) []byte - // Commit borrows pkt. The caller must keep pkt valid until the next Flush + // Commit borrows pkt. The caller must keep pkt valid until the next Flush. + // Walks IP+L4 headers itself; prefer CommitInbound when the caller already + // has an RxParsed in hand from ParseInbound. Commit(pkt []byte) error + // CommitInbound is Commit with a hint produced by ParseInbound, so the + // batcher can skip the IP+L4 re-parse. Borrowed slice contract is the + // same as Commit. Implementations that don't coalesce may delegate to + // Commit. + CommitInbound(pkt []byte, parsed *RxParsed) error // Flush emits every queued packet in arrival order. Returns the // first error observed; keeps draining so one bad packet doesn't hold up // the rest. After Flush returns, borrowed payload slices may be recycled. diff --git a/overlay/batch/passthrough.go b/overlay/batch/passthrough.go index c7676ccd..adc59f41 100644 --- a/overlay/batch/passthrough.go +++ b/overlay/batch/passthrough.go @@ -40,6 +40,13 @@ func (p *Passthrough) Commit(pkt []byte) error { return nil } +// CommitInbound ignores the hint — Passthrough never coalesces, so there's +// no IP/L4 re-parse to skip. Present so Passthrough satisfies the RxBatcher +// interface alongside MultiCoalescer. +func (p *Passthrough) CommitInbound(pkt []byte, _ *RxParsed) error { + return p.Commit(pkt) +} + func (p *Passthrough) Flush() error { var firstErr error for _, s := range p.slots {