Skip to content

Commit ca03fdd

Browse files
authored
Added Search Only strict routing setting (#17803)
* Added Search Only strict routing setting Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Added Changelog Signed-off-by: Vinay Krishna Pudyodu <[email protected]> --------- Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
1 parent e8b975d commit ca03fdd

File tree

5 files changed

+165
-1
lines changed

5 files changed

+165
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
2727
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
2828
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
29+
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
2930

3031
### Changed
3132
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java

+77
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.indices.settings;
1010

11+
import org.opensearch.action.search.SearchPhaseExecutionException;
1112
import org.opensearch.action.search.SearchResponse;
1213
import org.opensearch.action.support.WriteRequest;
1314
import org.opensearch.cluster.ClusterState;
@@ -30,6 +31,8 @@
3031
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
3132
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
3233
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
34+
import static org.hamcrest.Matchers.anyOf;
35+
import static org.hamcrest.core.IsEqual.equalTo;
3336

3437
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3538
public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase {
@@ -236,6 +239,80 @@ public void testSearchReplicaRoutingPreference() throws IOException {
236239
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
237240
}
238241

242+
public void testSearchReplicaRoutingPreferenceWhenSearchReplicaUnassigned() {
243+
internalCluster().startClusterManagerOnlyNode();
244+
internalCluster().startDataOnlyNode();
245+
createIndex(TEST_INDEX, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build());
246+
ensureYellow(TEST_INDEX);
247+
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
248+
249+
// By default cluster.routing.search_only.strict is set as true
250+
// When cluster.routing.search_only.strict is set as true, and no assigned search replica is available,
251+
// search request will fail since it will route only to search replica but it's not available
252+
Throwable throwable = assertThrows(
253+
SearchPhaseExecutionException.class,
254+
() -> client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get()
255+
);
256+
257+
assertEquals("all shards failed", throwable.getMessage());
258+
259+
// Set cluster.routing.search_only.strict as false
260+
client().admin()
261+
.cluster()
262+
.prepareUpdateSettings()
263+
.setTransientSettings(Settings.builder().put("cluster.routing.search_only.strict", false))
264+
.get();
265+
266+
// When cluster.routing.search_only.strict is set as false, and no assigned search replica is available;
267+
// search request will fall back to querying writers
268+
SearchResponse response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();
269+
270+
String nodeId = response.getHits().getAt(0).getShard().getNodeId();
271+
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
272+
assertEquals(nodeId, indexShardRoutingTable.primaryShard().currentNodeId());
273+
}
274+
275+
public void testSearchReplicaRoutingPreferenceWhenSearchReplicaAssigned() {
276+
internalCluster().startClusterManagerOnlyNode();
277+
internalCluster().startDataOnlyNode();
278+
createIndex(TEST_INDEX, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build());
279+
ensureYellow(TEST_INDEX);
280+
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
281+
282+
internalCluster().startSearchOnlyNode();
283+
ensureGreen(TEST_INDEX);
284+
285+
// By default cluster.routing.search_only.strict is set as true
286+
// When cluster.routing.search_only.strict is set as true, and assigned search replica is available;
287+
// search request will succeed
288+
SearchResponse response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();
289+
290+
String nodeId = response.getHits().getAt(0).getShard().getNodeId();
291+
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
292+
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
293+
294+
// Set cluster.routing.search_only.strict as false
295+
client().admin()
296+
.cluster()
297+
.prepareUpdateSettings()
298+
.setTransientSettings(Settings.builder().put("cluster.routing.search_only.strict", false))
299+
.get();
300+
301+
// When cluster.routing.search_only.strict is set as false, and assigned search replica is available;
302+
// search request can land on either writer or reader
303+
response = client().prepareSearch(TEST_INDEX).setPreference(null).setQuery(QueryBuilders.matchAllQuery()).get();
304+
305+
nodeId = response.getHits().getAt(0).getShard().getNodeId();
306+
indexShardRoutingTable = getIndexShardRoutingTable();
307+
assertThat(
308+
nodeId,
309+
anyOf(
310+
equalTo(indexShardRoutingTable.primaryShard().currentNodeId()),
311+
equalTo(indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId())
312+
)
313+
);
314+
}
315+
239316
public void testUnableToAllocateSearchReplicaWontBlockRegularReplicaAllocation() {
240317
int numSearchReplicas = 1;
241318
int numWriterReplicas = 1;

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,21 @@ public class OperationRouting {
118118
Preference.PREFER_NODES
119119
);
120120

121+
public static final Setting<Boolean> STRICT_SEARCH_ONLY_ROUTING_ENABLED = Setting.boolSetting(
122+
"cluster.routing.search_only.strict",
123+
true,
124+
Setting.Property.Dynamic,
125+
Setting.Property.NodeScope
126+
);
127+
121128
private volatile List<String> awarenessAttributes;
122129
private volatile boolean useAdaptiveReplicaSelection;
123130
private volatile boolean ignoreAwarenessAttr;
124131
private volatile double weightedRoutingDefaultWeight;
125132
private volatile boolean isFailOpenEnabled;
126133
private volatile boolean isStrictWeightedShardRouting;
127134
private volatile boolean ignoreWeightedRouting;
135+
private volatile boolean isStrictSearchOnlyShardRouting;
128136
private final boolean isReaderWriterSplitEnabled;
129137

130138
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
@@ -140,12 +148,14 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
140148
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
141149
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
142150
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
151+
this.isStrictSearchOnlyShardRouting = STRICT_SEARCH_ONLY_ROUTING_ENABLED.get(settings);
143152
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
144153
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
145154
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
146155
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
147156
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
148157
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
158+
clusterSettings.addSettingsUpdateConsumer(STRICT_SEARCH_ONLY_ROUTING_ENABLED, this::setStrictSearchOnlyShardRouting);
149159
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
150160
}
151161

@@ -193,6 +203,10 @@ public double getWeightedRoutingDefaultWeight() {
193203
return this.weightedRoutingDefaultWeight;
194204
}
195205

206+
void setStrictSearchOnlyShardRouting(boolean strictSearchOnlyShardRouting) {
207+
this.isStrictSearchOnlyShardRouting = strictSearchOnlyShardRouting;
208+
}
209+
196210
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
197211
return shards(clusterState, index, id, routing).shardsIt();
198212
}
@@ -265,7 +279,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
265279

266280
if (isReaderWriterSplitEnabled) {
267281
if (preference == null || preference.isEmpty()) {
268-
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
282+
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0 && isStrictSearchOnlyShardRouting) {
269283
preference = Preference.SEARCH_REPLICA.type();
270284
}
271285
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ public void apply(Settings value, Settings current, Settings previous) {
613613
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
614614
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
615615
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
616+
OperationRouting.STRICT_SEARCH_ONLY_ROUTING_ENABLED,
616617
IndexGraveyard.SETTING_MAX_TOMBSTONES,
617618
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
618619
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,

server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java

+71
Original file line numberDiff line numberDiff line change
@@ -1196,6 +1196,77 @@ public void testSearchReplicaDefaultRouting() throws Exception {
11961196
}
11971197
}
11981198

1199+
public void testSearchReplicaRoutingWhenSearchOnlyStrictSettingIsFalse() throws Exception {
1200+
final int numShards = 1;
1201+
final int numReplicas = 2;
1202+
final int numSearchReplicas = 2;
1203+
final String indexName = "test";
1204+
final String[] indexNames = new String[] { indexName };
1205+
1206+
ClusterService clusterService = null;
1207+
ThreadPool threadPool = null;
1208+
1209+
try {
1210+
OperationRouting opRouting = new OperationRouting(
1211+
Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(),
1212+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
1213+
);
1214+
opRouting.setStrictSearchOnlyShardRouting(false);
1215+
1216+
ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
1217+
indexNames,
1218+
numShards,
1219+
numReplicas,
1220+
numSearchReplicas
1221+
);
1222+
IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0);
1223+
ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId();
1224+
1225+
threadPool = new TestThreadPool("testSearchReplicaDefaultRouting");
1226+
clusterService = ClusterServiceUtils.createClusterService(threadPool);
1227+
1228+
// add a search replica in initializing state:
1229+
DiscoveryNode node = new DiscoveryNode(
1230+
"node_initializing",
1231+
OpenSearchTestCase.buildNewFakeTransportAddress(),
1232+
Collections.emptyMap(),
1233+
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
1234+
Version.CURRENT
1235+
);
1236+
1237+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
1238+
.settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build())
1239+
.numberOfSearchReplicas(3)
1240+
.numberOfReplicas(2)
1241+
.build();
1242+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded();
1243+
IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
1244+
indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable);
1245+
indexShardRoutingBuilder.addShard(
1246+
TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null)
1247+
);
1248+
state = ClusterState.builder(state)
1249+
.routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build())
1250+
.metadata(metadataBuilder.build())
1251+
.build();
1252+
1253+
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
1254+
assertThat("one group per shard", groupIterator.size(), equalTo(numShards));
1255+
for (ShardIterator shardIterator : groupIterator) {
1256+
assertEquals("We should have all 6 shards returned", shardIterator.size(), 6);
1257+
for (ShardRouting shardRouting : shardIterator) {
1258+
assertTrue(
1259+
"Any shard can exist with when cluster.routing.search_only.strict is set as false",
1260+
shardRouting.isSearchOnly() || shardRouting.primary() || shardRouting.isSearchOnly() == false
1261+
);
1262+
}
1263+
}
1264+
} finally {
1265+
IOUtils.close(clusterService);
1266+
terminate(threadPool);
1267+
}
1268+
}
1269+
11991270
private DiscoveryNode[] setupNodes() {
12001271
// Sets up two data nodes in zone-a and one data node in zone-b
12011272
List<String> zones = Arrays.asList("a", "a", "b");

0 commit comments

Comments
 (0)