33
33
import org .elasticsearch .search .SearchPhaseResult ;
34
34
import org .elasticsearch .search .SearchShardTarget ;
35
35
import org .elasticsearch .search .builder .PointInTimeBuilder ;
36
- import org .elasticsearch .search .builder .SearchSourceBuilder ;
37
36
import org .elasticsearch .search .internal .AliasFilter ;
38
37
import org .elasticsearch .search .internal .SearchContext ;
39
38
import org .elasticsearch .search .internal .ShardSearchContextId ;
@@ -87,18 +86,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
87
86
private final SetOnce <AtomicArray <ShardSearchFailure >> shardFailures = new SetOnce <>();
88
87
private final Object shardFailuresMutex = new Object ();
89
88
private final AtomicBoolean hasShardResponse = new AtomicBoolean (false );
90
- private final AtomicInteger successfulOps = new AtomicInteger () ;
89
+ private final AtomicInteger successfulOps ;
91
90
private final SearchTimeProvider timeProvider ;
92
91
private final SearchResponse .Clusters clusters ;
93
92
94
- protected final List <SearchShardIterator > toSkipShardsIts ;
95
93
protected final List <SearchShardIterator > shardsIts ;
96
94
private final SearchShardIterator [] shardIterators ;
97
95
private final AtomicInteger outstandingShards ;
98
96
private final int maxConcurrentRequestsPerNode ;
99
97
private final Map <String , PendingExecutions > pendingExecutionsPerNode = new ConcurrentHashMap <>();
100
98
private final boolean throttleConcurrentRequests ;
101
99
private final AtomicBoolean requestCancelled = new AtomicBoolean ();
100
+ private final int skippedCount ;
102
101
103
102
// protected for tests
104
103
protected final SubscribableListener <Void > doneFuture = new SubscribableListener <>();
@@ -124,18 +123,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
124
123
) {
125
124
super (name );
126
125
this .namedWriteableRegistry = namedWriteableRegistry ;
127
- final List <SearchShardIterator > toSkipIterators = new ArrayList <>();
128
126
final List <SearchShardIterator > iterators = new ArrayList <>();
127
+ int skipped = 0 ;
129
128
for (final SearchShardIterator iterator : shardsIts ) {
130
129
if (iterator .skip ()) {
131
- toSkipIterators . add ( iterator ) ;
130
+ skipped ++ ;
132
131
} else {
133
132
iterators .add (iterator );
134
133
}
135
134
}
136
- this .toSkipShardsIts = toSkipIterators ;
135
+ this .skippedCount = skipped ;
137
136
this .shardsIts = iterators ;
138
- outstandingShards = new AtomicInteger (shardsIts .size ());
137
+ outstandingShards = new AtomicInteger (iterators .size ());
138
+ successfulOps = new AtomicInteger (skipped );
139
139
this .shardIterators = iterators .toArray (new SearchShardIterator [0 ]);
140
140
// we later compute the shard index based on the natural order of the shards
141
141
// that participate in the search request. This means that this number is
@@ -166,11 +166,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
166
166
protected void notifyListShards (
167
167
SearchProgressListener progressListener ,
168
168
SearchResponse .Clusters clusters ,
169
- SearchSourceBuilder sourceBuilder
169
+ SearchRequest searchRequest ,
170
+ List <SearchShardIterator > allIterators
170
171
) {
172
+ final List <SearchShard > skipped = new ArrayList <>(allIterators .size () - shardsIts .size ());
173
+ for (SearchShardIterator iter : allIterators ) {
174
+ if (iter .skip ()) {
175
+ skipped .add (new SearchShard (iter .getClusterAlias (), iter .shardId ()));
176
+ }
177
+ }
178
+ var sourceBuilder = searchRequest .source ();
171
179
progressListener .notifyListShards (
172
180
SearchProgressListener .buildSearchShardsFromIter (this .shardsIts ),
173
- SearchProgressListener . buildSearchShardsFromIter ( toSkipShardsIts ) ,
181
+ skipped ,
174
182
clusters ,
175
183
sourceBuilder == null || sourceBuilder .size () > 0 ,
176
184
timeProvider
@@ -219,44 +227,37 @@ public final void start() {
219
227
220
228
@ Override
221
229
protected final void run () {
222
- for ( final SearchShardIterator iterator : toSkipShardsIts ) {
223
- assert iterator . skip ();
224
- skipShard ( iterator ) ;
230
+ if ( outstandingShards . get () == 0 ) {
231
+ onPhaseDone ();
232
+ return ;
225
233
}
226
234
final Map <SearchShardIterator , Integer > shardIndexMap = Maps .newHashMapWithExpectedSize (shardIterators .length );
227
235
for (int i = 0 ; i < shardIterators .length ; i ++) {
228
236
shardIndexMap .put (shardIterators [i ], i );
229
237
}
230
- if (shardsIts .size () > 0 ) {
231
- doCheckNoMissingShards (getName (), request , shardsIts );
232
- Version version = request .minCompatibleShardNode ();
233
- if (version != null && Version .CURRENT .minimumCompatibilityVersion ().equals (version ) == false ) {
234
- if (checkMinimumVersion (shardsIts ) == false ) {
235
- throw new VersionMismatchException (
236
- "One of the shards is incompatible with the required minimum version [{}]" ,
237
- request .minCompatibleShardNode ()
238
- );
239
- }
240
- }
241
- for (int i = 0 ; i < shardsIts .size (); i ++) {
242
- final SearchShardIterator shardRoutings = shardsIts .get (i );
243
- assert shardRoutings .skip () == false ;
244
- assert shardIndexMap .containsKey (shardRoutings );
245
- int shardIndex = shardIndexMap .get (shardRoutings );
246
- final SearchShardTarget routing = shardRoutings .nextOrNull ();
247
- if (routing == null ) {
248
- failOnUnavailable (shardIndex , shardRoutings );
249
- } else {
250
- performPhaseOnShard (shardIndex , shardRoutings , routing );
251
- }
238
+ doCheckNoMissingShards (getName (), request , shardsIts );
239
+ Version version = request .minCompatibleShardNode ();
240
+ if (version != null && Version .CURRENT .minimumCompatibilityVersion ().equals (version ) == false ) {
241
+ if (checkMinimumVersion (shardsIts ) == false ) {
242
+ throw new VersionMismatchException (
243
+ "One of the shards is incompatible with the required minimum version [{}]" ,
244
+ request .minCompatibleShardNode ()
245
+ );
252
246
}
253
247
}
254
- }
248
+ for (int i = 0 ; i < shardsIts .size (); i ++) {
249
+ final SearchShardIterator shardRoutings = shardsIts .get (i );
250
+ assert shardRoutings .skip () == false ;
251
+ assert shardIndexMap .containsKey (shardRoutings );
252
+ int shardIndex = shardIndexMap .get (shardRoutings );
253
+ final SearchShardTarget routing = shardRoutings .nextOrNull ();
254
+ if (routing == null ) {
255
+ failOnUnavailable (shardIndex , shardRoutings );
256
+ } else {
257
+ performPhaseOnShard (shardIndex , shardRoutings , routing );
258
+ }
255
259
256
- void skipShard (SearchShardIterator iterator ) {
257
- successfulOps .incrementAndGet ();
258
- assert iterator .skip ();
259
- successfulShardExecution ();
260
+ }
260
261
}
261
262
262
263
private boolean checkMinimumVersion (List <SearchShardIterator > shardsIts ) {
@@ -274,32 +275,6 @@ private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
274
275
return true ;
275
276
}
276
277
277
- private static boolean assertExecuteOnStartThread () {
278
- // Ensure that the current code has the following stacktrace:
279
- // AbstractSearchAsyncAction#start -> AbstractSearchAsyncAction#executePhase -> AbstractSearchAsyncAction#performPhaseOnShard
280
- final StackTraceElement [] stackTraceElements = Thread .currentThread ().getStackTrace ();
281
- assert stackTraceElements .length >= 6 : stackTraceElements ;
282
- int index = 0 ;
283
- assert stackTraceElements [index ++].getMethodName ().equals ("getStackTrace" );
284
- assert stackTraceElements [index ++].getMethodName ().equals ("assertExecuteOnStartThread" );
285
- assert stackTraceElements [index ++].getMethodName ().equals ("failOnUnavailable" );
286
- if (stackTraceElements [index ].getMethodName ().equals ("performPhaseOnShard" )) {
287
- assert stackTraceElements [index ].getClassName ().endsWith ("CanMatchPreFilterSearchPhase" );
288
- index ++;
289
- }
290
- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
291
- assert stackTraceElements [index ++].getMethodName ().equals ("run" );
292
-
293
- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
294
- assert stackTraceElements [index ++].getMethodName ().equals ("executePhase" );
295
-
296
- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" );
297
- assert stackTraceElements [index ++].getMethodName ().equals ("start" );
298
-
299
- assert stackTraceElements [index ].getClassName ().endsWith ("AbstractSearchAsyncAction" ) == false ;
300
- return true ;
301
- }
302
-
303
278
private void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
304
279
if (throttleConcurrentRequests ) {
305
280
var pendingExecutions = pendingExecutionsPerNode .computeIfAbsent (
@@ -318,7 +293,7 @@ private void doPerformPhaseOnShard(int shardIndex, SearchShardIterator shardIt,
318
293
public void innerOnResponse (Result result ) {
319
294
try {
320
295
releasable .close ();
321
- onShardResult (result , shardIt );
296
+ onShardResult (result );
322
297
} catch (Exception exc ) {
323
298
onShardFailure (shardIndex , shard , shardIt , exc );
324
299
}
@@ -341,7 +316,6 @@ public void onFailure(Exception e) {
341
316
}
342
317
343
318
private void failOnUnavailable (int shardIndex , SearchShardIterator shardIt ) {
344
- assert assertExecuteOnStartThread ();
345
319
SearchShardTarget unassignedShard = new SearchShardTarget (null , shardIt .shardId (), shardIt .getClusterAlias ());
346
320
onShardFailure (shardIndex , unassignedShard , shardIt , new NoShardAvailableActionException (shardIt .shardId ()));
347
321
}
@@ -398,7 +372,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
398
372
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})" ,
399
373
discrepancy ,
400
374
successfulOps .get (),
401
- toSkipShardsIts . size () ,
375
+ skippedCount ,
402
376
getNumShards (),
403
377
currentPhase
404
378
);
@@ -537,9 +511,8 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
537
511
/**
538
512
* Executed once for every successful shard level request.
539
513
* @param result the result returned form the shard
540
- * @param shardIt the shard iterator
541
514
*/
542
- protected void onShardResult (Result result , SearchShardIterator shardIt ) {
515
+ protected void onShardResult (Result result ) {
543
516
assert result .getShardIndex () != -1 : "shard index is not set" ;
544
517
assert result .getSearchShardTarget () != null : "search shard target must not be null" ;
545
518
hasShardResponse .set (true );
@@ -637,7 +610,7 @@ private SearchResponse buildSearchResponse(
637
610
scrollId ,
638
611
getNumShards (),
639
612
numSuccess ,
640
- toSkipShardsIts . size () ,
613
+ skippedCount ,
641
614
buildTookInMillis (),
642
615
failures ,
643
616
clusters ,
@@ -729,7 +702,7 @@ void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connecti
729
702
/**
730
703
* Executed once all shard results have been received and processed
731
704
* @see #onShardFailure(int, SearchShardTarget, Exception)
732
- * @see #onShardResult(SearchPhaseResult, SearchShardIterator )
705
+ * @see #onShardResult(SearchPhaseResult)
733
706
*/
734
707
private void onPhaseDone () { // as a tribute to @kimchy aka. finishHim()
735
708
executeNextPhase (getName (), this ::getNextPhase );
0 commit comments