mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-23 17:04:25 +01:00
Compare commits
8 Commits
master
...
interface-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db11e2f1af | ||
|
|
2ee428b067 | ||
|
|
e9657d571e | ||
|
|
3cebf38504 | ||
|
|
ae3ee42469 | ||
|
|
fa034a6d83 | ||
|
|
55d72ac46f | ||
|
|
2c931d5691 |
47
control.go
47
control.go
@@ -1,6 +1,8 @@
|
|||||||
package nebula
|
package nebula
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@@ -8,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/slackhq/nebula/cert"
|
"github.com/slackhq/nebula/cert"
|
||||||
|
"golang.org/x/net/ipv4"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Every interaction here needs to take extra care to copy memory and not return or use arguments "as is" when touching
|
// Every interaction here needs to take extra care to copy memory and not return or use arguments "as is" when touching
|
||||||
@@ -167,3 +170,47 @@ func copyHostInfo(h *HostInfo) ControlHostInfo {
|
|||||||
|
|
||||||
return chi
|
return chi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hook provides the ability to hook into the network path for a particular
|
||||||
|
// message sub type. Any received message of that subtype that is allowed by
|
||||||
|
// the firewall will be written to the provided write func instead of the
|
||||||
|
// inside interface.
|
||||||
|
// TODO: make this an io.Writer
|
||||||
|
func (c *Control) Hook(t NebulaMessageSubType, w func([]byte) error) error {
|
||||||
|
if t == 0 {
|
||||||
|
return fmt.Errorf("non-default message subtype must be specified")
|
||||||
|
}
|
||||||
|
if _, ok := c.f.handlers[Version][message][t]; ok {
|
||||||
|
return fmt.Errorf("message subtype %d already hooked", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.f.handlers[Version][message][t] = c.f.newHook(w)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send provides the ability to send arbitrary message packets to peer nodes.
|
||||||
|
// The provided payload will be encapsulated in a Nebula Firewall packet
|
||||||
|
// (IPv4 plus ports) from the node IP to the provided destination nebula IP.
|
||||||
|
// Any protocol handling above layer 3 (IP) must be managed by the caller.
|
||||||
|
func (c *Control) Send(ip uint32, port uint16, st NebulaMessageSubType, payload []byte) {
|
||||||
|
headerLen := ipv4.HeaderLen + minFwPacketLen
|
||||||
|
length := headerLen + len(payload)
|
||||||
|
packet := make([]byte, length)
|
||||||
|
packet[0] = 0x45 // IPv4 HL=20
|
||||||
|
packet[9] = 114 // Declare as arbitrary 0-hop protocol
|
||||||
|
binary.BigEndian.PutUint16(packet[2:4], uint16(length))
|
||||||
|
binary.BigEndian.PutUint32(packet[12:16], ip2int(c.f.inside.CidrNet().IP.To4()))
|
||||||
|
binary.BigEndian.PutUint32(packet[16:20], ip)
|
||||||
|
|
||||||
|
// Set identical values for src and dst port as they're only
|
||||||
|
// used for nebula firewall rule/conntrack matching.
|
||||||
|
binary.BigEndian.PutUint16(packet[20:22], port)
|
||||||
|
binary.BigEndian.PutUint16(packet[22:24], port)
|
||||||
|
|
||||||
|
copy(packet[headerLen:], payload)
|
||||||
|
|
||||||
|
fp := &FirewallPacket{}
|
||||||
|
nb := make([]byte, 12)
|
||||||
|
out := make([]byte, mtu)
|
||||||
|
c.f.consumeInsidePacket(st, packet, fp, nb, out)
|
||||||
|
}
|
||||||
|
|||||||
83
handler.go
Normal file
83
handler.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package nebula
|
||||||
|
|
||||||
|
func (f *Interface) newHook(w func([]byte) error) InsideHandler {
|
||||||
|
fn := func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
f.decryptTo(w, hostInfo, header.MessageCounter, out, packet, fwPacket, nb)
|
||||||
|
}
|
||||||
|
return f.encrypted(fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) encrypted(h InsideHandler) InsideHandler {
|
||||||
|
return func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
if !f.handleEncrypted(ci, addr, header) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h(hostInfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||||
|
|
||||||
|
f.handleHostRoaming(hostInfo, addr)
|
||||||
|
f.connectionManager.In(hostInfo.hostId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) rxMetrics(h InsideHandler) InsideHandler {
|
||||||
|
return func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||||
|
h(hostInfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleMessagePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
f.decryptTo(f.inside.WriteRaw, hostInfo, header.MessageCounter, out, packet, fwPacket, nb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleLighthousePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
d, err := f.decrypt(hostInfo, header.MessageCounter, out, packet, header, nb)
|
||||||
|
if err != nil {
|
||||||
|
hostInfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||||
|
WithField("packet", packet).
|
||||||
|
Error("Failed to decrypt lighthouse packet")
|
||||||
|
|
||||||
|
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||||
|
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
f.lightHouse.HandleRequest(addr, hostInfo.hostId, d, hostInfo.GetCert(), f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleTestPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
d, err := f.decrypt(hostInfo, header.MessageCounter, out, packet, header, nb)
|
||||||
|
if err != nil {
|
||||||
|
hostInfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||||
|
WithField("packet", packet).
|
||||||
|
Error("Failed to decrypt test packet")
|
||||||
|
|
||||||
|
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||||
|
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if header.Subtype == testRequest {
|
||||||
|
// This testRequest might be from TryPromoteBest, so we should roam
|
||||||
|
// to the new IP address before responding
|
||||||
|
f.handleHostRoaming(hostInfo, addr)
|
||||||
|
f.send(test, testReply, ci, hostInfo, hostInfo.remote, d, nb, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleHandshakePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
HandleIncomingHandshake(f, addr, packet, header, hostInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleRecvErrorPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
// TODO: Remove this with recv_error deprecation
|
||||||
|
f.handleRecvError(addr, header)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Interface) handleCloseTunnelPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
|
hostInfo.logger().WithField("udpAddr", addr).
|
||||||
|
Info("Close tunnel received, tearing down.")
|
||||||
|
|
||||||
|
f.closeTunnel(hostInfo)
|
||||||
|
}
|
||||||
@@ -54,6 +54,7 @@ var typeMap = map[NebulaMessageType]string{
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
subTypeNone NebulaMessageSubType = 0
|
||||||
testRequest NebulaMessageSubType = 0
|
testRequest NebulaMessageSubType = 0
|
||||||
testReply NebulaMessageSubType = 1
|
testReply NebulaMessageSubType = 1
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket, nb, out []byte) {
|
func (f *Interface) consumeInsidePacket(st NebulaMessageSubType, packet []byte, fwPacket *FirewallPacket, nb, out []byte) {
|
||||||
err := newPacket(packet, false, fwPacket)
|
err := newPacket(packet, false, fwPacket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
|
l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
|
||||||
@@ -45,7 +45,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
|
|||||||
// the packet queue.
|
// the packet queue.
|
||||||
ci.queueLock.Lock()
|
ci.queueLock.Lock()
|
||||||
if !ci.ready {
|
if !ci.ready {
|
||||||
hostinfo.cachePacket(message, 0, packet, f.sendMessageNow)
|
hostinfo.cachePacket(message, st, packet, f.sendMessageNow)
|
||||||
ci.queueLock.Unlock()
|
ci.queueLock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -54,7 +54,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
|
|||||||
|
|
||||||
dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs)
|
dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs)
|
||||||
if dropReason == nil {
|
if dropReason == nil {
|
||||||
mc := f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out)
|
mc := f.sendNoMetrics(message, st, ci, hostinfo, hostinfo.remote, packet, nb, out)
|
||||||
if f.lightHouse != nil && mc%5000 == 0 {
|
if f.lightHouse != nil && mc%5000 == 0 {
|
||||||
f.lightHouse.Query(fwPacket.RemoteIP, f)
|
f.lightHouse.Query(fwPacket.RemoteIP, f)
|
||||||
}
|
}
|
||||||
|
|||||||
30
interface.go
30
interface.go
@@ -20,6 +20,8 @@ type Inside interface {
|
|||||||
WriteRaw([]byte) error
|
WriteRaw([]byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type InsideHandler func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fp *FirewallPacket, nb []byte)
|
||||||
|
|
||||||
type InterfaceConfig struct {
|
type InterfaceConfig struct {
|
||||||
HostMap *HostMap
|
HostMap *HostMap
|
||||||
Outside *udpConn
|
Outside *udpConn
|
||||||
@@ -61,6 +63,9 @@ type Interface struct {
|
|||||||
tunQueues int
|
tunQueues int
|
||||||
version string
|
version string
|
||||||
|
|
||||||
|
// handlers are mapped by protocol version -> type -> subtype
|
||||||
|
handlers map[uint8]map[NebulaMessageType]map[NebulaMessageSubType]InsideHandler
|
||||||
|
|
||||||
metricHandshakes metrics.Histogram
|
metricHandshakes metrics.Histogram
|
||||||
messageMetrics *MessageMetrics
|
messageMetrics *MessageMetrics
|
||||||
}
|
}
|
||||||
@@ -103,6 +108,29 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval)
|
ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval)
|
||||||
|
ifce.handlers = map[uint8]map[NebulaMessageType]map[NebulaMessageSubType]InsideHandler{
|
||||||
|
Version: {
|
||||||
|
handshake: {
|
||||||
|
handshakeIXPSK0: ifce.rxMetrics(ifce.handleHandshakePacket),
|
||||||
|
},
|
||||||
|
message: {
|
||||||
|
subTypeNone: ifce.encrypted(ifce.handleMessagePacket),
|
||||||
|
},
|
||||||
|
recvError: {
|
||||||
|
subTypeNone: ifce.rxMetrics(ifce.handleRecvErrorPacket),
|
||||||
|
},
|
||||||
|
lightHouse: {
|
||||||
|
subTypeNone: ifce.rxMetrics(ifce.encrypted(ifce.handleLighthousePacket)),
|
||||||
|
},
|
||||||
|
test: {
|
||||||
|
testRequest: ifce.rxMetrics(ifce.encrypted(ifce.handleTestPacket)),
|
||||||
|
testReply: ifce.rxMetrics(ifce.encrypted(ifce.handleTestPacket)),
|
||||||
|
},
|
||||||
|
closeTunnel: {
|
||||||
|
subTypeNone: ifce.rxMetrics(ifce.encrypted(ifce.handleCloseTunnelPacket)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
return ifce, nil
|
return ifce, nil
|
||||||
}
|
}
|
||||||
@@ -168,7 +196,7 @@ func (f *Interface) listenIn(i int) {
|
|||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
f.consumeInsidePacket(packet[:n], fwPacket, nb, out)
|
f.consumeInsidePacket(subTypeNone, packet[:n], fwPacket, nb, out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
93
outside.go
93
outside.go
@@ -39,98 +39,15 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
|
|||||||
ci = hostinfo.ConnectionState
|
ci = hostinfo.ConnectionState
|
||||||
}
|
}
|
||||||
|
|
||||||
switch header.Type {
|
handle := f.handlers[header.Version][header.Type][header.Subtype]
|
||||||
case message:
|
|
||||||
if !f.handleEncrypted(ci, addr, header) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
f.decryptToTun(hostinfo, header.MessageCounter, out, packet, fwPacket, nb)
|
if handle == nil {
|
||||||
|
|
||||||
// Fallthrough to the bottom to record incoming traffic
|
|
||||||
|
|
||||||
case lightHouse:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
|
||||||
if !f.handleEncrypted(ci, addr, header) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
d, err := f.decrypt(hostinfo, header.MessageCounter, out, packet, header, nb)
|
|
||||||
if err != nil {
|
|
||||||
hostinfo.logger().WithError(err).WithField("udpAddr", addr).
|
|
||||||
WithField("packet", packet).
|
|
||||||
Error("Failed to decrypt lighthouse packet")
|
|
||||||
|
|
||||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
|
||||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
f.lightHouse.HandleRequest(addr, hostinfo.hostId, d, hostinfo.GetCert(), f)
|
|
||||||
|
|
||||||
// Fallthrough to the bottom to record incoming traffic
|
|
||||||
|
|
||||||
case test:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
|
||||||
if !f.handleEncrypted(ci, addr, header) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
d, err := f.decrypt(hostinfo, header.MessageCounter, out, packet, header, nb)
|
|
||||||
if err != nil {
|
|
||||||
hostinfo.logger().WithError(err).WithField("udpAddr", addr).
|
|
||||||
WithField("packet", packet).
|
|
||||||
Error("Failed to decrypt test packet")
|
|
||||||
|
|
||||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
|
||||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if header.Subtype == testRequest {
|
|
||||||
// This testRequest might be from TryPromoteBest, so we should roam
|
|
||||||
// to the new IP address before responding
|
|
||||||
f.handleHostRoaming(hostinfo, addr)
|
|
||||||
f.send(test, testReply, ci, hostinfo, hostinfo.remote, d, nb, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallthrough to the bottom to record incoming traffic
|
|
||||||
|
|
||||||
// Non encrypted messages below here, they should not fall through to avoid tracking incoming traffic since they
|
|
||||||
// are unauthenticated
|
|
||||||
|
|
||||||
case handshake:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
|
||||||
HandleIncomingHandshake(f, addr, packet, header, hostinfo)
|
|
||||||
return
|
|
||||||
|
|
||||||
case recvError:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
|
||||||
// TODO: Remove this with recv_error deprecation
|
|
||||||
f.handleRecvError(addr, header)
|
|
||||||
return
|
|
||||||
|
|
||||||
case closeTunnel:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
|
||||||
if !f.handleEncrypted(ci, addr, header) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
hostinfo.logger().WithField("udpAddr", addr).
|
|
||||||
Info("Close tunnel received, tearing down.")
|
|
||||||
|
|
||||||
f.closeTunnel(hostinfo)
|
|
||||||
return
|
|
||||||
|
|
||||||
default:
|
|
||||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||||
hostinfo.logger().Debugf("Unexpected packet received from %s", addr)
|
hostinfo.logger().Debugf("Unexpected packet received from %s", addr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
f.handleHostRoaming(hostinfo, addr)
|
handle(hostinfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||||
|
|
||||||
f.connectionManager.In(hostinfo.hostId)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Interface) closeTunnel(hostInfo *HostInfo) {
|
func (f *Interface) closeTunnel(hostInfo *HostInfo) {
|
||||||
@@ -258,7 +175,7 @@ func (f *Interface) decrypt(hostinfo *HostInfo, mc uint64, out []byte, packet []
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
func (f *Interface) decryptTo(write func([]byte) error, hostinfo *HostInfo, messageCounter uint64, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:HeaderLen], packet[HeaderLen:], messageCounter, nb)
|
out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:HeaderLen], packet[HeaderLen:], messageCounter, nb)
|
||||||
@@ -293,7 +210,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out
|
|||||||
}
|
}
|
||||||
|
|
||||||
f.connectionManager.In(hostinfo.hostId)
|
f.connectionManager.In(hostinfo.hostId)
|
||||||
err = f.inside.WriteRaw(out)
|
err = write(out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.WithError(err).Error("Failed to write to tun")
|
l.WithError(err).Error("Failed to write to tun")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user