mirror of
https://github.com/slackhq/nebula.git
synced 2025-11-22 16:34:25 +01:00
re-use IOVs
This commit is contained in:
@@ -84,13 +84,19 @@ func (p *Packet) SetSegSizeForTX() {
|
||||
binary.NativeEndian.PutUint16(p.Control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(p.SegSize))
|
||||
}
|
||||
|
||||
func (p *Packet) CompatibleForSegmentationWith(otherP *Packet) bool {
|
||||
func (p *Packet) CompatibleForSegmentationWith(otherP *Packet, currentTotalSize int) bool {
|
||||
//same dest
|
||||
if !slices.Equal(p.Name, otherP.Name) {
|
||||
return false
|
||||
}
|
||||
|
||||
//don't get too big
|
||||
if len(p.Payload)+currentTotalSize >= 0xffff {
|
||||
return false
|
||||
}
|
||||
|
||||
//same body len
|
||||
//todo allow single different size at end
|
||||
if len(p.Payload) != len(otherP.Payload) {
|
||||
return false //todo technically you can cram one extra in
|
||||
}
|
||||
|
||||
@@ -18,6 +18,9 @@ import (
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
const iovMax = 128 //1024 //no unix constant for this? from limits.h
|
||||
//todo I'd like this to be 1024 but we seem to hit errors around ~130?
|
||||
|
||||
type StdConn struct {
|
||||
sysFd int
|
||||
isV4 bool
|
||||
@@ -66,8 +69,12 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in
|
||||
return nil, fmt.Errorf("unable to bind to socket: %s", err)
|
||||
}
|
||||
|
||||
msgs := make([]rawMessage, 0, 8192) //todo configure
|
||||
iovs := make([][]iovec, 0, len(msgs))
|
||||
const batchSize = 8192
|
||||
msgs := make([]rawMessage, 0, batchSize) //todo configure
|
||||
iovs := make([][]iovec, batchSize)
|
||||
for i := range iovs {
|
||||
iovs[i] = make([]iovec, iovMax)
|
||||
}
|
||||
return &StdConn{
|
||||
sysFd: fd,
|
||||
isV4: ip.Is4(),
|
||||
@@ -220,11 +227,11 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) {
|
||||
}
|
||||
|
||||
u.msgs = u.msgs[:0]
|
||||
u.iovs = u.iovs[:0]
|
||||
//u.iovs = u.iovs[:0]
|
||||
|
||||
sent := 0
|
||||
const maxIovLen = 48
|
||||
var mostRecentPkt *packet.Packet
|
||||
mostRecentPktSize := 0
|
||||
//segmenting := false
|
||||
idx := 0
|
||||
for _, pkt := range pkts {
|
||||
@@ -233,19 +240,18 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) {
|
||||
continue
|
||||
}
|
||||
lastIdx := idx - 1
|
||||
if mostRecentPkt != nil && pkt.CompatibleForSegmentationWith(mostRecentPkt) && u.msgs[lastIdx].Hdr.Iovlen < maxIovLen { //todo math this more good
|
||||
|
||||
if mostRecentPkt != nil && pkt.CompatibleForSegmentationWith(mostRecentPkt, mostRecentPktSize) && u.msgs[lastIdx].Hdr.Iovlen < iovMax {
|
||||
u.msgs[lastIdx].Hdr.Controllen = uint64(len(mostRecentPkt.Control))
|
||||
u.msgs[lastIdx].Hdr.Control = &mostRecentPkt.Control[0]
|
||||
|
||||
u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Base = &pkt.Payload[0]
|
||||
u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Len = uint64(len(pkt.Payload))
|
||||
u.msgs[lastIdx].Hdr.Iovlen++
|
||||
u.iovs[lastIdx] = append(u.iovs[lastIdx], iovec{
|
||||
Base: &pkt.Payload[0],
|
||||
Len: uint64(len(pkt.Payload)),
|
||||
})
|
||||
|
||||
mostRecentPktSize += len(pkt.Payload)
|
||||
mostRecentPkt.SetSegSizeForTX()
|
||||
} else {
|
||||
u.msgs = append(u.msgs, rawMessage{})
|
||||
u.iovs = append(u.iovs, make([]iovec, 1, maxIovLen)) //todo less garbage
|
||||
u.iovs[idx][0] = iovec{
|
||||
Base: &pkt.Payload[0],
|
||||
Len: uint64(len(pkt.Payload)),
|
||||
@@ -263,8 +269,8 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) {
|
||||
msg.Hdr.Name = &pkt.Name[0]
|
||||
msg.Hdr.Namelen = uint32(len(pkt.Name))
|
||||
mostRecentPkt = pkt
|
||||
mostRecentPktSize = len(pkt.Payload)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(u.msgs) == 0 {
|
||||
@@ -287,6 +293,23 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) {
|
||||
if errno == unix.EINTR {
|
||||
continue
|
||||
}
|
||||
//for i := 0; i < len(u.msgs); i++ {
|
||||
// for j := 0; j < int(u.msgs[i].Hdr.Iovlen); j++ {
|
||||
// u.l.WithFields(logrus.Fields{
|
||||
// "msg_index": i,
|
||||
// "iov idx": j,
|
||||
// "iov": fmt.Sprintf("%+v", u.iovs[i][j]),
|
||||
// }).Warn("failed to send message")
|
||||
// }
|
||||
//
|
||||
//}
|
||||
u.l.WithFields(logrus.Fields{
|
||||
"errno": errno,
|
||||
"idx": idx,
|
||||
"len": len(u.msgs),
|
||||
"deets": fmt.Sprintf("%+v", u.msgs),
|
||||
"lastIOV": fmt.Sprintf("%+v", u.iovs[len(u.msgs)-1][u.msgs[len(u.msgs)-1].Hdr.Iovlen-1]),
|
||||
}).Error("failed to send message")
|
||||
return sent + offset, &net.OpError{Op: "sendmmsg", Err: errno}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user