This commit is contained in:
JackDoan
2026-04-17 11:39:51 -05:00
parent f8f63c470a
commit bd0a63a545
5 changed files with 124 additions and 8 deletions

View File

@@ -11,7 +11,7 @@ import (
"github.com/slackhq/nebula/routing"
)
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb []byte, batch *sendBatch, rejectBuf []byte, q int, localCache firewall.ConntrackCache) {
err := newPacket(packet, false, fwPacket)
if err != nil {
if f.l.Level >= logrus.DebugLevel {
@@ -53,7 +53,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
})
if hostinfo == nil {
f.rejectInside(packet, out, q)
f.rejectInside(packet, rejectBuf, q)
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddr", fwPacket.RemoteAddr).
WithField("fwPacket", fwPacket).
@@ -68,10 +68,10 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache)
if dropReason == nil {
f.sendNoMetrics(header.Message, 0, hostinfo.ConnectionState, hostinfo, netip.AddrPort{}, packet, nb, out, q)
f.sendInsideMessage(hostinfo, packet, nb, batch, rejectBuf, q)
} else {
f.rejectInside(packet, out, q)
f.rejectInside(packet, rejectBuf, q)
if f.l.Level >= logrus.DebugLevel {
hostinfo.logger(f.l).
WithField("fwPacket", fwPacket).
@@ -81,6 +81,63 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
}
}
// sendInsideMessage encrypts a firewall-approved inside packet into the
// caller's batch slot for later sendmmsg flush. When hostinfo.remote is not
// valid we fall through to the relay slow path via the unbatched sendNoMetrics
// so relay behavior is unchanged.
func (f *Interface) sendInsideMessage(hostinfo *HostInfo, p, nb []byte, batch *sendBatch, rejectBuf []byte, q int) {
ci := hostinfo.ConnectionState
if ci.eKey == nil {
return
}
if !hostinfo.remote.IsValid() {
// Slow path: relay fallback. Reuse rejectBuf as the ciphertext
// scratch; sendNoMetrics arranges header space for SendVia.
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, p, nb, rejectBuf, q)
return
}
scratch := batch.Next()
if scratch == nil {
// Batch full: bypass batching and send this packet directly so we
// never drop traffic on over-subscribed iterations.
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, p, nb, rejectBuf, q)
return
}
if noiseutil.EncryptLockNeeded {
ci.writeLock.Lock()
}
c := ci.messageCounter.Add(1)
out := header.Encode(scratch, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
f.connectionManager.Out(hostinfo)
if hostinfo.lastRebindCount != f.rebindCount {
//NOTE: there is an update hole if a tunnel isn't used and exactly 256 rebinds occur before the tunnel is
// finally used again. This tunnel would eventually be torn down and recreated if this action didn't help.
f.lightHouse.QueryServer(hostinfo.vpnAddrs[0])
hostinfo.lastRebindCount = f.rebindCount
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddrs", hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter")
}
}
out, err := ci.eKey.EncryptDanger(out, out, p, c, nb)
if noiseutil.EncryptLockNeeded {
ci.writeLock.Unlock()
}
if err != nil {
hostinfo.logger(f.l).WithError(err).
WithField("udpAddr", hostinfo.remote).WithField("counter", c).
Error("Failed to encrypt outgoing packet")
return
}
batch.Commit(len(out), hostinfo.remote)
}
func (f *Interface) rejectInside(packet []byte, out []byte, q int) {
if !f.firewall.InSendReject {
return