Skip to content

Implement parallel shard refresh behind cluster settings #17782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

import java.io.IOException;
Expand Down Expand Up @@ -721,7 +722,11 @@ public static final IndexShard newIndexShard(
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
mock(Function.class),
new MergedSegmentWarmerFactory(null, null, null)
new MergedSegmentWarmerFactory(null, null, null),
false,
OpenSearchTestCase::randomBoolean,
() -> indexService.getIndexSettings().getRefreshInterval(),
indexService.getRefreshMutex()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,8 @@ public void apply(Settings value, Settings current, Settings previous) {
),

// Setting related to refresh optimisations
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING
)
)
);
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
Supplier<Boolean> shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
Expand All @@ -656,6 +657,7 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled,
recoverySettings,
remoteStoreSettings,
(s) -> {},
Expand Down Expand Up @@ -685,6 +687,7 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
Supplier<Boolean> shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator,
Expand Down Expand Up @@ -748,6 +751,7 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled.get(),
recoverySettings,
remoteStoreSettings,
fileCache,
Expand Down
107 changes: 100 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
private final Object refreshMutex = new Object();
private volatile TimeValue refreshInterval;
private volatile boolean shardLevelRefreshEnabled;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -234,6 +237,7 @@
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
boolean shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
Expand Down Expand Up @@ -311,8 +315,9 @@
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
this.shardLevelRefreshEnabled = shardLevelRefreshEnabled;
this.refreshInterval = getRefreshInterval();
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
// disable these checks for ingestion source engine
if (!indexSettings.getIndexMetadata().useIngestionSource()) {
Expand All @@ -329,6 +334,12 @@
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get());
updateFsyncTaskIfNecessary();
synchronized (refreshMutex) {
if (shardLevelRefreshEnabled == false) {
logger.debug("shard level refresh is disabled for index [{}]", indexSettings.getIndex().getName());
startIndexLevelRefreshTask();
}
}
}

public IndexService(
Expand Down Expand Up @@ -365,6 +376,7 @@
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
boolean shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnce
Expand Down Expand Up @@ -403,6 +415,7 @@
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled,
recoverySettings,
remoteStoreSettings,
null,
Expand Down Expand Up @@ -708,7 +721,11 @@
seedRemote,
discoveryNodes,
segmentReplicationStatsProvider,
mergedSegmentWarmerFactory
mergedSegmentWarmerFactory,
shardLevelRefreshEnabled,
fixedRefreshIntervalSchedulingEnabled,
this::getRefreshInterval,
refreshMutex
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -1142,9 +1159,10 @@
* 2. {@code index.refresh_interval} index setting changes.
*/
public void onRefreshIntervalChange() {
if (refreshTask.getInterval().equals(getRefreshInterval())) {
if (refreshInterval.equals(getRefreshInterval())) {
return;
}
refreshInterval = getRefreshInterval();
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
Expand Down Expand Up @@ -1186,10 +1204,20 @@
}

private void rescheduleRefreshTasks() {
try {
refreshTask.close();
} finally {
refreshTask = new AsyncRefreshTask(this);
synchronized (refreshMutex) {
if (shardLevelRefreshEnabled) {
try {
stopShardLevelRefreshTasks();
} finally {
startShardLevelRefreshTasks();
}
} else {
try {
stopIndexLevelRefreshTask();
} finally {
startIndexLevelRefreshTask();
}
}
}
}

Expand Down Expand Up @@ -1593,4 +1621,69 @@
return clearedAtLeastOne;
}

/**
 * Switches this index between index-level and shard-level refresh scheduling.
 * <p>
 * The whole transition runs under {@code refreshMutex}. If the requested mode equals the current one,
 * nothing happens. Otherwise the flag is updated, the change is logged, the refresh task(s) of the old
 * mode are stopped and the task(s) of the new mode are started. The start step sits in a
 * {@code finally} block, so it is attempted even when stopping throws.
 *
 * @param newShardLevelRefreshVal {@code true} to refresh through per-shard tasks, {@code false} to
 *                                refresh through the single index-level task
 */
public void onRefreshLevelChange(boolean newShardLevelRefreshVal) {
    synchronized (refreshMutex) {
        final boolean prevShardLevelRefreshVal = this.shardLevelRefreshEnabled;
        if (prevShardLevelRefreshVal == newShardLevelRefreshVal) {
            // Already in the requested mode; nothing to reschedule.
            return;
        }
        this.shardLevelRefreshEnabled = newShardLevelRefreshVal;
        logger.info("refresh tasks rescheduled oldVal={} newVal={}", prevShardLevelRefreshVal, newShardLevelRefreshVal);
        if (newShardLevelRefreshVal == false) {
            // Shard level -> index level.
            try {
                stopShardLevelRefreshTasks();
            } finally {
                startIndexLevelRefreshTask();
            }
            return;
        }
        // Index level -> shard level.
        try {
            stopIndexLevelRefreshTask();
        } finally {
            startShardLevelRefreshTasks();
        }
    }
}

/**
 * Closes the index-level refresh task and clears the reference to it.
 * The caller must hold {@code refreshMutex}; this is asserted.
 */
private void stopIndexLevelRefreshTask() {
    assert Thread.holdsLock(refreshMutex);
    // An index-level task must exist whenever this method is reached.
    assert refreshTask != null;
    refreshTask.close();
    refreshTask = null;
}

/**
 * Starts a refresh task on every shard currently registered in {@code shards}.
 * The caller must hold {@code refreshMutex}; this is asserted.
 */
private void startShardLevelRefreshTasks() {
    assert Thread.holdsLock(refreshMutex);
    this.shards.values().forEach(IndexShard::startRefreshTask);
}

/**
 * Stops the refresh task on every shard currently registered in {@code shards}.
 * The caller must hold {@code refreshMutex}; this is asserted.
 */
private void stopShardLevelRefreshTasks() {
    assert Thread.holdsLock(refreshMutex);
    this.shards.values().forEach(IndexShard::stopRefreshTask);
}

/**
 * Creates the index-level {@code AsyncRefreshTask} for this index service.
 * The caller must hold {@code refreshMutex}; this is asserted.
 */
private void startIndexLevelRefreshTask() {
    assert Thread.holdsLock(refreshMutex);
    // No index-level task may exist yet; otherwise the old one would be overwritten without being closed.
    assert refreshTask == null;
    refreshTask = new AsyncRefreshTask(this);
}

/**
 * Reports whether refreshes are currently scheduled per shard rather than per index.
 * Visible for testing.
 *
 * @return the current value of the {@code shardLevelRefreshEnabled} flag
 */
boolean isShardLevelRefreshEnabled() {
    return this.shardLevelRefreshEnabled;
}

/**
 * Returns the monitor object this index service synchronizes on when starting, stopping or
 * rescheduling refresh tasks. The same object is handed to each {@code IndexShard} created by
 * this service, so index-level and shard-level refresh task changes share one lock.
 *
 * @return the refresh mutex; never reassigned because the field is final
 */
// Visible for testing
public Object getRefreshMutex() {
return refreshMutex;

Check warning on line 1687 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1687

Added line #L1687 was not covered by tests
}
}
80 changes: 78 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
Expand All @@ -95,6 +96,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
Expand Down Expand Up @@ -369,6 +371,10 @@
private DiscoveryNodes discoveryNodes;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
private final MergedSegmentWarmerFactory mergedSegmentWarmerFactory;
private final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled;
private final Supplier<TimeValue> refreshInterval;
private final Object refreshMutex;
private volatile AsyncShardRefreshTask refreshTask;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -401,7 +407,11 @@
boolean seedRemote,
final DiscoveryNodes discoveryNodes,
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
final MergedSegmentWarmerFactory mergedSegmentWarmerFactory
final MergedSegmentWarmerFactory mergedSegmentWarmerFactory,
final boolean shardLevelRefreshEnabled,
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
final Supplier<TimeValue> refreshInterval,
final Object refreshMutex
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -505,6 +515,14 @@
this.discoveryNodes = discoveryNodes;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
this.mergedSegmentWarmerFactory = mergedSegmentWarmerFactory;
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
this.refreshInterval = refreshInterval;
this.refreshMutex = Objects.requireNonNull(refreshMutex);
synchronized (this.refreshMutex) {
if (shardLevelRefreshEnabled) {
startRefreshTask();

Check warning on line 523 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L523

Added line #L523 was not covered by tests
}
}
}

/**
Expand Down Expand Up @@ -2111,7 +2129,7 @@
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, refreshTask);

if (deleted && engine != null && isPrimaryMode()) {
// Translog Clean up
Expand Down Expand Up @@ -5497,4 +5515,62 @@
IngestionEngine ingestionEngine = (IngestionEngine) engine;
return ingestionEngine.getIngestionState();
}

/**
 * Async shard refresh task for running refreshes at shard level independently.
 * <p>
 * Each instance is bound to one {@link IndexShard} and periodically calls
 * {@link IndexShard#scheduledRefresh()} on the {@code REFRESH} thread pool. This is a non-static
 * inner class: the interval and the fixed-interval flag passed to the super constructor are read
 * from the enclosing shard's {@code refreshInterval} and
 * {@code fixedRefreshIntervalSchedulingEnabled} suppliers.
 */
@ExperimentalApi
public final class AsyncShardRefreshTask extends AbstractAsyncTask {

// Shard whose refresh this task drives.
private final IndexShard indexShard;
// Copy of the shard's logger; assigned in the constructor.
private final Logger logger;

/**
 * Creates the task and immediately asks the base class to schedule it.
 *
 * @param indexShard shard to refresh; its logger and thread pool are passed to the base task
 */
public AsyncShardRefreshTask(IndexShard indexShard) {
// NOTE(review): the literal `true` is presumably AbstractAsyncTask's auto-reschedule flag - confirm against its constructor.
super(indexShard.logger, indexShard.threadPool, refreshInterval.get(), true, fixedRefreshIntervalSchedulingEnabled);
this.logger = indexShard.logger;
this.indexShard = indexShard;
rescheduleIfNecessary();
}

/**
 * @return {@code true} only while the shard is not {@code CLOSED} and the index metadata state is {@code OPEN}
 */
@Override
protected boolean mustReschedule() {
return indexShard.state != IndexShardState.CLOSED
&& indexShard.indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN;
}

/** Performs one scheduled refresh of the bound shard. */
@Override
protected void runInternal() {
indexShard.scheduledRefresh();
}

/** @return the name of the refresh thread pool, {@code ThreadPool.Names.REFRESH} */
@Override
protected String getThreadPool() {
return ThreadPool.Names.REFRESH;
}

/** @return the fixed label {@code "shard_refresh"} */
@Override
public String toString() {
return "shard_refresh";

Check warning on line 5553 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5553

Added line #L5553 was not covered by tests
}
}

/**
 * Creates this shard's {@link AsyncShardRefreshTask}; the task's constructor requests its own scheduling.
 * The caller must hold {@code refreshMutex}; this is asserted.
 */
public void startRefreshTask() {
    assert Thread.holdsLock(refreshMutex);
    // No shard-level task may exist yet; otherwise the old one would be overwritten without being closed.
    assert refreshTask == null;
    refreshTask = new AsyncShardRefreshTask(this);
}

/**
 * Closes this shard's refresh task and clears the reference so a later
 * {@code startRefreshTask()} call finds no existing task.
 * The caller must hold {@code refreshMutex}; this is asserted, as is the presence of a task.
 */
public void stopRefreshTask() {
assert Thread.holdsLock(refreshMutex);
// The refresh task is expected to be non-null at this point.
assert Objects.nonNull(refreshTask);
refreshTask.close();
refreshTask = null;
Comment on lines +5568 to +5569
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally the close should de-reference the refreshTask

}

/**
 * Exposes the shard-level refresh task. Visible for testing.
 *
 * @return the current {@link AsyncShardRefreshTask}, or {@code null} when none has been started
 *         or it has been stopped
 */
public AsyncShardRefreshTask getRefreshTask() {
    return this.refreshTask;
}
}
Loading
Loading