diff --git a/interface.go b/interface.go index 082906d..af63745 100644 --- a/interface.go +++ b/interface.go @@ -271,7 +271,7 @@ func (f *Interface) listenOut(i int) { fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) - li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) { + li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte, q int) { f.readOutsidePackets(fromUdpAddr, nil, plaintext[:0], payload, h, fwPacket, lhh, nb, i, ctCache.Get(f.l)) }) } diff --git a/udp/conn.go b/udp/conn.go index 895b0df..d00f276 100644 --- a/udp/conn.go +++ b/udp/conn.go @@ -11,6 +11,7 @@ const MTU = 9001 type EncReader func( addr netip.AddrPort, payload []byte, + q int, ) type Conn interface { diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 4ab466f..3ca2e88 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -25,7 +25,7 @@ const ( defaultGSOMaxSegments = 64 defaultGSOMaxBytes = 64000 defaultGROReadBufferSize = 2 * defaultGSOMaxBytes - defaultGSOFlushTimeout = 50 * time.Microsecond + defaultGSOFlushTimeout = 100 * time.Microsecond ) type StdConn struct { @@ -171,19 +171,17 @@ func (u *StdConn) ListenOut(r EncReader) { for { //desiredControl := int(u.controlLen.Load()) - hasControl := len(controls) > 0 + //hasControl := len(controls) > 0 //if (desiredControl > 0) != hasControl || (desiredControl > 0 && hasControl && len(controls[0]) != desiredControl) { // msgs, buffers, names, controls = u.PrepareRawMessages(u.batch) // hasControl = len(controls) > 0 //} // - if hasControl { - for i := range msgs { - if len(controls) <= i || len(controls[i]) == 0 { - continue - } - msgs[i].Hdr.Controllen = controllen(len(controls[i])) + for i := range msgs { + if len(controls) <= i || len(controls[i]) == 0 { + continue } + msgs[i].Hdr.Controllen = controllen(len(controls[i])) } n, err := read(msgs) @@ -224,7 +222,7 @@ func (u *StdConn) ListenOut(r EncReader) { } } - r(addr, buffers[i][:payloadLen]) + r(addr, buffers[i][:payloadLen], 0) } } } @@ -386,6 +384,7 @@ func (u *StdConn) writeToGSO(b []byte, addr netip.AddrPort) error { } } + inBuf := len(u.gsoPendingBuf) + len(b) if len(u.gsoPendingBuf)+len(b) > u.gsoMaxBytes { if err := u.flushPendingLocked(); err != nil { return err @@ -405,7 +404,7 @@ func (u *StdConn) writeToGSO(b []byte, addr netip.AddrPort) error { return u.flushPendingLocked() } - u.scheduleFlushLocked() + u.scheduleFlushLocked(inBuf) return nil } @@ -531,19 +530,25 @@ func (u *StdConn) sendSequentialLocked(buf []byte, addr netip.AddrPort, segSize return nil } -func (u *StdConn) scheduleFlushLocked() { +func (u *StdConn) scheduleFlushLocked(inBuf int) { if u.gsoFlushTimeout <= 0 { _ = u.flushPendingLocked() return } + + t := u.gsoFlushTimeout + if inBuf > u.gsoMaxBytes/2 { + t = t / 2 + } + if u.gsoFlushTimer == nil { - u.gsoFlushTimer = time.AfterFunc(u.gsoFlushTimeout, u.flushTimerHandler) + u.gsoFlushTimer = time.AfterFunc(t, u.flushTimerHandler) return } if !u.gsoFlushTimer.Stop() { // timer already fired or running; allow handler to exit if no data } - u.gsoFlushTimer.Reset(u.gsoFlushTimeout) + u.gsoFlushTimer.Reset(t) } func (u *StdConn) stopFlushTimerLocked() { @@ -621,7 +626,8 @@ func (u *StdConn) emitSegments(r EncReader, addr netip.AddrPort, payload []byte, } //segment := append([]byte(nil), payload[start:end]...) - r(addr, payload[start:end]) + //q := numSegments % 4 //TODO + r(addr, payload[start:end], 0) numSegments++ //segments = append(segments, segment) start = end @@ -714,12 +720,14 @@ func (u *StdConn) parseGROSegment(msg *rawMessage, control []byte) (int, int) { if len(c.Data) >= 4 { segCount = int(binary.NativeEndian.Uint16(c.Data[2:4])) } - u.l.WithFields(logrus.Fields{ - "tag": "gro-debug", - "stage": "parse", - "seg_size": segSize, - "seg_count": segCount, - }).Debug("gro-debug control parsed") + if u.l.Level >= logrus.DebugLevel { + u.l.WithFields(logrus.Fields{ + "tag": "gro-debug", + "stage": "parse", + "seg_size": segSize, + "seg_count": segCount, + }).Debug("gro-debug control parsed") + } return segSize, segCount } }