
[Backport 3.0] Enabled Async Shard Batch Fetch by default (#18139) #18162

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0.x]
### Added
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
Member


This will need to go in the release notes if we merge this into the 3.0 release. Also the changelog entry on main should be removed if this ends up being released in 3.0.


### Changed

@@ -67,6 +67,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.service.ClusterService;
@@ -1479,9 +1480,14 @@ public void testDoNotInfinitelyWaitForMapping() {
}

/** Makes sure the new cluster-manager does not repeatedly fetch index metadata from recovering replicas */
public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
public void testOngoingRecoveryAndClusterManagerFailOverForASFDisabled() throws Exception {
String indexName = "test";
internalCluster().startNodes(2);
// ASF Disabled
internalCluster().startNodes(
2,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build()
);

String nodeWithPrimary = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
@@ -1544,6 +1550,84 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
ensureGreen(indexName);
}

public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
String indexName = "test";
internalCluster().startNodes(2);
String nodeWithPrimary = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", nodeWithPrimary)
)
);
MockTransportService transport = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
CountDownLatch phase1ReadyBlocked = new CountDownLatch(1);
CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1);
Semaphore blockRecovery = new Semaphore(1);
transport.addSendBehavior((connection, requestId, action, request, options) -> {
if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action) && blockRecovery.tryAcquire()) {
phase1ReadyBlocked.countDown();
try {
allowToCompletePhase1Latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
connection.sendRequest(requestId, action, request, options);
});
try {
String nodeWithReplica = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
)
);
phase1ReadyBlocked.await();
internalCluster().restartNode(
clusterService().state().nodes().getClusterManagerNode().getName(),
new InternalTestCluster.RestartCallback()
);
internalCluster().ensureAtLeastNumDataNodes(3);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.putNull("index.routing.allocation.include._name")
)
);

ClusterState state = client().admin().cluster().prepareState().get().getState();
assertTrue(
state.routingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size() == 1
&& state.routingTable().index(indexName).shardsWithState(ShardRoutingState.INITIALIZING).size() == 1
);
/*
Shard assignment is stuck because recovery is blocked at the CLEAN_FILES stage. Once it times out after 60s, the replica shards get assigned.
https://github.com/opensearch-project/OpenSearch/issues/18098.

Stack trace:
Caused by: org.opensearch.transport.ReceiveTimeoutTransportException: [node_t3][127.0.0.1:56648][internal:index/shard/recovery/clean_files] request_id [20] timed out after [60026ms]
at org.opensearch.transport.TransportService$TimeoutHandler.run(TransportService.java:1399) ~[main/:?]
*/
ensureGreen(TimeValue.timeValueSeconds(62), indexName);
} finally {
allowToCompletePhase1Latch.countDown();
}
}

public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
List<String> nodes = randomSubsetOf(
@@ -64,21 +64,19 @@ public interface ExistingShardsAllocator {

/**
* Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk.
* This will allow sending all Unassigned Shards to the ExistingShardsAllocator to make the decision to allocate
* in one or more passes.
*
* Enable this setting if your ExistingShardAllocator is implementing the
* <p>
* This setting is enabled by default. In your ExistingShardsAllocator implementation, implement the
* {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method.
* The default implementation of this method is not optimized and assigns shards one by one.
*
* If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e,
* <p>
* If no plugin overrides {@link ExistingShardsAllocator}, then the default implementation will be used, i.e.,
* {@link ShardsBatchGatewayAllocator}.
*
* This setting is experimental at this point.
*/
Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enabled",
false,
true,
Setting.Property.NodeScope
);

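For context on what the flipped default means in practice, here is a minimal sketch (not part of this PR's diff) of how the setting resolves against node settings. It assumes only the EXISTING_SHARDS_ALLOCATOR_BATCH_MODE constant and its key shown above; the class name, main method, and imports are illustrative.

import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.common.settings.Settings;

public class BatchModeDefaultSketch {
    public static void main(String[] args) {
        // With no override, the setting now resolves to true (batch allocation enabled).
        boolean enabledByDefault = ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(Settings.EMPTY);

        // Opting back out mirrors the settings the ASF-disabled test above passes at node startup.
        Settings optOut = Settings.builder()
            .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false)
            .build();
        boolean optedOut = ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(optOut);

        System.out.println("default=" + enabledByDefault + ", opt-out=" + optedOut); // default=true, opt-out=false
    }
}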
@@ -85,6 +85,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
private volatile Priority followUpRerouteTaskPriority;
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
public static final TimeValue DEFAULT_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
private final ClusterManagerMetrics clusterManagerMetrics;

/**
@@ -105,7 +106,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
*/
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
DEFAULT_ALLOCATOR_TIMEOUT,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
@@ -129,7 +130,7 @@ public void validate(TimeValue timeValue) {
*/
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
DEFAULT_ALLOCATOR_TIMEOUT,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
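The hunk above also changes the default for both batch allocator timeouts from TimeValue.MINUS_ONE (no timeout) to the new 20-second DEFAULT_ALLOCATOR_TIMEOUT. Below is a minimal sketch (not part of this PR) of how those defaults resolve, assuming only the Setting constants shown in this diff; the class name, imports, and the "-1" override used to restore the previous unbounded behaviour are illustrative.

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

public class BatchAllocatorTimeoutSketch {
    public static void main(String[] args) {
        // With no override, both timeouts now resolve to DEFAULT_ALLOCATOR_TIMEOUT (20s) instead of -1.
        TimeValue primaryTimeout = ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(Settings.EMPTY);
        TimeValue replicaTimeout = ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(Settings.EMPTY);

        // Explicitly setting the values back to -1 would restore the old unbounded behaviour.
        Settings unbounded = Settings.builder()
            .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
            .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
            .build();

        System.out.println(primaryTimeout + " " + replicaTimeout + " "
            + ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(unbounded));
    }
}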
@@ -59,12 +59,14 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;

import java.util.Arrays;
import java.util.Collections;
@@ -192,8 +194,10 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
final String unrealisticAllocatorName = "unrealistic";
final Map<String, ExistingShardsAllocator> allocatorMap = new HashMap<>();
final TestGatewayAllocator testGatewayAllocator = new TestGatewayAllocator();
final TestShardBatchGatewayAllocator testShardBatchGatewayAllocator = new TestShardBatchGatewayAllocator();
allocatorMap.put(GatewayAllocator.ALLOCATOR_NAME, testGatewayAllocator);
allocatorMap.put(unrealisticAllocatorName, new UnrealisticAllocator());
allocatorMap.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, testShardBatchGatewayAllocator);
allocationService.setExistingShardsAllocators(allocatorMap);

final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
@@ -211,15 +211,18 @@ public void testTwoLoggersDifferentLevel() {
public void testMultipleSlowLoggersUseSingleLog4jLogger() {
LoggerContext context = (LoggerContext) LogManager.getContext(false);

SearchContext ctx1 = searchContextWithSourceAndTask(createIndex("index-1"));
IndexService index1 = createIndex("index-1");
IndexService index2 = createIndex("index-2");

SearchContext ctx1 = searchContextWithSourceAndTask(index1);
IndexSettings settings1 = new IndexSettings(
createIndexMetadata(SlowLogLevel.WARN, "index-1", UUIDs.randomBase64UUID()),
Settings.EMPTY
);
SearchSlowLog log1 = new SearchSlowLog(settings1);
int numberOfLoggersBefore = context.getLoggers().size();

SearchContext ctx2 = searchContextWithSourceAndTask(createIndex("index-2"));
SearchContext ctx2 = searchContextWithSourceAndTask(index2);
IndexSettings settings2 = new IndexSettings(
createIndexMetadata(SlowLogLevel.TRACE, "index-2", UUIDs.randomBase64UUID()),
Settings.EMPTY