Skip to content

Commit 541979e

Browse files
committed
Filter shards for sliced search at coordinator
Prior to this commit, a sliced search would fan out to every shard, then apply a MatchNoDocsQuery filter on shards that don't correspond to the current slice. This still creates a (useless) search context on each shard for every slice, though. For a long-running sliced scroll, this can quickly exhaust the number of available scroll contexts. This change avoids fanning out to all the shards by checking at the coordinator whether a shard is matched by the current slice. This should reduce the number of open scroll contexts to max(numShards, numSlices) instead of numShards * numSlices.

Signed-off-by: Michael Froh <[email protected]>
1 parent 5ba909a commit 541979e

File tree

8 files changed

+87
-42
lines changed

8 files changed

+87
-42
lines changed

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.core.common.Strings;
4242
import org.opensearch.core.common.io.stream.StreamInput;
4343
import org.opensearch.core.common.io.stream.StreamOutput;
44+
import org.opensearch.search.slice.SliceBuilder;
4445

4546
import java.io.IOException;
4647
import java.util.Objects;
@@ -61,6 +62,8 @@ public class ClusterSearchShardsRequest extends ClusterManagerNodeReadRequest<Cl
6162
@Nullable
6263
private String preference;
6364
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
65+
@Nullable
66+
private SliceBuilder sliceBuilder;
6467

6568
public ClusterSearchShardsRequest() {}
6669

@@ -166,4 +169,13 @@ public ClusterSearchShardsRequest preference(String preference) {
166169
public String preference() {
167170
return this.preference;
168171
}
172+
173+
public ClusterSearchShardsRequest slice(SliceBuilder sliceBuilder) {
174+
this.sliceBuilder = sliceBuilder;
175+
return this;
176+
}
177+
178+
public SliceBuilder slice() {
179+
return this.sliceBuilder;
180+
}
169181
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected void clusterManagerOperation(
133133

134134
Set<String> nodeIds = new HashSet<>();
135135
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
136-
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
136+
.searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice());
137137
ShardRouting shard;
138138
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
139139
int currentGroup = 0;

server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
247247
throw blockException;
248248
}
249249

250-
shardsIt = clusterService.operationRouting()
251-
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
250+
shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null);
252251
}
253252

254253
public void start() {

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.opensearch.search.pipeline.SearchPipelineService;
8686
import org.opensearch.search.profile.ProfileShardResult;
8787
import org.opensearch.search.profile.SearchProfileShardResults;
88+
import org.opensearch.search.slice.SliceBuilder;
8889
import org.opensearch.tasks.CancellableTask;
8990
import org.opensearch.tasks.Task;
9091
import org.opensearch.tasks.TaskResourceTrackingService;
@@ -551,6 +552,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
551552
);
552553
} else {
553554
AtomicInteger skippedClusters = new AtomicInteger(0);
555+
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
554556
collectSearchShards(
555557
searchRequest.indicesOptions(),
556558
searchRequest.preference(),
@@ -559,6 +561,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
559561
remoteClusterIndices,
560562
remoteClusterService,
561563
threadPool,
564+
slice,
562565
ActionListener.wrap(searchShardsResponses -> {
563566
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
564567
searchShardsResponses
@@ -787,6 +790,7 @@ static void collectSearchShards(
787790
Map<String, OriginalIndices> remoteIndicesByCluster,
788791
RemoteClusterService remoteClusterService,
789792
ThreadPool threadPool,
793+
SliceBuilder slice,
790794
ActionListener<Map<String, ClusterSearchShardsResponse>> listener
791795
) {
792796
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -800,7 +804,8 @@ static void collectSearchShards(
800804
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions)
801805
.local(true)
802806
.preference(preference)
803-
.routing(routing);
807+
.routing(routing)
808+
.slice(slice);
804809
clusterClient.admin()
805810
.cluster()
806811
.searchShards(
@@ -1042,14 +1047,16 @@ private void executeSearch(
10421047
concreteLocalIndices[i] = indices[i].getName();
10431048
}
10441049
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
1050+
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
10451051
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting()
10461052
.searchShards(
10471053
clusterState,
10481054
concreteLocalIndices,
10491055
routingMap,
10501056
searchRequest.preference(),
10511057
searchService.getResponseCollectorService(),
1052-
nodeSearchCounts
1058+
nodeSearchCounts,
1059+
slice
10531060
);
10541061
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
10551062
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.routing;
3434

35+
import org.apache.lucene.util.CollectionUtil;
3536
import org.opensearch.cluster.ClusterState;
3637
import org.opensearch.cluster.metadata.IndexMetadata;
3738
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
@@ -48,10 +49,12 @@
4849
import org.opensearch.index.IndexModule;
4950
import org.opensearch.index.IndexNotFoundException;
5051
import org.opensearch.node.ResponseCollectorService;
52+
import org.opensearch.search.slice.SliceBuilder;
5153

5254
import java.util.ArrayList;
5355
import java.util.Arrays;
5456
import java.util.Collections;
57+
import java.util.Comparator;
5558
import java.util.HashSet;
5659
import java.util.List;
5760
import java.util.Map;
@@ -230,7 +233,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
230233
@Nullable Map<String, Set<String>> routing,
231234
@Nullable String preference
232235
) {
233-
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
236+
return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
234237
}
235238

236239
public GroupShardsIterator<ShardIterator> searchShards(
@@ -239,9 +242,10 @@ public GroupShardsIterator<ShardIterator> searchShards(
239242
@Nullable Map<String, Set<String>> routing,
240243
@Nullable String preference,
241244
@Nullable ResponseCollectorService collectorService,
242-
@Nullable Map<String, Long> nodeCounts
245+
@Nullable Map<String, Long> nodeCounts,
246+
@Nullable SliceBuilder slice
243247
) {
244-
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
248+
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing, slice);
245249
final Set<ShardIterator> set = new HashSet<>(shards.size());
246250
for (IndexShardRoutingTable shard : shards) {
247251
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
@@ -290,25 +294,36 @@ public static ShardIterator getShards(ClusterState clusterState, ShardId shardId
290294
private Set<IndexShardRoutingTable> computeTargetedShards(
291295
ClusterState clusterState,
292296
String[] concreteIndices,
293-
@Nullable Map<String, Set<String>> routing
297+
@Nullable Map<String, Set<String>> routing,
298+
@Nullable SliceBuilder slice
294299
) {
295300
routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map
296301
final Set<IndexShardRoutingTable> set = new HashSet<>();
297302
// we use set here and not list since we might get duplicates
298303
for (String index : concreteIndices) {
304+
Set<IndexShardRoutingTable> indexSet = new HashSet<>();
299305
final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
300306
final IndexMetadata indexMetadata = indexMetadata(clusterState, index);
301307
final Set<String> effectiveRouting = routing.get(index);
302308
if (effectiveRouting != null) {
303309
for (String r : effectiveRouting) {
304310
final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
305311
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
306-
set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
312+
indexSet.add(
313+
RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset))
314+
);
307315
}
308316
}
309317
} else {
310318
for (IndexShardRoutingTable indexShard : indexRouting) {
311-
set.add(indexShard);
319+
indexSet.add(indexShard);
320+
}
321+
}
322+
List<IndexShardRoutingTable> shards = new ArrayList<>(indexSet);
323+
CollectionUtil.timSort(shards, Comparator.comparing(s -> s.shardId));
324+
for (int i = 0; i < shards.size(); i++) {
325+
if (slice == null || slice.shardMatches(i, shards.size())) {
326+
set.add(shards.get(i));
312327
}
313328
}
314329
}

server/src/main/java/org/opensearch/search/slice/SliceBuilder.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,15 @@ public int hashCode() {
214214
return Objects.hash(this.field, this.id, this.max);
215215
}
216216

217+
public boolean shardMatches(int shardId, int numShards) {
218+
if (max >= numShards) {
219+
// Slices are distributed over shards
220+
return id % numShards == shardId;
221+
}
222+
// Shards are distributed over slices
223+
return shardId % max == id;
224+
}
225+
217226
/**
218227
* Converts this QueryBuilder to a lucene {@link Query}.
219228
*
@@ -255,7 +264,12 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request,
255264
}
256265
}
257266

258-
String field = this.field;
267+
if (shardMatches(shardId, numShards) == false) {
268+
// We should have already excluded this shard before routing to it.
269+
// If we somehow land here, then we match nothing.
270+
return new MatchNoDocsQuery("this shard is not part of the slice");
271+
}
272+
259273
boolean useTermQuery = false;
260274
if ("_uid".equals(field)) {
261275
throw new IllegalArgumentException("Computing slices on the [_uid] field is illegal for 7.x indices, use [_id] instead");
@@ -277,12 +291,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request,
277291
// the number of slices is greater than the number of shards
278292
// in such case we can reduce the number of requested shards by slice
279293

280-
// first we check if the slice is responsible of this shard
281294
int targetShard = id % numShards;
282-
if (targetShard != shardId) {
283-
// the shard is not part of this slice, we can skip it.
284-
return new MatchNoDocsQuery("this shard is not part of the slice");
285-
}
286295
// compute the number of slices where this shard appears
287296
int numSlicesInShard = max / numShards;
288297
int rest = max % numShards;
@@ -301,14 +310,8 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request,
301310
? new TermsSliceQuery(field, shardSlice, numSlicesInShard)
302311
: new DocValuesSliceQuery(field, shardSlice, numSlicesInShard);
303312
}
304-
// the number of shards is greater than the number of slices
313+
// the number of shards is greater than the number of slices. If we target this shard, we target all of it.
305314

306-
// check if the shard is assigned to the slice
307-
int targetSlice = shardId % max;
308-
if (id != targetSlice) {
309-
// the shard is not part of this slice, we can skip it.
310-
return new MatchNoDocsQuery("this shard is not part of the slice");
311-
}
312315
return new MatchAllDocsQuery();
313316
}
314317

@@ -321,6 +324,8 @@ private GroupShardsIterator<ShardIterator> buildShardIterator(ClusterService clu
321324
Map<String, Set<String>> routingMap = request.indexRoutings().length > 0
322325
? Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings()))
323326
: null;
327+
// Note that we do *not* want to filter this set of shard IDs based on the slice, since we want the
328+
// full set of shards matched by the routing parameters.
324329
return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference());
325330
}
326331

server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,7 @@ public void testCollectSearchShards() throws Exception {
809809
remoteIndicesByCluster,
810810
remoteClusterService,
811811
threadPool,
812+
null,
812813
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)
813814
);
814815
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -835,6 +836,7 @@ public void testCollectSearchShards() throws Exception {
835836
remoteIndicesByCluster,
836837
remoteClusterService,
837838
threadPool,
839+
null,
838840
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)
839841
);
840842
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -880,6 +882,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
880882
remoteIndicesByCluster,
881883
remoteClusterService,
882884
threadPool,
885+
null,
883886
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)
884887
);
885888
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -907,6 +910,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
907910
remoteIndicesByCluster,
908911
remoteClusterService,
909912
threadPool,
913+
null,
910914
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)
911915
);
912916
awaitLatch(latch, 5, TimeUnit.SECONDS);
@@ -949,6 +953,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
949953
remoteIndicesByCluster,
950954
remoteClusterService,
951955
threadPool,
956+
null,
952957
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)
953958
);
954959
awaitLatch(latch, 5, TimeUnit.SECONDS);

0 commit comments

Comments (0)