mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
checkpt, try to parse packets only once
This commit is contained in:
@@ -19,6 +19,14 @@ const (
|
|||||||
PortFragment = -1 // Special value for matching `port: fragment`
|
PortFragment = -1 // Special value for matching `port: fragment`
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TransportTuple struct {
|
||||||
|
FirstAddr [16]byte
|
||||||
|
SecondAddr [16]byte
|
||||||
|
FirstPort uint16
|
||||||
|
SecondPort uint16
|
||||||
|
IsV6 bool
|
||||||
|
}
|
||||||
|
|
||||||
type Packet struct {
|
type Packet struct {
|
||||||
LocalAddr netip.Addr
|
LocalAddr netip.Addr
|
||||||
RemoteAddr netip.Addr
|
RemoteAddr netip.Addr
|
||||||
|
|||||||
@@ -348,11 +348,12 @@ func (f *Interface) listenOut(i int) {
|
|||||||
lhh := f.lightHouse.NewRequestHandler()
|
lhh := f.lightHouse.NewRequestHandler()
|
||||||
h := &header.H{}
|
h := &header.H{}
|
||||||
fwPacket := &firewall.Packet{}
|
fwPacket := &firewall.Packet{}
|
||||||
|
parsedRx := &batch.RxParsed{}
|
||||||
nb := make([]byte, 12, 12)
|
nb := make([]byte, 12, 12)
|
||||||
|
|
||||||
listener := func(fromUdpAddr netip.AddrPort, payload []byte, meta udp.RxMeta) {
|
listener := func(fromUdpAddr netip.AddrPort, payload []byte, meta udp.RxMeta) {
|
||||||
plaintext := f.batchers[i].Reserve(len(payload))
|
plaintext := f.batchers[i].Reserve(len(payload))
|
||||||
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(), meta)
|
f.readOutsidePackets(ViaSender{UdpAddr: fromUdpAddr}, plaintext[:0], payload, h, fwPacket, parsedRx, lhh, nb, i, ctCache.Get(), meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
flusher := func() {
|
flusher := func() {
|
||||||
|
|||||||
19
outside.go
19
outside.go
@@ -13,6 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/slackhq/nebula/firewall"
|
"github.com/slackhq/nebula/firewall"
|
||||||
"github.com/slackhq/nebula/header"
|
"github.com/slackhq/nebula/header"
|
||||||
|
"github.com/slackhq/nebula/overlay/batch"
|
||||||
"github.com/slackhq/nebula/udp"
|
"github.com/slackhq/nebula/udp"
|
||||||
"golang.org/x/net/ipv4"
|
"golang.org/x/net/ipv4"
|
||||||
)
|
)
|
||||||
@@ -23,7 +24,7 @@ const (
|
|||||||
|
|
||||||
var ErrOutOfWindow = errors.New("out of window packet")
|
var ErrOutOfWindow = errors.New("out of window packet")
|
||||||
|
|
||||||
func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
||||||
err := h.Parse(packet)
|
err := h.Parse(packet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Hole punch packets are 0 or 1 byte big, so lets ignore printing those errors
|
// Hole punch packets are 0 or 1 byte big, so lets ignore printing those errors
|
||||||
@@ -136,7 +137,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte,
|
|||||||
case header.Message:
|
case header.Message:
|
||||||
switch h.Subtype {
|
switch h.Subtype {
|
||||||
case header.MessageNone:
|
case header.MessageNone:
|
||||||
f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, nb, q, localCache, meta)
|
f.handleOutsideMessagePacket(hostinfo, out, packet, fwPacket, parsedRx, nb, q, localCache, meta)
|
||||||
default:
|
default:
|
||||||
hostinfo.logger(f.l).Error("IsValidSubType was true, but unexpected message subtype seen", "from", via, "header", h)
|
hostinfo.logger(f.l).Error("IsValidSubType was true, but unexpected message subtype seen", "from", via, "header", h)
|
||||||
return
|
return
|
||||||
@@ -169,7 +170,7 @@ func (f *Interface) readOutsidePackets(via ViaSender, out []byte, packet []byte,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender, out []byte, packet []byte, h *header.H, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, lhf *LightHouseHandler, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
||||||
// The entire body is sent as AD, not encrypted.
|
// The entire body is sent as AD, not encrypted.
|
||||||
// The packet consists of a 16-byte parsed Nebula header, Associated Data-protected payload, and a trailing 16-byte AEAD signature value.
|
// The packet consists of a 16-byte parsed Nebula header, Associated Data-protected payload, and a trailing 16-byte AEAD signature value.
|
||||||
// The packet is guaranteed to be at least 16 bytes at this point, b/c it got past the h.Parse() call above. If it's
|
// The packet is guaranteed to be at least 16 bytes at this point, b/c it got past the h.Parse() call above. If it's
|
||||||
@@ -212,7 +213,8 @@ func (f *Interface) handleOutsideRelayPacket(hostinfo *HostInfo, via ViaSender,
|
|||||||
relay: relay,
|
relay: relay,
|
||||||
IsRelayed: true,
|
IsRelayed: true,
|
||||||
}
|
}
|
||||||
f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, lhf, nb, q, localCache, meta)
|
f.readOutsidePackets(via, out[:0], signedPayload, h, fwPacket, parsedRx, lhf, nb, q, localCache, meta)
|
||||||
|
return
|
||||||
case ForwardingType:
|
case ForwardingType:
|
||||||
// Find the target HostInfo relay object
|
// Find the target HostInfo relay object
|
||||||
targetHI, targetRelay, err := f.hostMap.QueryVpnAddrsRelayFor(hostinfo.vpnAddrs, relay.PeerAddr)
|
targetHI, targetRelay, err := f.hostMap.QueryVpnAddrsRelayFor(hostinfo.vpnAddrs, relay.PeerAddr)
|
||||||
@@ -560,7 +562,7 @@ func applyOuterECN(pkt []byte, outerECN byte, hostinfo *HostInfo, l *slog.Logger
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, packet []byte, fwPacket *firewall.Packet, parsedRx *batch.RxParsed, nb []byte, q int, localCache firewall.ConntrackCache, meta udp.RxMeta) {
|
||||||
// RFC 6040 normal-mode combine: fold any outer CE mark stamped by the
|
// RFC 6040 normal-mode combine: fold any outer CE mark stamped by the
|
||||||
// underlay into the inner header before firewall + TUN write. Other
|
// underlay into the inner header before firewall + TUN write. Other
|
||||||
// outer codepoints are advisory only — we keep the inner unchanged.
|
// outer codepoints are advisory only — we keep the inner unchanged.
|
||||||
@@ -568,7 +570,10 @@ func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, p
|
|||||||
applyOuterECN(out, meta.OuterECN, hostinfo, f.l)
|
applyOuterECN(out, meta.OuterECN, hostinfo, f.l)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := newPacket(out, true, fwPacket)
|
// Single IP+L4 walk feeds both the firewall (via fwPacket) and the
|
||||||
|
// batcher (via parsedRx). Replaces newPacket — the batcher's CommitInbound
|
||||||
|
// uses parsedRx instead of re-walking the headers.
|
||||||
|
err := batch.ParseInbound(out, fwPacket, parsedRx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hostinfo.logger(f.l).Warn("Error while validating inbound packet",
|
hostinfo.logger(f.l).Warn("Error while validating inbound packet",
|
||||||
"error", err,
|
"error", err,
|
||||||
@@ -591,7 +596,7 @@ func (f *Interface) handleOutsideMessagePacket(hostinfo *HostInfo, out []byte, p
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.batchers[q].Commit(out)
|
err = f.batchers[q].CommitInbound(out, parsedRx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.l.Error("Failed to write to tun", "error", err)
|
f.l.Error("Failed to write to tun", "error", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,8 +5,15 @@ import "net/netip"
|
|||||||
type RxBatcher interface {
|
type RxBatcher interface {
|
||||||
// Reserve creates a pkt to borrow
|
// Reserve creates a pkt to borrow
|
||||||
Reserve(sz int) []byte
|
Reserve(sz int) []byte
|
||||||
// Commit borrows pkt. The caller must keep pkt valid until the next Flush
|
// Commit borrows pkt. The caller must keep pkt valid until the next Flush.
|
||||||
|
// Walks IP+L4 headers itself; prefer CommitInbound when the caller already
|
||||||
|
// has an RxParsed in hand from ParseInbound.
|
||||||
Commit(pkt []byte) error
|
Commit(pkt []byte) error
|
||||||
|
// CommitInbound is Commit with a hint produced by ParseInbound, so the
|
||||||
|
// batcher can skip the IP+L4 re-parse. Borrowed slice contract is the
|
||||||
|
// same as Commit. Implementations that don't coalesce may delegate to
|
||||||
|
// Commit.
|
||||||
|
CommitInbound(pkt []byte, parsed *RxParsed) error
|
||||||
// Flush emits every queued packet in arrival order. Returns the
|
// Flush emits every queued packet in arrival order. Returns the
|
||||||
// first error observed; keeps draining so one bad packet doesn't hold up
|
// first error observed; keeps draining so one bad packet doesn't hold up
|
||||||
// the rest. After Flush returns, borrowed payload slices may be recycled.
|
// the rest. After Flush returns, borrowed payload slices may be recycled.
|
||||||
|
|||||||
@@ -40,6 +40,13 @@ func (p *Passthrough) Commit(pkt []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CommitInbound ignores the hint — Passthrough never coalesces, so there's
|
||||||
|
// no IP/L4 re-parse to skip. Present so Passthrough satisfies the RxBatcher
|
||||||
|
// interface alongside MultiCoalescer.
|
||||||
|
func (p *Passthrough) CommitInbound(pkt []byte, _ *RxParsed) error {
|
||||||
|
return p.Commit(pkt)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Passthrough) Flush() error {
|
func (p *Passthrough) Flush() error {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
for _, s := range p.slots {
|
for _, s := range p.slots {
|
||||||
|
|||||||
Reference in New Issue
Block a user