29
29
import org .elasticsearch .common .io .stream .StreamInput ;
30
30
import org .elasticsearch .common .io .stream .StreamOutput ;
31
31
import org .elasticsearch .common .io .stream .Writeable ;
32
+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
32
33
import org .elasticsearch .common .util .concurrent .CountDown ;
33
34
import org .elasticsearch .common .util .concurrent .EsExecutors ;
34
35
import org .elasticsearch .common .util .concurrent .ListenableFuture ;
82
83
import java .util .function .IntUnaryOperator ;
83
84
84
85
import static org .elasticsearch .action .search .SearchPhaseController .getTopDocsSize ;
86
+ import static org .elasticsearch .search .sort .FieldSortBuilder .NAME ;
85
87
import static org .elasticsearch .search .sort .FieldSortBuilder .hasPrimaryFieldSort ;
86
88
87
89
public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction <SearchPhaseResult > {
@@ -96,6 +98,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
96
98
private volatile BottomSortValuesCollector bottomSortCollector ;
97
99
private final Client client ;
98
100
private final boolean batchQueryPhase ;
101
+ private final SearchService searchService ;
99
102
100
103
SearchQueryThenFetchAsyncAction (
101
104
Logger logger ,
@@ -114,7 +117,7 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
114
117
SearchTask task ,
115
118
SearchResponse .Clusters clusters ,
116
119
Client client ,
117
- boolean batchQueryPhase
120
+ SearchService searchService
118
121
) {
119
122
super (
120
123
"query" ,
@@ -139,7 +142,8 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
139
142
this .trackTotalHitsUpTo = request .resolveTrackTotalHitsUpTo ();
140
143
this .progressListener = task .getProgressListener ();
141
144
this .client = client ;
142
- this .batchQueryPhase = batchQueryPhase ;
145
+ this .batchQueryPhase = searchService .batchQueryPhase ();
146
+ this .searchService = searchService ;
143
147
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
144
148
if (progressListener != SearchProgressListener .NOOP ) {
145
149
notifyListShards (progressListener , clusters , request , shardsIts );
@@ -423,7 +427,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
423
427
}
424
428
AbstractSearchAsyncAction .doCheckNoMissingShards (getName (), request , shardsIts );
425
429
final Map <CanMatchPreFilterSearchPhase .SendingTarget , NodeQueryRequest > perNodeQueries = new HashMap <>();
426
- final String localNodeId = searchTransportService .transportService ().getLocalNode ().getId ();
430
+ final var transportService = searchTransportService .transportService ();
431
+ final String localNodeId = transportService .getLocalNode ().getId ();
427
432
final int numberOfShardsTotal = shardsIts .size ();
428
433
for (int i = 0 ; i < numberOfShardsTotal ; i ++) {
429
434
final SearchShardIterator shardRoutings = shardsIts .get (i );
@@ -436,30 +441,82 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
436
441
} else {
437
442
final String nodeId = routing .getNodeId ();
438
443
// local requests don't need batching as there's no network latency
439
- if (localNodeId .equals (nodeId )) {
440
- performPhaseOnShard (shardIndex , shardRoutings , routing );
441
- } else {
442
- var perNodeRequest = perNodeQueries .computeIfAbsent (
443
- new CanMatchPreFilterSearchPhase .SendingTarget (routing .getClusterAlias (), nodeId ),
444
- t -> new NodeQueryRequest (request , numberOfShardsTotal , timeProvider .absoluteStartMillis (), t .clusterAlias ())
445
- );
446
- final String indexUUID = routing .getShardId ().getIndex ().getUUID ();
447
- perNodeRequest .shards .add (
448
- new ShardToQuery (
449
- concreteIndexBoosts .getOrDefault (indexUUID , DEFAULT_INDEX_BOOST ),
450
- getOriginalIndices (shardIndex ).indices (),
451
- shardIndex ,
452
- routing .getShardId (),
453
- shardRoutings .getSearchContextId ()
454
- )
455
- );
456
- var filterForAlias = aliasFilter .getOrDefault (indexUUID , AliasFilter .EMPTY );
457
- if (filterForAlias != AliasFilter .EMPTY ) {
458
- perNodeRequest .aliasFilters .putIfAbsent (indexUUID , filterForAlias );
459
- }
444
+ var perNodeRequest = perNodeQueries .computeIfAbsent (
445
+ new CanMatchPreFilterSearchPhase .SendingTarget (routing .getClusterAlias (), nodeId ),
446
+ t -> new NodeQueryRequest (request , numberOfShardsTotal , timeProvider .absoluteStartMillis (), t .clusterAlias ())
447
+ );
448
+ final String indexUUID = routing .getShardId ().getIndex ().getUUID ();
449
+ perNodeRequest .shards .add (
450
+ new ShardToQuery (
451
+ concreteIndexBoosts .getOrDefault (indexUUID , DEFAULT_INDEX_BOOST ),
452
+ getOriginalIndices (shardIndex ).indices (),
453
+ shardIndex ,
454
+ routing .getShardId (),
455
+ shardRoutings .getSearchContextId ()
456
+ )
457
+ );
458
+ var filterForAlias = aliasFilter .getOrDefault (indexUUID , AliasFilter .EMPTY );
459
+ if (filterForAlias != AliasFilter .EMPTY ) {
460
+ perNodeRequest .aliasFilters .putIfAbsent (indexUUID , filterForAlias );
460
461
}
461
462
}
462
463
}
464
+ final var localTarget = new CanMatchPreFilterSearchPhase .SendingTarget (request .getLocalClusterAlias (), localNodeId );
465
+ var localNodeRequest = perNodeQueries .remove (localTarget );
466
+ if (localNodeRequest != null ) {
467
+ transportService .getThreadPool ().executor (ThreadPool .Names .SEARCH_COORDINATION ).execute (new AbstractRunnable () {
468
+ @ Override
469
+ protected void doRun () {
470
+ if (hasPrimaryFieldSort (request .source ())) {
471
+ var pitBuilder = request .pointInTimeBuilder ();
472
+ @ SuppressWarnings ("rawtypes" )
473
+ final MinAndMax [] minAndMax = new MinAndMax [localNodeRequest .shards .size ()];
474
+ for (int i = 0 ; i < minAndMax .length ; i ++) {
475
+ var shardToQuery = localNodeRequest .shards .get (i );
476
+ var shardId = shardToQuery .shardId ;
477
+ var r = buildShardSearchRequest (
478
+ shardId ,
479
+ localNodeRequest .localClusterAlias ,
480
+ shardToQuery .shardIndex ,
481
+ shardToQuery .contextId ,
482
+ new OriginalIndices (shardToQuery .originalIndices , request .indicesOptions ()),
483
+ localNodeRequest .aliasFilters .getOrDefault (shardId .getIndex ().getUUID (), AliasFilter .EMPTY ),
484
+ pitBuilder == null ? null : pitBuilder .getKeepAlive (),
485
+ shardToQuery .boost ,
486
+ request ,
487
+ localNodeRequest .totalShards ,
488
+ localNodeRequest .absoluteStartMillis ,
489
+ false
490
+ );
491
+ minAndMax [i ] = searchService .canMatch (r ).estimatedMinAndMax ();
492
+ }
493
+ try {
494
+ int [] indexes = CanMatchPreFilterSearchPhase .sortShards (
495
+ localNodeRequest .shards ,
496
+ minAndMax ,
497
+ FieldSortBuilder .getPrimaryFieldSortOrNull (request .source ()).order ()
498
+ );
499
+ for (int i = 0 ; i < indexes .length ; i ++) {
500
+ ShardToQuery shardToQuery = localNodeRequest .shards .get (i );
501
+ shardToQuery = localNodeRequest .shards .set (i , shardToQuery );
502
+ localNodeRequest .shards .set (i , shardToQuery );
503
+ }
504
+ } catch (Exception e ) {
505
+ // ignored, field type conflicts will be dealt with in upstream logic
506
+ // TODO: we should fail the query here, we're already seeing a field type conflict on the sort field,
507
+ // no need to actually execute the queries and go through a lot of work before we inevitably have to
508
+ // fail the search
509
+ }
510
+ }
511
+ executeWithoutBatching (localTarget , localNodeRequest );
512
+ }
513
+
514
+ @ Override
515
+ public void onFailure (Exception e ) {
516
+ SearchQueryThenFetchAsyncAction .this .onPhaseFailure (NAME , "" , e );
517
+ }
518
+ });
519
+ }
463
520
perNodeQueries .forEach ((routing , request ) -> {
464
521
if (request .shards .size () == 1 ) {
465
522
executeAsSingleRequest (routing , request .shards .getFirst ());
@@ -477,8 +534,12 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
477
534
executeWithoutBatching (routing , request );
478
535
return ;
479
536
}
480
- searchTransportService .transportService ()
481
- .sendChildRequest (connection , NODE_SEARCH_ACTION_NAME , request , task , new TransportResponseHandler <NodeQueryResponse >() {
537
+ transportService .sendChildRequest (
538
+ connection ,
539
+ NODE_SEARCH_ACTION_NAME ,
540
+ request ,
541
+ task ,
542
+ new TransportResponseHandler <NodeQueryResponse >() {
482
543
@ Override
483
544
public NodeQueryResponse read (StreamInput in ) throws IOException {
484
545
return new NodeQueryResponse (in );
@@ -531,7 +592,8 @@ public void handleException(TransportException e) {
531
592
onPhaseFailure (getName (), "" , cause );
532
593
}
533
594
}
534
- });
595
+ }
596
+ );
535
597
});
536
598
}
537
599
0 commit comments