diff --git a/packet/packet.go b/packet/packet.go index 0d096ec..31b9fd9 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -84,13 +84,19 @@ func (p *Packet) SetSegSizeForTX() { binary.NativeEndian.PutUint16(p.Control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(p.SegSize)) } -func (p *Packet) CompatibleForSegmentationWith(otherP *Packet) bool { +func (p *Packet) CompatibleForSegmentationWith(otherP *Packet, currentTotalSize int) bool { //same dest if !slices.Equal(p.Name, otherP.Name) { return false } + //don't get too big + if len(p.Payload)+currentTotalSize >= 0xffff { + return false + } + //same body len + //todo allow single different size at end if len(p.Payload) != len(otherP.Payload) { return false //todo technically you can cram one extra in } diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 47dc869..f1aa781 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -18,6 +18,9 @@ import ( "golang.org/x/sys/unix" ) +const iovMax = 128 //1024 //no unix constant for this? from limits.h +//todo I'd like this to be 1024 but we seem to hit errors around ~130? + type StdConn struct { sysFd int isV4 bool @@ -66,8 +69,12 @@ func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch in return nil, fmt.Errorf("unable to bind to socket: %s", err) } - msgs := make([]rawMessage, 0, 8192) //todo configure - iovs := make([][]iovec, 0, len(msgs)) + const batchSize = 8192 + msgs := make([]rawMessage, 0, batchSize) //todo configure + iovs := make([][]iovec, batchSize) + for i := range iovs { + iovs[i] = make([]iovec, iovMax) + } return &StdConn{ sysFd: fd, isV4: ip.Is4(), @@ -220,11 +227,11 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) { } u.msgs = u.msgs[:0] - u.iovs = u.iovs[:0] + //u.iovs = u.iovs[:0] sent := 0 - const maxIovLen = 48 var mostRecentPkt *packet.Packet + mostRecentPktSize := 0 //segmenting := false idx := 0 for _, pkt := range pkts { @@ -233,19 +240,18 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) { continue } lastIdx := idx - 1 - if mostRecentPkt != nil && pkt.CompatibleForSegmentationWith(mostRecentPkt) && u.msgs[lastIdx].Hdr.Iovlen < maxIovLen { //todo math this more good - + if mostRecentPkt != nil && pkt.CompatibleForSegmentationWith(mostRecentPkt, mostRecentPktSize) && u.msgs[lastIdx].Hdr.Iovlen < iovMax { u.msgs[lastIdx].Hdr.Controllen = uint64(len(mostRecentPkt.Control)) u.msgs[lastIdx].Hdr.Control = &mostRecentPkt.Control[0] + + u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Base = &pkt.Payload[0] + u.iovs[lastIdx][u.msgs[lastIdx].Hdr.Iovlen].Len = uint64(len(pkt.Payload)) u.msgs[lastIdx].Hdr.Iovlen++ - u.iovs[lastIdx] = append(u.iovs[lastIdx], iovec{ - Base: &pkt.Payload[0], - Len: uint64(len(pkt.Payload)), - }) + + mostRecentPktSize += len(pkt.Payload) mostRecentPkt.SetSegSizeForTX() } else { u.msgs = append(u.msgs, rawMessage{}) - u.iovs = append(u.iovs, make([]iovec, 1, maxIovLen)) //todo less garbage u.iovs[idx][0] = iovec{ Base: &pkt.Payload[0], Len: uint64(len(pkt.Payload)), @@ -263,8 +269,8 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) { msg.Hdr.Name = &pkt.Name[0] msg.Hdr.Namelen = uint32(len(pkt.Name)) mostRecentPkt = pkt + mostRecentPktSize = len(pkt.Payload) } - } if len(u.msgs) == 0 { @@ -287,6 +293,23 @@ func (u *StdConn) WriteBatch(pkts []*packet.Packet) (int, error) { if errno == unix.EINTR { continue } + //for i := 0; i < len(u.msgs); i++ { + // for j := 0; j < int(u.msgs[i].Hdr.Iovlen); j++ { + // u.l.WithFields(logrus.Fields{ + // "msg_index": i, + // "iov idx": j, + // "iov": fmt.Sprintf("%+v", u.iovs[i][j]), + // }).Warn("failed to send message") + // } + // + //} + u.l.WithFields(logrus.Fields{ + "errno": errno, + "idx": idx, + "len": len(u.msgs), + "deets": fmt.Sprintf("%+v", u.msgs), + "lastIOV": fmt.Sprintf("%+v", u.iovs[len(u.msgs)-1][u.msgs[len(u.msgs)-1].Hdr.Iovlen-1]), + }).Error("failed to send message") return sent + offset, &net.OpError{Op: "sendmmsg", Err: errno} }