mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-16 12:57:38 +02:00
make relays take the fast path maybe
This commit is contained in:
@@ -974,6 +974,7 @@ func (hm *HandshakeManager) continueHandshake(via ViaSender, hh *HandshakeHostIn
|
|||||||
nb := make([]byte, 12, 12)
|
nb := make([]byte, 12, 12)
|
||||||
out := make([]byte, mtu)
|
out := make([]byte, mtu)
|
||||||
for _, cp := range hh.packetStore {
|
for _, cp := range hh.packetStore {
|
||||||
|
//todo use a sendbatcher
|
||||||
cp.callback(cp.messageType, cp.messageSubType, hostinfo, cp.packet, nb, out)
|
cp.callback(cp.messageType, cp.messageSubType, hostinfo, cp.packet, nb, out)
|
||||||
}
|
}
|
||||||
f.cachedPacketMetrics.sent.Inc(int64(len(hh.packetStore)))
|
f.cachedPacketMetrics.sent.Inc(int64(len(hh.packetStore)))
|
||||||
|
|||||||
167
inside.go
167
inside.go
@@ -2,6 +2,7 @@ package nebula
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
|
||||||
@@ -118,6 +119,33 @@ func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Interface) sendInsideEncrypt(hostinfo *HostInfo, ci *ConnectionState, seg, scratch, nb []byte) []byte {
|
||||||
|
if noiseutil.EncryptLockNeeded {
|
||||||
|
ci.writeLock.Lock()
|
||||||
|
}
|
||||||
|
c := ci.messageCounter.Add(1)
|
||||||
|
|
||||||
|
out := header.Encode(scratch, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
|
||||||
|
f.connectionManager.Out(hostinfo)
|
||||||
|
|
||||||
|
out, encErr := ci.eKey.EncryptDanger(out, out, seg, c, nb)
|
||||||
|
if noiseutil.EncryptLockNeeded {
|
||||||
|
ci.writeLock.Unlock()
|
||||||
|
}
|
||||||
|
if encErr != nil {
|
||||||
|
hostinfo.logger(f.l).Error("Failed to encrypt outgoing packet",
|
||||||
|
"error", encErr,
|
||||||
|
"udpAddr", hostinfo.remote,
|
||||||
|
"counter", c,
|
||||||
|
)
|
||||||
|
// Skip this segment; the rest of the superpacket can still
|
||||||
|
// go out — TCP will retransmit anything we drop here.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// sendInsideMessage encrypts a firewall-approved inside packet (or every
|
// sendInsideMessage encrypts a firewall-approved inside packet (or every
|
||||||
// segment of a TSO/USO superpacket) into the caller's batch slot for
|
// segment of a TSO/USO superpacket) into the caller's batch slot for
|
||||||
// later sendmmsg flush. Segmentation is fused with encryption here so the
|
// later sendmmsg flush. Segmentation is fused with encryption here so the
|
||||||
@@ -125,38 +153,13 @@ func (f *Interface) consumeInsidePacket(pkt tio.Packet, fwPacket *firewall.Packe
|
|||||||
// scratch arena: SegmentSuperpacket builds each segment's plaintext in
|
// scratch arena: SegmentSuperpacket builds each segment's plaintext in
|
||||||
// segScratch[:segLen] in turn, and we encrypt directly into a fresh
|
// segScratch[:segLen] in turn, and we encrypt directly into a fresh
|
||||||
// SendBatch slot.
|
// SendBatch slot.
|
||||||
//
|
|
||||||
// When hostinfo.remote is not valid we fall through to the relay slow
|
|
||||||
// path via the unbatched sendNoMetrics so relay behavior is unchanged;
|
|
||||||
// each segment of a superpacket goes through that path independently.
|
|
||||||
// sendInsideMessage takes a borrowed pkt: pkt.Bytes is only valid until the
|
|
||||||
// next Read on the originating tio.Queue. Each segment is encrypted into a
|
|
||||||
// fresh sendBatch slot (Reserve returns owned scratch), so the borrow ends
|
|
||||||
// inside the SegmentSuperpacket callback below. Do not retain pkt or any
|
|
||||||
// seg slice past the callback's return.
|
|
||||||
func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int) {
|
func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []byte, sendBatch batch.TxBatcher, rejectBuf []byte, q int) {
|
||||||
ci := hostinfo.ConnectionState
|
ci := hostinfo.ConnectionState
|
||||||
if ci.eKey == nil {
|
if ci.eKey == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !hostinfo.remote.IsValid() {
|
ecnEnabled := f.ecnEnabled.Load()
|
||||||
// Slow path: relay fallback. Reuse rejectBuf as the ciphertext
|
|
||||||
// scratch; sendNoMetrics arranges header space for SendVia.
|
|
||||||
// Segment any superpacket so each segment is sized to fit a
|
|
||||||
// single relay encap.
|
|
||||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
|
||||||
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, seg, nb, rejectBuf, q)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
hostinfo.logger(f.l).Error("Failed to segment superpacket for relay send",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if hostinfo.lastRebindCount != f.rebindCount {
|
if hostinfo.lastRebindCount != f.rebindCount {
|
||||||
//NOTE: there is an update hole if a tunnel isn't used and exactly 256 rebinds occur before the tunnel is
|
//NOTE: there is an update hole if a tunnel isn't used and exactly 256 rebinds occur before the tunnel is
|
||||||
// finally used again. This tunnel would eventually be torn down and recreated if this action didn't help.
|
// finally used again. This tunnel would eventually be torn down and recreated if this action didn't help.
|
||||||
@@ -169,32 +172,63 @@ func (f *Interface) sendInsideMessage(hostinfo *HostInfo, pkt tio.Packet, nb []b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ecnEnabled := f.ecnEnabled.Load()
|
if !hostinfo.remote.IsValid() { //the relay path
|
||||||
|
//first, find our relay hostinfo:
|
||||||
|
var relayHostInfo *HostInfo
|
||||||
|
var relay *Relay
|
||||||
|
var err error
|
||||||
|
for _, relayIP := range hostinfo.relayState.CopyRelayIps() {
|
||||||
|
relayHostInfo, relay, err = f.hostMap.QueryVpnAddrsRelayFor(hostinfo.vpnAddrs, relayIP)
|
||||||
|
if err != nil {
|
||||||
|
hostinfo.relayState.DeleteRelay(relayIP)
|
||||||
|
hostinfo.logger(f.l).Info("sendNoMetrics failed to find HostInfo",
|
||||||
|
"relay", relayIP,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if relayHostInfo == nil || relay == nil {
|
||||||
|
//failure already logged
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||||
|
//relay header + header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305) + relay tag
|
||||||
|
scratch := sendBatch.Reserve(header.Len + header.Len + len(seg) + 16 + 16)
|
||||||
|
|
||||||
|
innerPacket := f.sendInsideEncrypt(hostinfo, ci, seg, scratch[header.Len:], nb)
|
||||||
|
if innerPacket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//now we need to do a relay-encrypt:
|
||||||
|
toSend, err := f.prepareSendVia(relayHostInfo, relay, innerPacket, nb, scratch, true)
|
||||||
|
if err != nil {
|
||||||
|
//already logged
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var ecn byte
|
||||||
|
if ecnEnabled {
|
||||||
|
ecn = innerECN(seg)
|
||||||
|
}
|
||||||
|
sendBatch.Commit(toSend, relayHostInfo.remote, ecn)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
hostinfo.logger(f.l).Error("Failed to segment superpacket for relay send", "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
err := tio.SegmentSuperpacket(pkt, func(seg []byte) error {
|
||||||
// header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305)
|
// header + plaintext + AEAD tag (16 bytes for both AES-GCM and ChaCha20-Poly1305)
|
||||||
scratch := sendBatch.Reserve(header.Len + len(seg) + 16)
|
scratch := sendBatch.Reserve(header.Len + len(seg) + 16)
|
||||||
|
|
||||||
if noiseutil.EncryptLockNeeded {
|
out := f.sendInsideEncrypt(hostinfo, ci, seg, scratch, nb)
|
||||||
ci.writeLock.Lock()
|
if out == nil {
|
||||||
}
|
|
||||||
c := ci.messageCounter.Add(1)
|
|
||||||
|
|
||||||
out := header.Encode(scratch, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
|
|
||||||
f.connectionManager.Out(hostinfo)
|
|
||||||
|
|
||||||
out, encErr := ci.eKey.EncryptDanger(out, out, seg, c, nb)
|
|
||||||
if noiseutil.EncryptLockNeeded {
|
|
||||||
ci.writeLock.Unlock()
|
|
||||||
}
|
|
||||||
if encErr != nil {
|
|
||||||
hostinfo.logger(f.l).Error("Failed to encrypt outgoing packet",
|
|
||||||
"error", encErr,
|
|
||||||
"udpAddr", hostinfo.remote,
|
|
||||||
"counter", c,
|
|
||||||
)
|
|
||||||
// Skip this segment; the rest of the superpacket can still
|
|
||||||
// go out — TCP will retransmit anything we drop here.
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,21 +451,13 @@ func (f *Interface) sendTo(t header.MessageType, st header.MessageSubType, ci *C
|
|||||||
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0)
|
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendVia sends a payload through a Relay tunnel. No authentication or encryption is done
|
func (f *Interface) prepareSendVia(via *HostInfo,
|
||||||
// to the payload for the ultimate target host, making this a useful method for sending
|
|
||||||
// handshake messages to peers through relay tunnels.
|
|
||||||
// via is the HostInfo through which the message is relayed.
|
|
||||||
// ad is the plaintext data to authenticate, but not encrypt
|
|
||||||
// nb is a buffer used to store the nonce value, re-used for performance reasons.
|
|
||||||
// out is a buffer used to store the result of the Encrypt operation
|
|
||||||
// q indicates which writer to use to send the packet.
|
|
||||||
func (f *Interface) SendVia(via *HostInfo,
|
|
||||||
relay *Relay,
|
relay *Relay,
|
||||||
ad,
|
ad,
|
||||||
nb,
|
nb,
|
||||||
out []byte,
|
out []byte,
|
||||||
nocopy bool,
|
nocopy bool,
|
||||||
) {
|
) ([]byte, error) {
|
||||||
if noiseutil.EncryptLockNeeded {
|
if noiseutil.EncryptLockNeeded {
|
||||||
// NOTE: for goboring AESGCMTLS we need to lock because of the nonce check
|
// NOTE: for goboring AESGCMTLS we need to lock because of the nonce check
|
||||||
via.ConnectionState.writeLock.Lock()
|
via.ConnectionState.writeLock.Lock()
|
||||||
@@ -453,7 +479,7 @@ func (f *Interface) SendVia(via *HostInfo,
|
|||||||
"headerLen", len(out),
|
"headerLen", len(out),
|
||||||
"cipherOverhead", via.ConnectionState.eKey.Overhead(),
|
"cipherOverhead", via.ConnectionState.eKey.Overhead(),
|
||||||
)
|
)
|
||||||
return
|
return nil, io.ErrShortBuffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// The header bytes are written to the 'out' slice; Grow the slice to hold the header and associated data payload.
|
// The header bytes are written to the 'out' slice; Grow the slice to hold the header and associated data payload.
|
||||||
@@ -473,13 +499,32 @@ func (f *Interface) SendVia(via *HostInfo,
|
|||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
via.logger(f.l).Info("Failed to EncryptDanger in sendVia", "error", err)
|
via.logger(f.l).Info("Failed to EncryptDanger in sendVia", "error", err)
|
||||||
return
|
return nil, err
|
||||||
}
|
}
|
||||||
err = f.writers[0].WriteTo(out, via.remote)
|
f.connectionManager.RelayUsed(relay.LocalIndex)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendVia sends a payload through a Relay tunnel. No authentication or encryption is done
|
||||||
|
// to the payload for the ultimate target host, making this a useful method for sending
|
||||||
|
// handshake messages to peers through relay tunnels.
|
||||||
|
// via is the HostInfo through which the message is relayed.
|
||||||
|
// ad is the plaintext data to authenticate, but not encrypt
|
||||||
|
// nb is a buffer used to store the nonce value, re-used for performance reasons.
|
||||||
|
// out is a buffer used to store the result of the Encrypt operation
|
||||||
|
// q indicates which writer to use to send the packet.
|
||||||
|
func (f *Interface) SendVia(via *HostInfo,
|
||||||
|
relay *Relay,
|
||||||
|
ad,
|
||||||
|
nb,
|
||||||
|
out []byte,
|
||||||
|
nocopy bool,
|
||||||
|
) {
|
||||||
|
toSend, err := f.prepareSendVia(via, relay, ad, nb, out, nocopy)
|
||||||
|
err = f.writers[0].WriteTo(toSend, via.remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
via.logger(f.l).Info("Failed to WriteTo in sendVia", "error", err)
|
via.logger(f.l).Info("Failed to WriteTo in sendVia", "error", err)
|
||||||
}
|
}
|
||||||
f.connectionManager.RelayUsed(relay.LocalIndex)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote netip.AddrPort, p, nb, out []byte, q int) {
|
func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote netip.AddrPort, p, nb, out []byte, q int) {
|
||||||
|
|||||||
Reference in New Issue
Block a user