Skip to content

Commit 1919e02

Browse files
Cheaper skip handling
1 parent c702eb9 commit 1919e02

File tree

4 files changed

+34
-26
lines changed

4 files changed

+34
-26
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.search.SearchPhaseResult;
3434
import org.elasticsearch.search.SearchShardTarget;
3535
import org.elasticsearch.search.builder.PointInTimeBuilder;
36-
import org.elasticsearch.search.builder.SearchSourceBuilder;
3736
import org.elasticsearch.search.internal.AliasFilter;
3837
import org.elasticsearch.search.internal.SearchContext;
3938
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -88,18 +87,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8887
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8988
private final Object shardFailuresMutex = new Object();
9089
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
91-
private final AtomicInteger successfulOps = new AtomicInteger();
90+
private final AtomicInteger successfulOps;
9291
private final SearchTimeProvider timeProvider;
9392
private final SearchResponse.Clusters clusters;
9493

95-
protected final List<SearchShardIterator> toSkipShardsIts;
9694
protected final List<SearchShardIterator> shardsIts;
9795
private final SearchShardIterator[] shardIterators;
9896
private final AtomicInteger outstandingShards;
9997
private final int maxConcurrentRequestsPerNode;
10098
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
10199
private final boolean throttleConcurrentRequests;
102100
private final AtomicBoolean requestCancelled = new AtomicBoolean();
101+
private final int skippedCount;
103102

104103
// protected for tests
105104
protected final List<Releasable> releasables = new ArrayList<>();
@@ -125,18 +124,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
125124
) {
126125
super(name);
127126
this.namedWriteableRegistry = namedWriteableRegistry;
128-
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
129127
final List<SearchShardIterator> iterators = new ArrayList<>();
128+
int skipped = 0;
130129
for (final SearchShardIterator iterator : shardsIts) {
131130
if (iterator.skip()) {
132-
toSkipIterators.add(iterator);
131+
skipped++;
133132
} else {
134133
iterators.add(iterator);
135134
}
136135
}
137-
this.toSkipShardsIts = toSkipIterators;
136+
this.skippedCount = skipped;
138137
this.shardsIts = iterators;
139-
outstandingShards = new AtomicInteger(shardsIts.size());
138+
outstandingShards = new AtomicInteger(shardsIts.size() - skipped);
139+
successfulOps = new AtomicInteger(skipped);
140140
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
141141
// we later compute the shard index based on the natural order of the shards
142142
// that participate in the search request. This means that this number is
@@ -167,11 +167,19 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
167167
protected void notifyListShards(
168168
SearchProgressListener progressListener,
169169
SearchResponse.Clusters clusters,
170-
SearchSourceBuilder sourceBuilder
170+
SearchRequest searchRequest,
171+
List<SearchShardIterator> allIterators
171172
) {
173+
final List<SearchShardIterator> skipped = new ArrayList<>();
174+
for (SearchShardIterator iter : allIterators) {
175+
if (iter.skip()) {
176+
skipped.add(iter);
177+
}
178+
}
179+
var sourceBuilder = searchRequest.source();
172180
progressListener.notifyListShards(
173181
SearchProgressListener.buildSearchShardsFromIter(this.shardsIts),
174-
SearchProgressListener.buildSearchShardsFromIter(toSkipShardsIts),
182+
SearchProgressListener.buildSearchShardsFromIter(skipped),
175183
clusters,
176184
sourceBuilder == null || sourceBuilder.size() > 0,
177185
timeProvider
@@ -215,10 +223,6 @@ public final void start() {
215223

216224
@Override
217225
protected final void run() {
218-
for (final SearchShardIterator iterator : toSkipShardsIts) {
219-
assert iterator.skip();
220-
skipShard(iterator);
221-
}
222226
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
223227
for (int i = 0; i < shardIterators.length; i++) {
224228
shardIndexMap.put(shardIterators[i], i);
@@ -237,15 +241,11 @@ protected final void run() {
237241
performPhaseOnShard(shardIndex, shardRoutings, routing);
238242
}
239243
}
244+
} else {
245+
onPhaseDone();
240246
}
241247
}
242248

243-
void skipShard(SearchShardIterator iterator) {
244-
successfulOps.incrementAndGet();
245-
assert iterator.skip();
246-
successfulShardExecution();
247-
}
248-
249249
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
250250
if (throttleConcurrentRequests) {
251251
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
@@ -343,7 +343,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
343343
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
344344
discrepancy,
345345
successfulOps.get(),
346-
toSkipShardsIts.size(),
346+
skippedCount,
347347
getNumShards(),
348348
currentPhase
349349
);
@@ -585,7 +585,7 @@ private SearchResponse buildSearchResponse(
585585
scrollId,
586586
getNumShards(),
587587
numSuccess,
588-
toSkipShardsIts.size(),
588+
skippedCount,
589589
buildTookInMillis(),
590590
failures,
591591
clusters,

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
7373
this.progressListener = task.getProgressListener();
7474
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
7575
if (progressListener != SearchProgressListener.NOOP) {
76-
notifyListShards(progressListener, clusters, request.source());
76+
notifyListShards(progressListener, clusters, request, shardsIts);
7777
}
7878
this.client = client;
7979
}

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
8585

8686
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
8787
if (progressListener != SearchProgressListener.NOOP) {
88-
notifyListShards(progressListener, clusters, request.source());
88+
notifyListShards(progressListener, clusters, request, shardsIts);
8989
}
9090
}
9191

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,17 @@ public void testShardNotAvailableWithDisallowPartialFailures() {
223223
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numShards);
224224
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
225225
// skip one to avoid the "all shards failed" failure.
226-
SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
227-
skipIterator.skip(true);
228-
action.skipShard(skipIterator);
226+
action.onShardResult(new SearchPhaseResult() {
227+
@Override
228+
public int getShardIndex() {
229+
return 0;
230+
}
231+
232+
@Override
233+
public SearchShardTarget getSearchShardTarget() {
234+
return new SearchShardTarget(null, null, null);
235+
}
236+
});
229237
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
230238
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
231239
assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage());

0 commit comments

Comments
 (0)