Skip to content

Commit 0cbd848

Browse files
authored
Enabled Async Shard Batch Fetch by default (#18139)
Signed-off-by: Manik Garg <[email protected]>
1 parent b19d393 commit 0cbd848

File tree

6 files changed

+105
-14
lines changed

6 files changed

+105
-14
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
1212
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
1313
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
14+
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
1415

1516
### Changed
1617

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

+86-2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.opensearch.cluster.routing.ShardRouting;
6868
import org.opensearch.cluster.routing.ShardRoutingState;
6969
import org.opensearch.cluster.routing.UnassignedInfo;
70+
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
7071
import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
7172
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
7273
import org.opensearch.cluster.service.ClusterService;
@@ -1479,9 +1480,14 @@ public void testDoNotInfinitelyWaitForMapping() {
14791480
}
14801481

14811482
/** Makes sure the new cluster-manager does not repeatedly fetch index metadata from recovering replicas */
1482-
public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
1483+
public void testOngoingRecoveryAndClusterManagerFailOverForASFDisabled() throws Exception {
14831484
String indexName = "test";
1484-
internalCluster().startNodes(2);
1485+
// ASF Disabled
1486+
internalCluster().startNodes(
1487+
2,
1488+
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build()
1489+
);
1490+
14851491
String nodeWithPrimary = internalCluster().startDataOnlyNode();
14861492
assertAcked(
14871493
client().admin()
@@ -1544,6 +1550,84 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
15441550
ensureGreen(indexName);
15451551
}
15461552

1553+
public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
1554+
String indexName = "test";
1555+
internalCluster().startNodes(2);
1556+
String nodeWithPrimary = internalCluster().startDataOnlyNode();
1557+
assertAcked(
1558+
client().admin()
1559+
.indices()
1560+
.prepareCreate(indexName)
1561+
.setSettings(
1562+
Settings.builder()
1563+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1564+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1565+
.put("index.routing.allocation.include._name", nodeWithPrimary)
1566+
)
1567+
);
1568+
MockTransportService transport = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
1569+
CountDownLatch phase1ReadyBlocked = new CountDownLatch(1);
1570+
CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1);
1571+
Semaphore blockRecovery = new Semaphore(1);
1572+
transport.addSendBehavior((connection, requestId, action, request, options) -> {
1573+
if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action) && blockRecovery.tryAcquire()) {
1574+
phase1ReadyBlocked.countDown();
1575+
try {
1576+
allowToCompletePhase1Latch.await();
1577+
} catch (InterruptedException e) {
1578+
throw new AssertionError(e);
1579+
}
1580+
}
1581+
connection.sendRequest(requestId, action, request, options);
1582+
});
1583+
try {
1584+
String nodeWithReplica = internalCluster().startDataOnlyNode();
1585+
assertAcked(
1586+
client().admin()
1587+
.indices()
1588+
.prepareUpdateSettings(indexName)
1589+
.setSettings(
1590+
Settings.builder()
1591+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
1592+
.put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
1593+
)
1594+
);
1595+
phase1ReadyBlocked.await();
1596+
internalCluster().restartNode(
1597+
clusterService().state().nodes().getClusterManagerNode().getName(),
1598+
new InternalTestCluster.RestartCallback()
1599+
);
1600+
internalCluster().ensureAtLeastNumDataNodes(3);
1601+
assertAcked(
1602+
client().admin()
1603+
.indices()
1604+
.prepareUpdateSettings(indexName)
1605+
.setSettings(
1606+
Settings.builder()
1607+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
1608+
.putNull("index.routing.allocation.include._name")
1609+
)
1610+
);
1611+
1612+
ClusterState state = client().admin().cluster().prepareState().get().getState();
1613+
assertTrue(
1614+
state.routingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size() == 1
1615+
&& state.routingTable().index(indexName).shardsWithState(ShardRoutingState.INITIALIZING).size() == 1
1616+
);
1617+
/*
1618+
Shard assignment is stuck because recovery is blocked at CLEAN_FILES stage. Once, it times out after 60s the replica shards get assigned.
1619+
https://github.com/opensearch-project/OpenSearch/issues/18098.
1620+
1621+
Stack trace:
1622+
Caused by: org.opensearch.transport.ReceiveTimeoutTransportException: [node_t3][127.0.0.1:56648][internal:index/shard/recovery/clean_files] request_id [20] timed out after [60026ms]
1623+
at org.opensearch.transport.TransportService$TimeoutHandler.run(TransportService.java:1399) ~[main/:?]
1624+
*/
1625+
ensureGreen(TimeValue.timeValueSeconds(62), indexName);
1626+
} finally {
1627+
allowToCompletePhase1Latch.countDown();
1628+
}
1629+
}
1630+
15471631
public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
15481632
internalCluster().ensureAtLeastNumDataNodes(2);
15491633
List<String> nodes = randomSubsetOf(

server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,19 @@ public interface ExistingShardsAllocator {
6464

6565
/**
6666
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
67-
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
67+
* This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate
6868
* in one or more go.
69-
*
70-
* Enable this setting if your ExistingShardAllocator is implementing the
69+
* <p>
70+
* This setting is enabled by default. In your ExistingShardAllocator implement the
7171
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
7272
* The default implementation of this method is not optimized and assigns shards one by one.
73-
*
74-
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
73+
* <p>
74+
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be used for it , i.e,
7575
* {@link ShardsBatchGatewayAllocator}.
76-
*
77-
* This setting is experimental at this point.
7876
*/
7977
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
8078
"cluster.allocator.existing_shards_allocator.batch_enabled",
81-
false,
79+
true,
8280
Setting.Property.NodeScope
8381
);
8482

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
8585
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
8686
private volatile Priority followUpRerouteTaskPriority;
8787
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
88+
public static final TimeValue DEFAULT_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
8889
private final ClusterManagerMetrics clusterManagerMetrics;
8990

9091
/**
@@ -105,7 +106,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
105106
*/
106107
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
107108
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
108-
TimeValue.MINUS_ONE,
109+
DEFAULT_ALLOCATOR_TIMEOUT,
109110
TimeValue.MINUS_ONE,
110111
new Setting.Validator<>() {
111112
@Override
@@ -129,7 +130,7 @@ public void validate(TimeValue timeValue) {
129130
*/
130131
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
131132
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
132-
TimeValue.MINUS_ONE,
133+
DEFAULT_ALLOCATOR_TIMEOUT,
133134
TimeValue.MINUS_ONE,
134135
new Setting.Validator<>() {
135136
@Override

server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,14 @@
5959
import org.opensearch.common.settings.ClusterSettings;
6060
import org.opensearch.common.settings.Settings;
6161
import org.opensearch.gateway.GatewayAllocator;
62+
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
6263
import org.opensearch.snapshots.EmptySnapshotsInfoService;
6364
import org.opensearch.telemetry.metrics.Histogram;
6465
import org.opensearch.telemetry.metrics.MetricsRegistry;
6566
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
6667
import org.opensearch.test.OpenSearchTestCase;
6768
import org.opensearch.test.gateway.TestGatewayAllocator;
69+
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;
6870

6971
import java.util.Arrays;
7072
import java.util.Collections;
@@ -192,8 +194,10 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
192194
final String unrealisticAllocatorName = "unrealistic";
193195
final Map<String, ExistingShardsAllocator> allocatorMap = new HashMap<>();
194196
final TestGatewayAllocator testGatewayAllocator = new TestGatewayAllocator();
197+
final TestShardBatchGatewayAllocator testShardBatchGatewayAllocator = new TestShardBatchGatewayAllocator();
195198
allocatorMap.put(GatewayAllocator.ALLOCATOR_NAME, testGatewayAllocator);
196199
allocatorMap.put(unrealisticAllocatorName, new UnrealisticAllocator());
200+
allocatorMap.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, testShardBatchGatewayAllocator);
197201
allocationService.setExistingShardsAllocators(allocatorMap);
198202

199203
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();

server/src/test/java/org/opensearch/index/SearchSlowLogTests.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -211,15 +211,18 @@ public void testTwoLoggersDifferentLevel() {
211211
public void testMultipleSlowLoggersUseSingleLog4jLogger() {
212212
LoggerContext context = (LoggerContext) LogManager.getContext(false);
213213

214-
SearchContext ctx1 = searchContextWithSourceAndTask(createIndex("index-1"));
214+
IndexService index1 = createIndex("index-1");
215+
IndexService index2 = createIndex("index-2");
216+
217+
SearchContext ctx1 = searchContextWithSourceAndTask(index1);
215218
IndexSettings settings1 = new IndexSettings(
216219
createIndexMetadata(SlowLogLevel.WARN, "index-1", UUIDs.randomBase64UUID()),
217220
Settings.EMPTY
218221
);
219222
SearchSlowLog log1 = new SearchSlowLog(settings1);
220223
int numberOfLoggersBefore = context.getLoggers().size();
221224

222-
SearchContext ctx2 = searchContextWithSourceAndTask(createIndex("index-2"));
225+
SearchContext ctx2 = searchContextWithSourceAndTask(index2);
223226
IndexSettings settings2 = new IndexSettings(
224227
createIndexMetadata(SlowLogLevel.TRACE, "index-2", UUIDs.randomBase64UUID()),
225228
Settings.EMPTY

0 commit comments

Comments
 (0)