Compare commits

..

3 Commits

Author SHA1 Message Date
Ryan
a0f8cb2098 works properly 2025-11-05 22:09:06 -05:00
Ryan
d18d1aea67 first 2025-11-05 20:34:02 -05:00
Ryan
f5ff534671 make it work with dnclient 2025-11-05 19:25:32 -05:00
5 changed files with 624 additions and 10 deletions

View File

@@ -1,8 +1,10 @@
package cert
import (
"encoding/hex"
"encoding/pem"
"fmt"
"time"
"golang.org/x/crypto/ed25519"
)
@@ -138,6 +140,101 @@ func MarshalSigningPrivateKeyToPEM(curve Curve, b []byte) []byte {
}
}
// Backward compatibility functions for older API
func MarshalX25519PublicKey(b []byte) []byte {
return MarshalPublicKeyToPEM(Curve_CURVE25519, b)
}
func MarshalX25519PrivateKey(b []byte) []byte {
return MarshalPrivateKeyToPEM(Curve_CURVE25519, b)
}
func MarshalPublicKey(curve Curve, b []byte) []byte {
return MarshalPublicKeyToPEM(curve, b)
}
func MarshalPrivateKey(curve Curve, b []byte) []byte {
return MarshalPrivateKeyToPEM(curve, b)
}
// NebulaCertificate is a compatibility wrapper for the old API
type NebulaCertificate struct {
Details NebulaCertificateDetails
Signature []byte
cert Certificate
}
// NebulaCertificateDetails is a compatibility wrapper for certificate details
type NebulaCertificateDetails struct {
Name string
NotBefore time.Time
NotAfter time.Time
PublicKey []byte
IsCA bool
Issuer []byte
Curve Curve
}
// UnmarshalNebulaCertificateFromPEM provides backward compatibility with the old API
func UnmarshalNebulaCertificateFromPEM(b []byte) (*NebulaCertificate, []byte, error) {
c, rest, err := UnmarshalCertificateFromPEM(b)
if err != nil {
return nil, rest, err
}
issuerBytes, err := func() ([]byte, error) {
issuer := c.Issuer()
if issuer == "" {
return nil, nil
}
decoded, err := hex.DecodeString(issuer)
if err != nil {
return nil, fmt.Errorf("failed to decode issuer fingerprint: %w", err)
}
return decoded, nil
}()
if err != nil {
return nil, rest, err
}
pubKey := c.PublicKey()
if pubKey != nil {
pubKey = append([]byte(nil), pubKey...)
}
sig := c.Signature()
if sig != nil {
sig = append([]byte(nil), sig...)
}
return &NebulaCertificate{
Details: NebulaCertificateDetails{
Name: c.Name(),
NotBefore: c.NotBefore(),
NotAfter: c.NotAfter(),
PublicKey: pubKey,
IsCA: c.IsCA(),
Issuer: issuerBytes,
Curve: c.Curve(),
},
Signature: sig,
cert: c,
}, rest, nil
}
// IssuerString returns the issuer in hex format for compatibility
func (n *NebulaCertificate) IssuerString() string {
if n.Details.Issuer == nil {
return ""
}
return hex.EncodeToString(n.Details.Issuer)
}
// Certificate returns the underlying certificate (read-only)
func (n *NebulaCertificate) Certificate() Certificate {
return n.cert
}
// UnmarshalPrivateKeyFromPEM will try to unmarshal the first pem block in a byte array, returning any non
// consumed data or an error on failure
func UnmarshalPrivateKeyFromPEM(b []byte) ([]byte, []byte, Curve, error) {

View File

@@ -165,7 +165,7 @@ func NewUDPStatsEmitter(udpConns []Conn) func() {
return func() {}
}
func (u *StdConn) ListenOut(r EncReader) {
func (u *StdConn) ListenOut(r EncReader) error {
buffer := make([]byte, MTU)
for {
@@ -174,14 +174,17 @@ func (u *StdConn) ListenOut(r EncReader) {
if err != nil {
if errors.Is(err, net.ErrClosed) {
u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
return
return err
}
u.l.WithError(err).Error("unexpected udp socket receive error")
continue
}
r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
}
return nil
}
func (u *StdConn) Rebind() error {

View File

@@ -5,9 +5,11 @@ package udp
import (
"encoding/binary"
"errors"
"fmt"
"net"
"net/netip"
"sync"
"syscall"
"time"
"unsafe"
@@ -20,11 +22,38 @@ import (
var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500))
const (
defaultGSOMaxSegments = 8
defaultGSOFlushTimeout = 150 * time.Microsecond
defaultGROReadBufferSize = MTU * defaultGSOMaxSegments
maxGSOBatchBytes = 0xFFFF
)
var (
errGSOFallback = errors.New("udp gso fallback")
errGSODisabled = errors.New("udp gso disabled")
)
type StdConn struct {
sysFd int
isV4 bool
l *logrus.Logger
batch int
enableGRO bool
enableGSO bool
gsoMu sync.Mutex
gsoBuf []byte
gsoAddr netip.AddrPort
gsoSegSize int
gsoSegments int
gsoMaxSegments int
gsoMaxBytes int
gsoFlushTimeout time.Duration
gsoTimer *time.Timer
groBufSize int
}
func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
@@ -69,7 +98,16 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
return nil, fmt.Errorf("unable to bind to socket: %s", err)
}
return &StdConn{sysFd: fd, isV4: ip.Is4(), l: l, batch: batch}, err
return &StdConn{
sysFd: fd,
isV4: ip.Is4(),
l: l,
batch: batch,
gsoMaxSegments: defaultGSOMaxSegments,
gsoMaxBytes: MTU * defaultGSOMaxSegments,
gsoFlushTimeout: defaultGSOFlushTimeout,
groBufSize: MTU,
}, err
}
func (u *StdConn) Rebind() error {
@@ -119,15 +157,42 @@ func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
}
func (u *StdConn) ListenOut(r EncReader) error {
var ip netip.Addr
var (
ip netip.Addr
controls [][]byte
)
msgs, buffers, names := u.PrepareRawMessages(u.batch)
bufSize := u.readBufferSize()
msgs, buffers, names := u.PrepareRawMessages(u.batch, bufSize)
read := u.ReadMulti
if u.batch == 1 {
read = u.ReadSingle
}
for {
desired := u.readBufferSize()
if len(buffers) == 0 || cap(buffers[0]) < desired {
msgs, buffers, names = u.PrepareRawMessages(u.batch, desired)
controls = nil
}
if u.enableGRO {
if controls == nil {
controls = make([][]byte, len(msgs))
for i := range controls {
controls[i] = make([]byte, unix.CmsgSpace(4))
}
}
for i := range msgs {
setRawMessageControl(&msgs[i], controls[i])
}
} else if controls != nil {
for i := range msgs {
setRawMessageControl(&msgs[i], nil)
}
controls = nil
}
n, err := read(msgs)
if err != nil {
return err
@@ -140,11 +205,82 @@ func (u *StdConn) ListenOut(r EncReader) error {
} else {
ip, _ = netip.AddrFromSlice(names[i][8:24])
}
r(netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4])), buffers[i][:msgs[i].Len])
addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
payload := buffers[i][:msgs[i].Len]
if u.enableGRO && u.l.IsLevelEnabled(logrus.DebugLevel) {
ctrlLen := getRawMessageControlLen(&msgs[i])
msgFlags := getRawMessageFlags(&msgs[i])
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "recv",
"payload_len": len(payload),
"ctrl_len": ctrlLen,
"msg_flags": msgFlags,
}).Debug("gro batch data")
if controls != nil && ctrlLen > 0 {
maxDump := ctrlLen
if maxDump > 16 {
maxDump = 16
}
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "control-bytes",
"control_hex": fmt.Sprintf("%x", controls[i][:maxDump]),
"datalen": ctrlLen,
}).Debug("gro control dump")
}
}
sawControl := false
if controls != nil {
if ctrlLen := getRawMessageControlLen(&msgs[i]); ctrlLen > 0 {
if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segSize > 0 {
sawControl = true
if u.l.IsLevelEnabled(logrus.DebugLevel) {
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "control",
"seg_size": segSize,
"seg_count": segCount,
"payloadLen": len(payload),
}).Debug("gro control parsed")
}
segSize = normalizeGROSegSize(segSize, segCount, len(payload))
if segSize > 0 && segSize < len(payload) {
if u.emitGROSegments(r, addr, payload, segSize) {
continue
}
}
}
}
}
if u.enableGRO && len(payload) > MTU {
if !sawControl && u.l.IsLevelEnabled(logrus.DebugLevel) {
u.l.WithFields(logrus.Fields{
"tag": "gro-debug",
"stage": "fallback",
"payload_len": len(payload),
}).Debug("gro control missing; splitting payload by MTU")
}
if u.emitGROSegments(r, addr, payload, MTU) {
continue
}
}
r(addr, payload)
}
}
}
func (u *StdConn) readBufferSize() int {
if u.enableGRO && u.groBufSize > MTU {
return u.groBufSize
}
return MTU
}
func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
for {
n, _, err := unix.Syscall6(
@@ -193,6 +329,14 @@ func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
}
func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
if u.enableGSO && ip.IsValid() {
if err := u.queueGSOPacket(b, ip); err == nil {
return nil
} else if !errors.Is(err, errGSOFallback) {
return err
}
}
if u.isV4 {
return u.writeTo4(b, ip)
}
@@ -299,6 +443,94 @@ func (u *StdConn) ReloadConfig(c *config.C) {
u.l.WithError(err).Error("Failed to set listen.so_mark")
}
}
u.configureGRO(c)
u.configureGSO(c)
}
func (u *StdConn) configureGRO(c *config.C) {
if c == nil {
return
}
enable := c.GetBool("listen.enable_gro", false)
if enable == u.enableGRO {
if enable {
if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 {
u.setGROBufferSize(size)
}
}
return
}
if enable {
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 1); err != nil {
u.l.WithError(err).Warn("Failed to enable UDP GRO")
return
}
u.enableGRO = true
u.setGROBufferSize(c.GetInt("listen.gro_read_buffer", defaultGROReadBufferSize))
u.l.WithField("buffer_size", u.groBufSize).Info("UDP GRO enabled")
return
}
if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 0); err != nil && err != unix.ENOPROTOOPT {
u.l.WithError(err).Warn("Failed to disable UDP GRO")
}
u.enableGRO = false
u.groBufSize = MTU
}
func (u *StdConn) configureGSO(c *config.C) {
enable := c.GetBool("listen.enable_gso", false)
if !enable {
u.disableGSO()
} else {
u.enableGSO = true
}
segments := c.GetInt("listen.gso_max_segments", defaultGSOMaxSegments)
if segments < 1 {
segments = 1
}
u.gsoMaxSegments = segments
maxBytes := c.GetInt("listen.gso_max_bytes", 0)
if maxBytes <= 0 {
maxBytes = MTU * segments
}
if maxBytes > maxGSOBatchBytes {
u.l.WithField("requested", maxBytes).Warn("listen.gso_max_bytes larger than UDP limit; clamping")
maxBytes = maxGSOBatchBytes
}
u.gsoMaxBytes = maxBytes
timeout := c.GetDuration("listen.gso_flush_timeout", defaultGSOFlushTimeout)
if timeout < 0 {
timeout = 0
}
u.gsoFlushTimeout = timeout
}
func (u *StdConn) setGROBufferSize(size int) {
if size < MTU {
size = defaultGROReadBufferSize
}
if size > maxGSOBatchBytes {
size = maxGSOBatchBytes
}
u.groBufSize = size
}
func (u *StdConn) disableGSO() {
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
u.enableGSO = false
_ = u.flushGSOlocked()
u.gsoBuf = nil
u.gsoSegments = 0
u.gsoSegSize = 0
u.stopGSOTimerLocked()
}
func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
@@ -310,7 +542,239 @@ func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
return nil
}
func (u *StdConn) queueGSOPacket(b []byte, addr netip.AddrPort) error {
if len(b) == 0 {
return nil
}
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
if !u.enableGSO || !addr.IsValid() || len(b) > u.gsoMaxBytes {
if err := u.flushGSOlocked(); err != nil {
return err
}
return errGSOFallback
}
if u.gsoSegments == 0 {
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
} else if addr != u.gsoAddr || len(b) != u.gsoSegSize {
if err := u.flushGSOlocked(); err != nil {
return err
}
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
}
if len(u.gsoBuf)+len(b) > u.gsoMaxBytes {
if err := u.flushGSOlocked(); err != nil {
return err
}
if cap(u.gsoBuf) < u.gsoMaxBytes {
u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
}
u.gsoAddr = addr
u.gsoSegSize = len(b)
}
u.gsoBuf = append(u.gsoBuf, b...)
u.gsoSegments++
if u.gsoSegments >= u.gsoMaxSegments || u.gsoFlushTimeout <= 0 {
return u.flushGSOlocked()
}
u.scheduleGSOFlushLocked()
return nil
}
func (u *StdConn) flushGSOlocked() error {
if u.gsoSegments == 0 {
u.stopGSOTimerLocked()
return nil
}
payload := append([]byte(nil), u.gsoBuf...)
addr := u.gsoAddr
segSize := u.gsoSegSize
u.gsoBuf = u.gsoBuf[:0]
u.gsoSegments = 0
u.gsoSegSize = 0
u.stopGSOTimerLocked()
if segSize <= 0 {
return errGSOFallback
}
err := u.sendSegmented(payload, addr, segSize)
if errors.Is(err, errGSODisabled) {
u.l.WithField("addr", addr).Warn("UDP GSO disabled by kernel, falling back to sendto")
u.enableGSO = false
return u.sendSegmentsIndividually(payload, addr, segSize)
}
return err
}
func (u *StdConn) sendSegmented(payload []byte, addr netip.AddrPort, segSize int) error {
if len(payload) == 0 {
return nil
}
control := make([]byte, unix.CmsgSpace(2))
hdr := (*unix.Cmsghdr)(unsafe.Pointer(&control[0]))
hdr.Level = unix.SOL_UDP
hdr.Type = unix.UDP_SEGMENT
setCmsgLen(hdr, unix.CmsgLen(2))
binary.NativeEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize))
var sa unix.Sockaddr
if addr.Addr().Is4() {
var sa4 unix.SockaddrInet4
sa4.Port = int(addr.Port())
sa4.Addr = addr.Addr().As4()
sa = &sa4
} else {
var sa6 unix.SockaddrInet6
sa6.Port = int(addr.Port())
sa6.Addr = addr.Addr().As16()
sa = &sa6
}
if _, err := unix.SendmsgN(u.sysFd, payload, control, sa, 0); err != nil {
if errno, ok := err.(syscall.Errno); ok && (errno == unix.EINVAL || errno == unix.ENOTSUP || errno == unix.EOPNOTSUPP) {
return errGSODisabled
}
return &net.OpError{Op: "sendmsg", Err: err}
}
return nil
}
func (u *StdConn) sendSegmentsIndividually(buf []byte, addr netip.AddrPort, segSize int) error {
if segSize <= 0 {
return errGSOFallback
}
for offset := 0; offset < len(buf); offset += segSize {
end := offset + segSize
if end > len(buf) {
end = len(buf)
}
var err error
if u.isV4 {
err = u.writeTo4(buf[offset:end], addr)
} else {
err = u.writeTo6(buf[offset:end], addr)
}
if err != nil {
return err
}
}
return nil
}
func (u *StdConn) scheduleGSOFlushLocked() {
if u.gsoTimer == nil {
u.gsoTimer = time.AfterFunc(u.gsoFlushTimeout, u.gsoFlushTimer)
return
}
u.gsoTimer.Reset(u.gsoFlushTimeout)
}
func (u *StdConn) stopGSOTimerLocked() {
if u.gsoTimer != nil {
u.gsoTimer.Stop()
u.gsoTimer = nil
}
}
func (u *StdConn) gsoFlushTimer() {
u.gsoMu.Lock()
defer u.gsoMu.Unlock()
_ = u.flushGSOlocked()
}
func parseGROControl(control []byte) (int, int) {
if len(control) == 0 {
return 0, 0
}
cmsgs, err := unix.ParseSocketControlMessage(control)
if err != nil {
return 0, 0
}
for _, c := range cmsgs {
if c.Header.Level == unix.SOL_UDP && c.Header.Type == unix.UDP_GRO && len(c.Data) >= 2 {
segSize := int(binary.NativeEndian.Uint16(c.Data[:2]))
segCount := 0
if len(c.Data) >= 4 {
segCount = int(binary.NativeEndian.Uint16(c.Data[2:4]))
}
return segSize, segCount
}
}
return 0, 0
}
func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize int) bool {
if segSize <= 0 {
return false
}
for offset := 0; offset < len(payload); offset += segSize {
end := offset + segSize
if end > len(payload) {
end = len(payload)
}
segment := make([]byte, end-offset)
copy(segment, payload[offset:end])
r(addr, segment)
}
return true
}
func normalizeGROSegSize(segSize, segCount, total int) int {
if segSize <= 0 || total <= 0 {
return segSize
}
if segSize > total && segCount > 0 {
segSize = total / segCount
if segSize == 0 {
segSize = total
}
}
if segCount <= 1 && segSize > 0 && total > segSize {
calculated := total / segSize
if calculated <= 1 {
calculated = (total + segSize - 1) / segSize
}
if calculated > 1 {
segCount = calculated
}
}
if segSize > MTU {
return MTU
}
return segSize
}
func (u *StdConn) Close() error {
u.disableGSO()
return syscall.Close(u.sysFd)
}

View File

@@ -30,13 +30,16 @@ type rawMessage struct {
Len uint32
}
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) {
if bufSize <= 0 {
bufSize = MTU
}
msgs := make([]rawMessage, n)
buffers := make([][]byte, n)
names := make([][]byte, n)
for i := range msgs {
buffers[i] = make([]byte, MTU)
buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{
@@ -52,3 +55,25 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names
}
func setRawMessageControl(msg *rawMessage, buf []byte) {
if len(buf) == 0 {
msg.Hdr.Control = nil
msg.Hdr.Controllen = 0
return
}
msg.Hdr.Control = &buf[0]
msg.Hdr.Controllen = uint32(len(buf))
}
func getRawMessageControlLen(msg *rawMessage) int {
return int(msg.Hdr.Controllen)
}
func getRawMessageFlags(msg *rawMessage) int {
return int(msg.Hdr.Flags)
}
func setCmsgLen(h *unix.Cmsghdr, l int) {
h.Len = uint32(l)
}

View File

@@ -33,13 +33,16 @@ type rawMessage struct {
Pad0 [4]byte
}
func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
func (u *StdConn) PrepareRawMessages(n int, bufSize int) ([]rawMessage, [][]byte, [][]byte) {
if bufSize <= 0 {
bufSize = MTU
}
msgs := make([]rawMessage, n)
buffers := make([][]byte, n)
names := make([][]byte, n)
for i := range msgs {
buffers[i] = make([]byte, MTU)
buffers[i] = make([]byte, bufSize)
names[i] = make([]byte, unix.SizeofSockaddrInet6)
vs := []iovec{
@@ -55,3 +58,25 @@ func (u *StdConn) PrepareRawMessages(n int) ([]rawMessage, [][]byte, [][]byte) {
return msgs, buffers, names
}
func setRawMessageControl(msg *rawMessage, buf []byte) {
if len(buf) == 0 {
msg.Hdr.Control = nil
msg.Hdr.Controllen = 0
return
}
msg.Hdr.Control = &buf[0]
msg.Hdr.Controllen = uint64(len(buf))
}
func getRawMessageControlLen(msg *rawMessage) int {
return int(msg.Hdr.Controllen)
}
func getRawMessageFlags(msg *rawMessage) int {
return int(msg.Hdr.Flags)
}
func setCmsgLen(h *unix.Cmsghdr, l int) {
h.Len = uint64(l)
}