@@ -281,35 +281,66 @@ func (p *pipe) _background() {
281281 atomic .StoreInt32 (& p .state , 4 )
282282}
283283
284+ type flush struct {
285+ works time.Duration
286+ waits time.Duration
287+ bytes int
288+ }
289+
290+ const flushesKeep = 16
291+ const flushesMask = flushesKeep - 1
292+
284293func (p * pipe ) _backgroundWrite () (err error ) {
285294 var (
286295 ones = make ([]cmds.Completed , 1 )
287296 multi []cmds.Completed
288297 ch chan RedisResult
289298
290- flushDelay = p .maxFlushDelay
291- flushStart = time.Time {}
299+ flushes [flushesKeep ]flush
300+ flushId = 0
301+ bytes = 1 // avoid zero
302+ works = time .Duration (1 )
303+ waits = time .Duration (0 )
304+
305+ userDelay = p .maxFlushDelay
292306 )
293307
308+ var ts1 = time .Now ()
294309 for atomic .LoadInt32 (& p .state ) < 3 {
295310 if ones [0 ], multi , ch = p .queue .NextWriteCmd (); ch == nil {
296- if flushDelay != 0 {
297- flushStart = time .Now ()
298- }
299- if p .w .Buffered () == 0 {
311+ buf := p .w .Buffered ()
312+ if buf == 0 {
300313 err = p .Error ()
314+ } else if userDelay == 0 {
315+ err = p .w .Flush ()
301316 } else {
317+ ts2 := time .Now ()
302318 err = p .w .Flush ()
319+ dur , gap := time .Since (ts2 ), ts2 .Sub (ts1 )
320+ bytes = bytes + buf - flushes [flushId ].bytes
321+ works = works + dur - flushes [flushId ].works
322+ waits = waits + gap - flushes [flushId ].waits
323+ flushes [flushId ] = flush {bytes : buf , works : dur , waits : gap }
324+ flushId = (flushId + 1 ) & flushesMask
325+ ts1 = ts2
303326 }
327+
304328 if err == nil {
305329 if atomic .LoadInt32 (& p .state ) == 1 {
306330 ones [0 ], multi , ch = p .queue .WaitForWrite ()
307331 } else {
308332 runtime .Gosched ()
309333 continue
310334 }
311- if flushDelay != 0 && atomic .LoadInt32 (& p .waits ) > 1 { // do not delay for sequential usage
312- time .Sleep (flushDelay - time .Since (flushStart )) // ref: https://github.com/rueian/rueidis/issues/156
335+ if userDelay != 0 && atomic .LoadInt32 (& p .waits ) > 1 { // do not delay for sequential usage
336+ avgWorks := works / flushesKeep
337+ byteWork := works / time .Duration (bytes )
338+ byteWait := waits / time .Duration (bytes )
339+ if autoDelay := byteWait * avgWorks / (byteWait + byteWork ); autoDelay <= userDelay {
340+ time .Sleep (autoDelay )
341+ } else {
342+ time .Sleep (userDelay ) // ref: https://github.com/rueian/rueidis/issues/156
343+ }
313344 }
314345 }
315346 }
0 commit comments