@@ -80,8 +80,11 @@ type nsqConn struct {
80
80
readTimeout time.Duration
81
81
writeTimeout time.Duration
82
82
stopper sync.Once
83
- dying chan struct {}
84
- drainReady chan struct {}
83
+ dying chan int
84
+ drainReady chan int
85
+ readyChan chan int
86
+ exitChan chan int
87
+ backoffCounter int32
85
88
}
86
89
87
90
func newNSQConn (addr string , readTimeout time.Duration , writeTimeout time.Duration ) (* nsqConn , error ) {
@@ -97,8 +100,10 @@ func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Durati
97
100
finishedMessages : make (chan * FinishedMessage ),
98
101
readTimeout : readTimeout ,
99
102
writeTimeout : writeTimeout ,
100
- dying : make (chan struct {}, 1 ),
101
- drainReady : make (chan struct {}),
103
+ dying : make (chan int , 1 ),
104
+ drainReady : make (chan int ),
105
+ readyChan : make (chan int , 1 ),
106
+ exitChan : make (chan int ),
102
107
}
103
108
104
109
nc .SetWriteDeadline (time .Now ().Add (nc .writeTimeout ))
@@ -162,6 +167,8 @@ type Reader struct {
162
167
ExitChan chan int // read from this channel to block your main loop
163
168
164
169
// internal variables
170
+ maxBackoffDuration time.Duration
171
+ maxBackoffCount int32
165
172
maxInFlight int
166
173
incomingMessages chan * incomingMessage
167
174
nsqConnections map [string ]* nsqConn
@@ -209,6 +216,7 @@ func NewReader(topic string, channel string) (*Reader, error) {
209
216
WriteTimeout : time .Second ,
210
217
maxInFlight : 1 ,
211
218
}
219
+ q .SetMaxBackoffDuration (120 * time .Second )
212
220
return q , nil
213
221
}
214
222
@@ -255,10 +263,19 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) {
255
263
q .maxInFlight = maxInFlight
256
264
257
265
for _ , c := range q .nsqConnections {
258
- q .updateReady (c )
266
+ select {
267
+ case c .readyChan <- 1 :
268
+ default :
269
+ }
259
270
}
260
271
}
261
272
273
+ // SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing
274
+ func (q * Reader ) SetMaxBackoffDuration (duration time.Duration ) {
275
+ q .maxBackoffDuration = duration
276
+ q .maxBackoffCount = int32 (math .Max (1 , math .Ceil (math .Log2 (duration .Seconds ()))))
277
+ }
278
+
262
279
// MaxInFlight returns the configured maximum number of messages to allow in-flight.
263
280
func (q * Reader ) MaxInFlight () int {
264
281
return q .maxInFlight
@@ -381,6 +398,7 @@ func (q *Reader) ConnectToNSQ(addr string) error {
381
398
382
399
go q .readLoop (connection )
383
400
go q .finishLoop (connection )
401
+ go q .rdyLoop (connection )
384
402
385
403
return nil
386
404
}
@@ -396,7 +414,7 @@ func handleError(q *Reader, c *nsqConn, errMsg string) {
396
414
397
415
func (q * Reader ) readLoop (c * nsqConn ) {
398
416
// prime our ready state
399
- err := q .updateReady (c )
417
+ err := q .updateRDY (c )
400
418
if err != nil {
401
419
handleError (q , c , fmt .Sprintf ("[%s] failed to send initial ready - %s" , c , err .Error ()))
402
420
q .stopFinishLoop (c )
@@ -412,7 +430,7 @@ func (q *Reader) readLoop(c *nsqConn) {
412
430
log .Printf ("[%s] delaying close of FinishedMesages channel; %d outstanding messages" , c , c .messagesInFlight )
413
431
}
414
432
log .Printf ("[%s] stopped read loop " , c )
415
- break
433
+ goto exit
416
434
}
417
435
418
436
resp , err := ReadResponse (c )
@@ -460,7 +478,7 @@ func (q *Reader) readLoop(c *nsqConn) {
460
478
err := c .sendCommand (& buf , Nop ())
461
479
if err != nil {
462
480
handleError (q , c , fmt .Sprintf ("[%s] error sending NOP - %s" , c , err .Error ()))
463
- return
481
+ goto exit
464
482
}
465
483
}
466
484
case FrameTypeError :
@@ -469,49 +487,82 @@ func (q *Reader) readLoop(c *nsqConn) {
469
487
log .Printf ("[%s] unknown message type %d" , c , frameType )
470
488
}
471
489
472
- q .updateReady (c )
490
+ select {
491
+ case c .readyChan <- 1 :
492
+ default :
493
+ }
473
494
}
495
+
496
+ exit:
497
+ log .Printf ("[%s] readLoop exiting" , c )
474
498
}
475
499
476
500
func (q * Reader ) finishLoop (c * nsqConn ) {
477
501
var buf bytes.Buffer
502
+ var backoffCounter int32
503
+ var backoffUpdated bool
504
+ var backoffDeadline time.Time
478
505
479
506
for {
480
507
select {
481
508
case <- c .dying :
482
509
log .Printf ("[%s] breaking out of finish loop " , c )
483
510
// Indicate drainReady because we will not pull any more off finishedMessages
484
- c .drainReady <- struct {}{}
485
- return
511
+ c .drainReady <- 1
512
+ goto exit
486
513
case msg := <- c .finishedMessages :
487
514
// Decrement this here so it is correct even if we can't respond to nsqd
488
515
atomic .AddInt64 (& q .messagesInFlight , - 1 )
489
516
atomic .AddInt64 (& c .messagesInFlight , - 1 )
517
+ now := time .Now ()
490
518
491
519
if msg .Success {
492
520
if q .VerboseLogging {
493
521
log .Printf ("[%s] finishing %s" , c , msg .Id )
494
522
}
523
+
495
524
err := c .sendCommand (& buf , Finish (msg .Id ))
496
525
if err != nil {
497
526
log .Printf ("[%s] error finishing %s - %s" , c , msg .Id , err .Error ())
498
527
q .stopFinishLoop (c )
499
528
continue
500
529
}
530
+
501
531
atomic .AddUint64 (& c .messagesFinished , 1 )
502
532
atomic .AddUint64 (& q .MessagesFinished , 1 )
533
+
534
+ if backoffCounter > 0 && now .After (backoffDeadline ) {
535
+ backoffCounter --
536
+ backoffUpdated = true
537
+ }
503
538
} else {
504
539
if q .VerboseLogging {
505
540
log .Printf ("[%s] requeuing %s" , c , msg .Id )
506
541
}
542
+
507
543
err := c .sendCommand (& buf , Requeue (msg .Id , msg .RequeueDelayMs ))
508
544
if err != nil {
509
545
log .Printf ("[%s] error requeueing %s - %s" , c , msg .Id , err .Error ())
510
546
q .stopFinishLoop (c )
511
547
continue
512
548
}
549
+
513
550
atomic .AddUint64 (& c .messagesRequeued , 1 )
514
551
atomic .AddUint64 (& q .MessagesRequeued , 1 )
552
+
553
+ if backoffCounter < q .maxBackoffCount && now .After (backoffDeadline ) {
554
+ backoffCounter ++
555
+ backoffUpdated = true
556
+ }
557
+ }
558
+
559
+ atomic .StoreInt32 (& c .backoffCounter , backoffCounter )
560
+ // prevent many async failures/successes from immediately resulting in
561
+ // max backoff/normal rate (by ensuring that we dont continually incr/decr
562
+ // the counter during a backoff period)
563
+ if backoffCounter > 0 && backoffUpdated {
564
+ backoffDuration := q .backoffDuration (backoffCounter )
565
+ backoffDeadline = now .Add (backoffDuration )
515
566
}
516
567
517
568
if atomic .LoadInt64 (& c .messagesInFlight ) == 0 &&
@@ -521,13 +572,16 @@ func (q *Reader) finishLoop(c *nsqConn) {
521
572
}
522
573
}
523
574
}
575
+
576
+ exit:
577
+ log .Printf ("[%s] finishLoop exiting" , c )
524
578
}
525
579
526
580
func (q * Reader ) stopFinishLoop (c * nsqConn ) {
527
581
c .stopper .Do (func () {
528
582
log .Printf ("[%s] beginning stopFinishLoop logic" , c )
529
583
// This doesn't block because dying has buffer of 1
530
- c .dying <- struct {}{}
584
+ c .dying <- 1
531
585
532
586
// Drain the finishedMessages channel
533
587
go func () {
@@ -536,6 +590,7 @@ func (q *Reader) stopFinishLoop(c *nsqConn) {
536
590
<- c .finishedMessages
537
591
}
538
592
}()
593
+ close (c .exitChan )
539
594
c .Close ()
540
595
delete (q .nsqConnections , c .String ())
541
596
@@ -564,9 +619,54 @@ func (q *Reader) stopFinishLoop(c *nsqConn) {
564
619
})
565
620
}
566
621
567
- func (q * Reader ) updateReady (c * nsqConn ) error {
568
- var buf bytes.Buffer
622
+ func (q * Reader ) backoffDuration (count int32 ) time.Duration {
623
+ backoffDuration := time .Second * time .Duration (math .Pow (2 , float64 (count )))
624
+ if backoffDuration > q .maxBackoffDuration {
625
+ backoffDuration = q .maxBackoffDuration
626
+ }
627
+ return backoffDuration
628
+ }
629
+
630
+ func (q * Reader ) rdyLoop (c * nsqConn ) {
631
+ readyChan := c .readyChan
632
+ var backoffTimer * time.Timer
633
+ var backoffTimerChan <- chan time.Time
634
+
635
+ for {
636
+ select {
637
+ case <- backoffTimerChan :
638
+ log .Printf ("[%s] backoff time expired, continuing with RDY 1..." , c )
639
+ // while in backoff only ever let 1 message at a time through
640
+ q .sendRDY (c , 1 )
641
+ readyChan = c .readyChan
642
+ case <- readyChan :
643
+ backoffCounter := atomic .LoadInt32 (& c .backoffCounter )
644
+
645
+ // send ready immediately
646
+ if backoffCounter == 0 {
647
+ q .updateRDY (c )
648
+ continue
649
+ }
650
+
651
+ backoffDuration := q .backoffDuration (backoffCounter )
652
+ backoffTimer = time .NewTimer (backoffDuration )
653
+ backoffTimerChan = backoffTimer .C
654
+ readyChan = nil
655
+
656
+ log .Printf ("[%s] backing off for %.02f seconds" , c , backoffDuration .Seconds ())
657
+ case <- c .exitChan :
658
+ if backoffTimer != nil {
659
+ backoffTimer .Stop ()
660
+ }
661
+ goto exit
662
+ }
663
+ }
569
664
665
+ exit:
666
+ log .Printf ("[%s] rdyLoop exiting" , c )
667
+ }
668
+
669
+ func (q * Reader ) updateRDY (c * nsqConn ) error {
570
670
if atomic .LoadInt32 (& c .stopFlag ) != 0 {
571
671
return nil
572
672
}
@@ -578,12 +678,7 @@ func (q *Reader) updateReady(c *nsqConn) error {
578
678
if q .VerboseLogging {
579
679
log .Printf ("[%s] sending RDY %d (%d remain)" , c , mif , remain )
580
680
}
581
- atomic .StoreInt64 (& c .rdyCount , int64 (mif ))
582
- err := c .sendCommand (& buf , Ready (mif ))
583
- if err != nil {
584
- handleError (q , c , fmt .Sprintf ("[%s] error sending RDY %d - %s" , c , mif , err .Error ()))
585
- return err
586
- }
681
+ q .sendRDY (c , mif )
587
682
} else {
588
683
if q .VerboseLogging {
589
684
log .Printf ("[%s] skip sending RDY (%d remain out of %d)" , c , remain , mif )
@@ -593,6 +688,17 @@ func (q *Reader) updateReady(c *nsqConn) error {
593
688
return nil
594
689
}
595
690
691
+ func (q * Reader ) sendRDY (c * nsqConn , count int ) error {
692
+ var buf bytes.Buffer
693
+ atomic .StoreInt64 (& c .rdyCount , int64 (count ))
694
+ err := c .sendCommand (& buf , Ready (count ))
695
+ if err != nil {
696
+ handleError (q , c , fmt .Sprintf ("[%s] error sending RDY %d - %s" , c , count , err .Error ()))
697
+ return err
698
+ }
699
+ return nil
700
+ }
701
+
596
702
// Stop will gracefully stop the Reader
597
703
func (q * Reader ) Stop () {
598
704
var buf bytes.Buffer
0 commit comments