mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-22 08:24:25 +01:00
Compare commits
8 Commits
channels-s
...
interface-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db11e2f1af | ||
|
|
2ee428b067 | ||
|
|
e9657d571e | ||
|
|
3cebf38504 | ||
|
|
ae3ee42469 | ||
|
|
fa034a6d83 | ||
|
|
55d72ac46f | ||
|
|
2c931d5691 |
47
control.go
47
control.go
@@ -1,6 +1,8 @@
|
||||
package nebula
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -8,6 +10,7 @@ import (
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/slackhq/nebula/cert"
|
||||
"golang.org/x/net/ipv4"
|
||||
)
|
||||
|
||||
// Every interaction here needs to take extra care to copy memory and not return or use arguments "as is" when touching
|
||||
@@ -167,3 +170,47 @@ func copyHostInfo(h *HostInfo) ControlHostInfo {
|
||||
|
||||
return chi
|
||||
}
|
||||
|
||||
// Hook provides the ability to hook into the network path for a particular
|
||||
// message sub type. Any received message of that subtype that is allowed by
|
||||
// the firewall will be written to the provided write func instead of the
|
||||
// inside interface.
|
||||
// TODO: make this an io.Writer
|
||||
func (c *Control) Hook(t NebulaMessageSubType, w func([]byte) error) error {
|
||||
if t == 0 {
|
||||
return fmt.Errorf("non-default message subtype must be specified")
|
||||
}
|
||||
if _, ok := c.f.handlers[Version][message][t]; ok {
|
||||
return fmt.Errorf("message subtype %d already hooked", t)
|
||||
}
|
||||
|
||||
c.f.handlers[Version][message][t] = c.f.newHook(w)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send provides the ability to send arbitrary message packets to peer nodes.
|
||||
// The provided payload will be encapsulated in a Nebula Firewall packet
|
||||
// (IPv4 plus ports) from the node IP to the provided destination nebula IP.
|
||||
// Any protocol handling above layer 3 (IP) must be managed by the caller.
|
||||
func (c *Control) Send(ip uint32, port uint16, st NebulaMessageSubType, payload []byte) {
|
||||
headerLen := ipv4.HeaderLen + minFwPacketLen
|
||||
length := headerLen + len(payload)
|
||||
packet := make([]byte, length)
|
||||
packet[0] = 0x45 // IPv4 HL=20
|
||||
packet[9] = 114 // Declare as arbitrary 0-hop protocol
|
||||
binary.BigEndian.PutUint16(packet[2:4], uint16(length))
|
||||
binary.BigEndian.PutUint32(packet[12:16], ip2int(c.f.inside.CidrNet().IP.To4()))
|
||||
binary.BigEndian.PutUint32(packet[16:20], ip)
|
||||
|
||||
// Set identical values for src and dst port as they're only
|
||||
// used for nebula firewall rule/conntrack matching.
|
||||
binary.BigEndian.PutUint16(packet[20:22], port)
|
||||
binary.BigEndian.PutUint16(packet[22:24], port)
|
||||
|
||||
copy(packet[headerLen:], payload)
|
||||
|
||||
fp := &FirewallPacket{}
|
||||
nb := make([]byte, 12)
|
||||
out := make([]byte, mtu)
|
||||
c.f.consumeInsidePacket(st, packet, fp, nb, out)
|
||||
}
|
||||
|
||||
83
handler.go
Normal file
83
handler.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package nebula
|
||||
|
||||
func (f *Interface) newHook(w func([]byte) error) InsideHandler {
|
||||
fn := func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
f.decryptTo(w, hostInfo, header.MessageCounter, out, packet, fwPacket, nb)
|
||||
}
|
||||
return f.encrypted(fn)
|
||||
}
|
||||
|
||||
func (f *Interface) encrypted(h InsideHandler) InsideHandler {
|
||||
return func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
if !f.handleEncrypted(ci, addr, header) {
|
||||
return
|
||||
}
|
||||
|
||||
h(hostInfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||
|
||||
f.handleHostRoaming(hostInfo, addr)
|
||||
f.connectionManager.In(hostInfo.hostId)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Interface) rxMetrics(h InsideHandler) InsideHandler {
|
||||
return func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
h(hostInfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Interface) handleMessagePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
f.decryptTo(f.inside.WriteRaw, hostInfo, header.MessageCounter, out, packet, fwPacket, nb)
|
||||
}
|
||||
|
||||
func (f *Interface) handleLighthousePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
d, err := f.decrypt(hostInfo, header.MessageCounter, out, packet, header, nb)
|
||||
if err != nil {
|
||||
hostInfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||
WithField("packet", packet).
|
||||
Error("Failed to decrypt lighthouse packet")
|
||||
|
||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||
return
|
||||
}
|
||||
|
||||
f.lightHouse.HandleRequest(addr, hostInfo.hostId, d, hostInfo.GetCert(), f)
|
||||
}
|
||||
|
||||
func (f *Interface) handleTestPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
d, err := f.decrypt(hostInfo, header.MessageCounter, out, packet, header, nb)
|
||||
if err != nil {
|
||||
hostInfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||
WithField("packet", packet).
|
||||
Error("Failed to decrypt test packet")
|
||||
|
||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||
return
|
||||
}
|
||||
|
||||
if header.Subtype == testRequest {
|
||||
// This testRequest might be from TryPromoteBest, so we should roam
|
||||
// to the new IP address before responding
|
||||
f.handleHostRoaming(hostInfo, addr)
|
||||
f.send(test, testReply, ci, hostInfo, hostInfo.remote, d, nb, out)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Interface) handleHandshakePacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
HandleIncomingHandshake(f, addr, packet, header, hostInfo)
|
||||
}
|
||||
|
||||
func (f *Interface) handleRecvErrorPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
// TODO: Remove this with recv_error deprecation
|
||||
f.handleRecvError(addr, header)
|
||||
}
|
||||
|
||||
func (f *Interface) handleCloseTunnelPacket(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
hostInfo.logger().WithField("udpAddr", addr).
|
||||
Info("Close tunnel received, tearing down.")
|
||||
|
||||
f.closeTunnel(hostInfo)
|
||||
}
|
||||
@@ -54,6 +54,7 @@ var typeMap = map[NebulaMessageType]string{
|
||||
}
|
||||
|
||||
const (
|
||||
subTypeNone NebulaMessageSubType = 0
|
||||
testRequest NebulaMessageSubType = 0
|
||||
testReply NebulaMessageSubType = 1
|
||||
)
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket, nb, out []byte) {
|
||||
func (f *Interface) consumeInsidePacket(st NebulaMessageSubType, packet []byte, fwPacket *FirewallPacket, nb, out []byte) {
|
||||
err := newPacket(packet, false, fwPacket)
|
||||
if err != nil {
|
||||
l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
|
||||
@@ -45,7 +45,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
|
||||
// the packet queue.
|
||||
ci.queueLock.Lock()
|
||||
if !ci.ready {
|
||||
hostinfo.cachePacket(message, 0, packet, f.sendMessageNow)
|
||||
hostinfo.cachePacket(message, st, packet, f.sendMessageNow)
|
||||
ci.queueLock.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -54,7 +54,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
|
||||
|
||||
dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs)
|
||||
if dropReason == nil {
|
||||
mc := f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out)
|
||||
mc := f.sendNoMetrics(message, st, ci, hostinfo, hostinfo.remote, packet, nb, out)
|
||||
if f.lightHouse != nil && mc%5000 == 0 {
|
||||
f.lightHouse.Query(fwPacket.RemoteIP, f)
|
||||
}
|
||||
|
||||
30
interface.go
30
interface.go
@@ -20,6 +20,8 @@ type Inside interface {
|
||||
WriteRaw([]byte) error
|
||||
}
|
||||
|
||||
type InsideHandler func(hostInfo *HostInfo, ci *ConnectionState, addr *udpAddr, header *Header, out []byte, packet []byte, fp *FirewallPacket, nb []byte)
|
||||
|
||||
type InterfaceConfig struct {
|
||||
HostMap *HostMap
|
||||
Outside *udpConn
|
||||
@@ -61,6 +63,9 @@ type Interface struct {
|
||||
tunQueues int
|
||||
version string
|
||||
|
||||
// handlers are mapped by protocol version -> type -> subtype
|
||||
handlers map[uint8]map[NebulaMessageType]map[NebulaMessageSubType]InsideHandler
|
||||
|
||||
metricHandshakes metrics.Histogram
|
||||
messageMetrics *MessageMetrics
|
||||
}
|
||||
@@ -103,6 +108,29 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) {
|
||||
}
|
||||
|
||||
ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval)
|
||||
ifce.handlers = map[uint8]map[NebulaMessageType]map[NebulaMessageSubType]InsideHandler{
|
||||
Version: {
|
||||
handshake: {
|
||||
handshakeIXPSK0: ifce.rxMetrics(ifce.handleHandshakePacket),
|
||||
},
|
||||
message: {
|
||||
subTypeNone: ifce.encrypted(ifce.handleMessagePacket),
|
||||
},
|
||||
recvError: {
|
||||
subTypeNone: ifce.rxMetrics(ifce.handleRecvErrorPacket),
|
||||
},
|
||||
lightHouse: {
|
||||
subTypeNone: ifce.rxMetrics(ifce.encrypted(ifce.handleLighthousePacket)),
|
||||
},
|
||||
test: {
|
||||
testRequest: ifce.rxMetrics(ifce.encrypted(ifce.handleTestPacket)),
|
||||
testReply: ifce.rxMetrics(ifce.encrypted(ifce.handleTestPacket)),
|
||||
},
|
||||
closeTunnel: {
|
||||
subTypeNone: ifce.rxMetrics(ifce.encrypted(ifce.handleCloseTunnelPacket)),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return ifce, nil
|
||||
}
|
||||
@@ -168,7 +196,7 @@ func (f *Interface) listenIn(i int) {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
f.consumeInsidePacket(packet[:n], fwPacket, nb, out)
|
||||
f.consumeInsidePacket(subTypeNone, packet[:n], fwPacket, nb, out)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
93
outside.go
93
outside.go
@@ -39,98 +39,15 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
|
||||
ci = hostinfo.ConnectionState
|
||||
}
|
||||
|
||||
switch header.Type {
|
||||
case message:
|
||||
if !f.handleEncrypted(ci, addr, header) {
|
||||
return
|
||||
}
|
||||
handle := f.handlers[header.Version][header.Type][header.Subtype]
|
||||
|
||||
f.decryptToTun(hostinfo, header.MessageCounter, out, packet, fwPacket, nb)
|
||||
|
||||
// Fallthrough to the bottom to record incoming traffic
|
||||
|
||||
case lightHouse:
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
if !f.handleEncrypted(ci, addr, header) {
|
||||
return
|
||||
}
|
||||
|
||||
d, err := f.decrypt(hostinfo, header.MessageCounter, out, packet, header, nb)
|
||||
if err != nil {
|
||||
hostinfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||
WithField("packet", packet).
|
||||
Error("Failed to decrypt lighthouse packet")
|
||||
|
||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||
return
|
||||
}
|
||||
|
||||
f.lightHouse.HandleRequest(addr, hostinfo.hostId, d, hostinfo.GetCert(), f)
|
||||
|
||||
// Fallthrough to the bottom to record incoming traffic
|
||||
|
||||
case test:
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
if !f.handleEncrypted(ci, addr, header) {
|
||||
return
|
||||
}
|
||||
|
||||
d, err := f.decrypt(hostinfo, header.MessageCounter, out, packet, header, nb)
|
||||
if err != nil {
|
||||
hostinfo.logger().WithError(err).WithField("udpAddr", addr).
|
||||
WithField("packet", packet).
|
||||
Error("Failed to decrypt test packet")
|
||||
|
||||
//TODO: maybe after build 64 is out? 06/14/2018 - NB
|
||||
//f.sendRecvError(net.Addr(addr), header.RemoteIndex)
|
||||
return
|
||||
}
|
||||
|
||||
if header.Subtype == testRequest {
|
||||
// This testRequest might be from TryPromoteBest, so we should roam
|
||||
// to the new IP address before responding
|
||||
f.handleHostRoaming(hostinfo, addr)
|
||||
f.send(test, testReply, ci, hostinfo, hostinfo.remote, d, nb, out)
|
||||
}
|
||||
|
||||
// Fallthrough to the bottom to record incoming traffic
|
||||
|
||||
// Non encrypted messages below here, they should not fall through to avoid tracking incoming traffic since they
|
||||
// are unauthenticated
|
||||
|
||||
case handshake:
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
HandleIncomingHandshake(f, addr, packet, header, hostinfo)
|
||||
return
|
||||
|
||||
case recvError:
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
// TODO: Remove this with recv_error deprecation
|
||||
f.handleRecvError(addr, header)
|
||||
return
|
||||
|
||||
case closeTunnel:
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
if !f.handleEncrypted(ci, addr, header) {
|
||||
return
|
||||
}
|
||||
|
||||
hostinfo.logger().WithField("udpAddr", addr).
|
||||
Info("Close tunnel received, tearing down.")
|
||||
|
||||
f.closeTunnel(hostinfo)
|
||||
return
|
||||
|
||||
default:
|
||||
if handle == nil {
|
||||
f.messageMetrics.Rx(header.Type, header.Subtype, 1)
|
||||
hostinfo.logger().Debugf("Unexpected packet received from %s", addr)
|
||||
return
|
||||
}
|
||||
|
||||
f.handleHostRoaming(hostinfo, addr)
|
||||
|
||||
f.connectionManager.In(hostinfo.hostId)
|
||||
handle(hostinfo, ci, addr, header, out, packet, fwPacket, nb)
|
||||
}
|
||||
|
||||
func (f *Interface) closeTunnel(hostInfo *HostInfo) {
|
||||
@@ -258,7 +175,7 @@ func (f *Interface) decrypt(hostinfo *HostInfo, mc uint64, out []byte, packet []
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
func (f *Interface) decryptTo(write func([]byte) error, hostinfo *HostInfo, messageCounter uint64, out []byte, packet []byte, fwPacket *FirewallPacket, nb []byte) {
|
||||
var err error
|
||||
|
||||
out, err = hostinfo.ConnectionState.dKey.DecryptDanger(out, packet[:HeaderLen], packet[HeaderLen:], messageCounter, nb)
|
||||
@@ -293,7 +210,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out
|
||||
}
|
||||
|
||||
f.connectionManager.In(hostinfo.hostId)
|
||||
err = f.inside.WriteRaw(out)
|
||||
err = write(out)
|
||||
if err != nil {
|
||||
l.WithError(err).Error("Failed to write to tun")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user