mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 04:47:38 +02:00
GSO/GRO offloads, with TCP+ECN and UDP support
This commit is contained in:
159
inside.go
159
inside.go
@@ -10,10 +10,23 @@ import (
|
||||
"github.com/slackhq/nebula/iputil"
|
||||
"github.com/slackhq/nebula/noiseutil"
|
||||
"github.com/slackhq/nebula/overlay/batch"
|
||||
"github.com/slackhq/nebula/overlay/tio"
|
||||
"github.com/slackhq/nebula/routing"
|
||||
)
|
||||
|
||||
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int, localCache firewall.ConntrackCache) {
|
||||
func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int, localCache firewall.ConntrackCache) {
|
||||
// borrowed: pkt.Bytes is owned by the originating tio.Queue and is
|
||||
// only valid until the next Read on that queue. Every consumer below
|
||||
// (parse, self-forward, handshake cache, sendInsideMessage) reads it
|
||||
// synchronously; do not retain pkt outside this call. If a future
|
||||
// caller needs to keep the packet, use pkt.Clone() to detach it from
|
||||
// the borrow.
|
||||
//
|
||||
// pkt.Bytes is either one IP datagram (GSO zero) or a TSO/USO
|
||||
// superpacket. In both cases the L3+L4 headers at the start describe
|
||||
// the same 5-tuple every segment will share, so a single newPacket /
|
||||
// firewall check covers the whole superpacket.
|
||||
packet := pkt.Bytes
|
||||
err := newPacket(packet, false, fwPacket)
|
||||
if err != nil {
|
||||
if f.l.Enabled(context.Background(), slog.LevelDebug) {
|
||||
@@ -38,7 +51,14 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
|
||||
// routes packets from the Nebula addr to the Nebula addr through the Nebula
|
||||
// TUN device.
|
||||
if immediatelyForwardToSelf {
|
||||
_, err := f.readers[q].Write(packet)
|
||||
// Write copies into the kernel queue synchronously, so seg's lifetime ends at return.
|
||||
// A self-forwarded superpacket would be re-handed to the
|
||||
// kernel as one giant blob; segment first so the loopback
|
||||
// path sees one IP datagram per Write.
|
||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||
_, werr := f.readers[q].Write(seg)
|
||||
return werr
|
||||
})
|
||||
if err != nil {
|
||||
f.l.Error("Failed to forward to tun", "error", err)
|
||||
}
|
||||
@@ -54,7 +74,19 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
|
||||
}
|
||||
|
||||
hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) {
|
||||
hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
|
||||
// borrowed: SegmentSuperpacket builds each segment in the kernel-supplied pkt
|
||||
// bytes underneath. cachePacket explicitly copies its argument (handshake_manager.go cachePacket),
|
||||
// so retaining segments past the loop is safe.
|
||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||
hh.cachePacket(f.l, header.Message, 0, seg, f.sendMessageNow, f.cachedPacketMetrics)
|
||||
return nil
|
||||
})
|
||||
if err != nil && f.l.Enabled(context.Background(), slog.LevelDebug) {
|
||||
f.l.Debug("Failed to segment superpacket for handshake cache",
|
||||
"error", err,
|
||||
"vpnAddr", fwPacket.RemoteAddr,
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
if hostinfo == nil {
|
||||
@@ -74,7 +106,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
|
||||
|
||||
dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache)
|
||||
if dropReason == nil {
|
||||
f.sendInsideMessage(hostinfo, packet, nb, sendBatch, rejectBuf, q)
|
||||
f.sendInsideMessage(hostinfo, pkt, nb, sendBatch, rejectBuf, q)
|
||||
} else {
|
||||
f.rejectInside(packet, rejectBuf, q)
|
||||
if f.l.Enabled(context.Background(), slog.LevelDebug) {
|
||||
@@ -86,11 +118,23 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
|
||||
}
|
||||
}
|
||||
|
||||
// sendInsideMessage encrypts a firewall-approved inside packet into the
|
||||
// caller's batch slot for later sendmmsg flush. When hostinfo.remote is not
|
||||
// valid we fall through to the relay slow path via the unbatched sendNoMetrics
|
||||
// so relay behavior is unchanged.
|
||||
func (f *Interface) sendInsideMessage(hostinfo *HostInfo, p, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int) {
|
||||
// sendInsideMessage encrypts a firewall-approved inside packet (or every
|
||||
// segment of a TSO/USO superpacket) into the caller's batch slot for
|
||||
// later sendmmsg flush. Segmentation is fused with encryption here so the
|
||||
// kernel-supplied superpacket bytes never get written into a separate
|
||||
// scratch arena: SegmentSuperpacket builds each segment's plaintext in
|
||||
// segScratch[:segLen] in turn, and we encrypt directly into a fresh
|
||||
// SendBatch slot.
|
||||
//
|
||||
// When hostinfo.remote is not valid we fall through to the relay slow
|
||||
// path via the unbatched sendNoMetrics so relay behavior is unchanged;
|
||||
// each segment of a superpacket goes through that path independently.
|
||||
// sendInsideMessage takes a borrowed pkt: pkt.Bytes is only valid until the
|
||||
// next Read on the originating tio.Queue. Each segment is encrypted into a
|
||||
// fresh sendBatch slot (Reserve returns owned scratch), so the borrow ends
|
||||
// inside the SegmentSuperpacket callback below. Do not retain pkt or any
|
||||
// seg slice past the callback's return.
|
||||
func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int) {
|
||||
ci := hostinfo.ConnectionState
|
||||
if ci.eKey == nil {
|
||||
return
|
||||
@@ -99,26 +143,20 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, p, nb []byte, sendBatc
|
||||
if !hostinfo.remote.IsValid() {
|
||||
// Slow path: relay fallback. Reuse rejectBuf as the ciphertext
|
||||
// scratch; sendNoMetrics arranges header space for SendVia.
|
||||
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, p, nb, rejectBuf, q)
|
||||
// Segment any superpacket so each segment is sized to fit a
|
||||
// single relay encap.
|
||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, seg, nb, rejectBuf, q)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
hostinfo.logger(f.l).Error("Failed to segment superpacket for relay send",
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
scratch := sendBatch.Next()
|
||||
if scratch == nil {
|
||||
// Batch full: bypass batching and send this packet directly so we
|
||||
// never drop traffic on over-subscribed iterations.
|
||||
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, p, nb, rejectBuf, q)
|
||||
return
|
||||
}
|
||||
|
||||
if noiseutil.EncryptLockNeeded {
|
||||
ci.writeLock.Lock()
|
||||
}
|
||||
c := ci.messageCounter.Add(1)
|
||||
|
||||
out := header.Encode(scratch, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
|
||||
f.connectionManager.Out(hostinfo)
|
||||
|
||||
if hostinfo.lastRebindCount != f.rebindCount {
|
||||
//NOTE: there is an update hole if a tunnel isn't used and exactly 256 rebinds occur before the tunnel is
|
||||
// finally used again. This tunnel would eventually be torn down and recreated if this action didn't help.
|
||||
@@ -131,20 +169,63 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, p, nb []byte, sendBatc
|
||||
}
|
||||
}
|
||||
|
||||
out, err := ci.eKey.EncryptDanger(out, out, p, c, nb)
|
||||
if noiseutil.EncryptLockNeeded {
|
||||
ci.writeLock.Unlock()
|
||||
}
|
||||
if err != nil {
|
||||
hostinfo.logger(f.l).Error("Failed to encrypt outgoing packet",
|
||||
"error", err,
|
||||
"udpAddr", hostinfo.remote,
|
||||
"counter", c,
|
||||
)
|
||||
return
|
||||
}
|
||||
ecnEnabled := f.ecnEnabled.Load()
|
||||
|
||||
sendBatch.Commit(len(out), hostinfo.remote)
|
||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||
// header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305)
|
||||
scratch := sendBatch.Reserve(header.Len + len(seg) + 16)
|
||||
|
||||
if noiseutil.EncryptLockNeeded {
|
||||
ci.writeLock.Lock()
|
||||
}
|
||||
c := ci.messageCounter.Add(1)
|
||||
|
||||
out := header.Encode(scratch, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
|
||||
f.connectionManager.Out(hostinfo)
|
||||
|
||||
out, encErr := ci.eKey.EncryptDanger(out, out, seg, c, nb)
|
||||
if noiseutil.EncryptLockNeeded {
|
||||
ci.writeLock.Unlock()
|
||||
}
|
||||
if encErr != nil {
|
||||
hostinfo.logger(f.l).Error("Failed to encrypt outgoing packet",
|
||||
"error", encErr,
|
||||
"udpAddr", hostinfo.remote,
|
||||
"counter", c,
|
||||
)
|
||||
// Skip this segment; the rest of the superpacket can still
|
||||
// go out — TCP will retransmit anything we drop here.
|
||||
return nil
|
||||
}
|
||||
|
||||
var ecn byte
|
||||
if ecnEnabled {
|
||||
ecn = innerECN(seg)
|
||||
}
|
||||
sendBatch.Commit(out, hostinfo.remote, ecn)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
hostinfo.logger(f.l).Error("Failed to segment superpacket for send",
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// innerECN returns the 2-bit IP-level ECN codepoint of an inner IPv4 or IPv6
|
||||
// packet, or 0 if pkt is too short or its IP version is unrecognized. Used at
|
||||
// encap to copy the inner codepoint onto the outer carrier per RFC 6040.
|
||||
func innerECN(pkt []byte) byte {
|
||||
if len(pkt) < 2 {
|
||||
return 0
|
||||
}
|
||||
switch pkt[0] >> 4 {
|
||||
case 4:
|
||||
return pkt[1] & 0x03
|
||||
case 6:
|
||||
return (pkt[1] >> 4) & 0x03
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (f *Interface) rejectInside(packet []byte, out []byte, q int) {
|
||||
|
||||
Reference in New Issue
Block a user