@@ -6,9 +6,11 @@ import (
6
6
"io"
7
7
"net"
8
8
"strings"
9
+ "sync/atomic"
9
10
"time"
10
11
11
12
"github.com/armon/go-metrics"
13
+ "github.com/hashicorp/consul/agent/cache"
12
14
"github.com/hashicorp/consul/agent/consul/state"
13
15
"github.com/hashicorp/consul/agent/metadata"
14
16
"github.com/hashicorp/consul/agent/pool"
@@ -443,6 +445,157 @@ RUN_QUERY:
443
445
return err
444
446
}
445
447
448
// sharedQueryFn is the callback executed by the shared blocking query
// machinery. It is invoked with a watch set and a state store, and returns:
//   - a uint64 that callers in this file treat as the index of the result,
//   - a function that callers store and later invoke with an index and the
//     caller-supplied result value, and
//   - an error.
+ type sharedQueryFn func (memdb.WatchSet , * state.Store ) (uint64 , func (uint64 , interface {}) error , error )
449
+
450
+ func (s * Server ) sharedBlockingQuery (req cache.Request , res interface {}, queryOpts * structs.QueryOptions , queryMeta * structs.QueryMeta , fn sharedQueryFn ) error {
451
+ var timeout * time.Timer
452
+
453
+ if queryOpts .RequireConsistent {
454
+ if err := s .consistentRead (); err != nil {
455
+ return err
456
+ }
457
+ }
458
+
459
+ // Restrict the max query time, and ensure there is always one.
460
+ if queryOpts .MaxQueryTime > maxQueryTime {
461
+ queryOpts .MaxQueryTime = maxQueryTime
462
+ } else if queryOpts .MaxQueryTime <= 0 {
463
+ queryOpts .MaxQueryTime = defaultQueryTime
464
+ }
465
+
466
+ // Apply a small amount of jitter to the request.
467
+ queryOpts .MaxQueryTime += lib .RandomStagger (queryOpts .MaxQueryTime / jitterFraction )
468
+
469
+ // Setup a query timeout.
470
+ timeout = time .NewTimer (queryOpts .MaxQueryTime )
471
+ defer timeout .Stop ()
472
+
473
+ cacheInfo := req .CacheInfo ()
474
+
475
+ // Check if a blocking query is already runnning for this request
476
+ s .blockingQueriesLock .RLock ()
477
+ queryState , alreadyInserted := s .blockingQueries [cacheInfo .Key ]
478
+ s .blockingQueriesLock .RUnlock ()
479
+
480
+ // If not, run one
481
+ if ! alreadyInserted {
482
+ ws := memdb .NewWatchSet ()
483
+ // run the func a first time to get the index
484
+ firstRunIndex , apply , err := fn (ws , s .fsm .State ())
485
+
486
+ s .blockingQueriesLock .Lock ()
487
+ queryState , alreadyInserted = s .blockingQueries [cacheInfo .Key ]
488
+ if alreadyInserted {
489
+ // Another query raced with us and already ran a blocking query
490
+ s .blockingQueriesLock .Unlock ()
491
+ } else {
492
+ // Add query to map
493
+ queryState = newBlockingQueryState (firstRunIndex , apply , err )
494
+ s .blockingQueries [cacheInfo .Key ] = queryState
495
+ s .blockingQueriesLock .Unlock ()
496
+
497
+ // Run the shared blocking query
498
+ go s .runSharedBlockingQuery (firstRunIndex , cacheInfo .Key , ws , queryMeta , queryState , fn )
499
+ }
500
+ }
501
+
502
+ stateIndex := atomic .LoadUint64 (& queryState .Index )
503
+
504
+ if stateIndex <= queryOpts .MinQueryIndex {
505
+ // Increment the shared query watcher
506
+ atomic .AddInt32 (& queryState .Watchers , 1 )
507
+
508
+ // block on either timeout or shared query
509
+ select {
510
+ case <- timeout .C :
511
+ if n := atomic .AddInt32 (& queryState .Watchers , - 1 ); n == 0 {
512
+ s .logger .Println ("[TRACE] consul: cancelling shared blocking query because there is no more watchers" )
513
+
514
+ // we were the last request to wait on the shared blocking wuery and we reached MaxQueryTime, cancel the blocking query
515
+ close (queryState .Cancel )
516
+ }
517
+ case <- queryState .Done :
518
+ }
519
+ }
520
+
521
+ if err := queryState .Err .Load (); err != nil {
522
+ return err .(error )
523
+ }
524
+
525
+ return queryState .Apply .Load ().(func (uint64 , interface {}) error )(atomic .LoadUint64 (& queryState .Index ), res )
526
+ }
527
+
528
+ func (s * Server ) runSharedBlockingQuery (index uint64 , cacheKey string , ws memdb.WatchSet , queryMeta * structs.QueryMeta , queryState * blockingQueryState , fn sharedQueryFn ) {
529
+ s .logger .Println ("[TRACE] consul: running shared blocking query" )
530
+
531
+ // Wait initial query watchset
532
+ expired := ws .Watch (queryState .Cancel )
533
+
534
+ RUN_QUERY:
535
+
536
+ // If the read must be consistent we verify that we are still the leader.
537
+
538
+ // Run the query.
539
+ metrics .IncrCounter ([]string {"rpc" , "query" }, 1 )
540
+
541
+ // Operate on a consistent set of state. This makes sure that the
542
+ // abandon channel goes with the state that the caller is using to
543
+ // build watches.
544
+ state := s .fsm .State ()
545
+
546
+ // We can skip all watch tracking if this isn't a blocking query.
547
+ if index > 0 {
548
+ ws = memdb .NewWatchSet ()
549
+
550
+ // This channel will be closed if a snapshot is restored and the
551
+ // whole state store is abandoned.
552
+ ws .Add (state .AbandonCh ())
553
+ }
554
+
555
+ // Block up to the timeout if we didn't see anything fresh.
556
+ idx , apply , err := fn (ws , state )
557
+ // Note we check queryOpts.MinQueryIndex is greater than zero to determine if
558
+ // blocking was requested by client, NOT meta.Index since the state function
559
+ // might return zero if something is not initialised and care wasn't taken to
560
+ // handle that special case (in practice this happened a lot so fixing it
561
+ // systematically here beats trying to remember to add zero checks in every
562
+ // state method). We also need to ensure that unless there is an error, we
563
+ // return an index > 0 otherwise the client will never block and burn CPU and
564
+ // requests.
565
+ if err == nil && idx < 1 {
566
+ idx = 1
567
+ }
568
+ if ! expired && err == nil && index > 0 && idx <= index {
569
+ if expired := ws .Watch (queryState .Cancel ); ! expired {
570
+ // If a restore may have woken us up then bail out from
571
+ // the query immediately. This is slightly race-ey since
572
+ // this might have been interrupted for other reasons,
573
+ // but it's OK to kick it back to the caller in either
574
+ // case.
575
+ select {
576
+ case <- state .AbandonCh ():
577
+ default :
578
+ goto RUN_QUERY
579
+ }
580
+ }
581
+ }
582
+
583
+ // store results
584
+ s .blockingQueriesLock .Lock ()
585
+ if err != nil {
586
+ queryState .Err .Store (err )
587
+ }
588
+ if apply != nil {
589
+ queryState .Apply .Store (apply )
590
+ }
591
+ atomic .StoreUint64 (& queryState .Index , idx )
592
+ delete (s .blockingQueries , cacheKey )
593
+ s .blockingQueriesLock .Unlock ()
594
+
595
+ // notify changed
596
+ close (queryState .Done )
597
+ }
598
+
446
599
// setQueryMeta is used to populate the QueryMeta data for an RPC call
447
600
func (s * Server ) setQueryMeta (m * structs.QueryMeta ) {
448
601
if s .IsLeader () {
0 commit comments