Skip to content

Commit b5886d8

Browse files
committed
Implement parallel shard refresh behind cluster settings
Signed-off-by: Ashish Singh <[email protected]>
1 parent cf31931 commit b5886d8

File tree

9 files changed

+317
-15
lines changed

9 files changed

+317
-15
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,10 @@ public static final IndexShard newIndexShard(
719719
DefaultRemoteStoreSettings.INSTANCE,
720720
false,
721721
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
722-
mock(Function.class)
722+
mock(Function.class),
723+
false,
724+
() -> Boolean.FALSE,
725+
() -> indexService.getRefreshInterval()
723726
);
724727
}
725728

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,8 @@ public void apply(Settings value, Settings current, Settings previous) {
823823
),
824824

825825
// Setting related to refresh optimisations
826-
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
826+
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
827+
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING
827828
)
828829
)
829830
);

server/src/main/java/org/opensearch/index/IndexModule.java

+4
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ public IndexService newIndexService(
631631
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
632632
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
633633
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
634+
Supplier<Boolean> shardLevelRefreshEnabled,
634635
RecoverySettings recoverySettings,
635636
RemoteStoreSettings remoteStoreSettings
636637
) throws IOException {
@@ -655,6 +656,7 @@ public IndexService newIndexService(
655656
translogFactorySupplier,
656657
clusterDefaultRefreshIntervalSupplier,
657658
fixedRefreshIntervalSchedulingEnabled,
659+
shardLevelRefreshEnabled,
658660
recoverySettings,
659661
remoteStoreSettings,
660662
(s) -> {},
@@ -683,6 +685,7 @@ public IndexService newIndexService(
683685
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
684686
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
685687
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
688+
Supplier<Boolean> shardLevelRefreshEnabled,
686689
RecoverySettings recoverySettings,
687690
RemoteStoreSettings remoteStoreSettings,
688691
Consumer<IndexShard> replicator,
@@ -745,6 +748,7 @@ public IndexService newIndexService(
745748
translogFactorySupplier,
746749
clusterDefaultRefreshIntervalSupplier,
747750
fixedRefreshIntervalSchedulingEnabled,
751+
shardLevelRefreshEnabled.get(),
748752
recoverySettings,
749753
remoteStoreSettings,
750754
fileCache,

server/src/main/java/org/opensearch/index/IndexService.java

+89-8
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
199199
private final CompositeIndexSettings compositeIndexSettings;
200200
private final Consumer<IndexShard> replicator;
201201
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
202+
private final Object refreshMutex = new Object();
203+
private volatile TimeValue refreshInterval;
204+
private volatile boolean shardLevelRefreshEnabled;
202205

203206
public IndexService(
204207
IndexSettings indexSettings,
@@ -234,6 +237,7 @@ public IndexService(
234237
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
235238
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
236239
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
240+
boolean shardLevelRefreshEnabled,
237241
RecoverySettings recoverySettings,
238242
RemoteStoreSettings remoteStoreSettings,
239243
FileCache fileCache,
@@ -310,8 +314,9 @@ public IndexService(
310314
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
311315
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
312316
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
317+
this.shardLevelRefreshEnabled = shardLevelRefreshEnabled;
318+
this.refreshInterval = getRefreshInterval();
313319
// kick off async ops for the first shard in this index
314-
this.refreshTask = new AsyncRefreshTask(this);
315320
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
316321
// disable these checks for ingestion source engine
317322
if (!indexSettings.getIndexMetadata().useIngestionSource()) {
@@ -329,6 +334,10 @@ public IndexService(
329334
this.replicator = replicator;
330335
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
331336
updateFsyncTaskIfNecessary();
337+
if (shardLevelRefreshEnabled == false) {
338+
logger.debug("shard level refresh is disabled for index [{}]", indexSettings.getIndex().getName());
339+
startIndexLevelRefreshTask();
340+
}
332341
}
333342

334343
public IndexService(
@@ -365,6 +374,7 @@ public IndexService(
365374
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
366375
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
367376
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
377+
boolean shardLevelRefreshEnabled,
368378
RecoverySettings recoverySettings,
369379
RemoteStoreSettings remoteStoreSettings
370380
) {
@@ -402,6 +412,7 @@ public IndexService(
402412
translogFactorySupplier,
403413
clusterDefaultRefreshIntervalSupplier,
404414
fixedRefreshIntervalSchedulingEnabled,
415+
shardLevelRefreshEnabled,
405416
recoverySettings,
406417
remoteStoreSettings,
407418
null,
@@ -704,7 +715,10 @@ protected void closeInternal() {
704715
remoteStoreSettings,
705716
seedRemote,
706717
discoveryNodes,
707-
segmentReplicationStatsProvider
718+
segmentReplicationStatsProvider,
719+
shardLevelRefreshEnabled,
720+
fixedRefreshIntervalSchedulingEnabled,
721+
this::getRefreshInterval
708722
);
709723
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
710724
eventListener.afterIndexShardCreated(indexShard);
@@ -1133,9 +1147,10 @@ private void updateReplicationTask() {
11331147
* 2. {@code index.refresh_interval} index setting changes.
11341148
*/
11351149
public void onRefreshIntervalChange() {
1136-
if (refreshTask.getInterval().equals(getRefreshInterval())) {
1150+
if (refreshInterval.equals(getRefreshInterval())) {
11371151
return;
11381152
}
1153+
refreshInterval = getRefreshInterval();
11391154
// once we change the refresh interval we schedule yet another refresh
11401155
// to ensure we are in a clean and predictable state.
11411156
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
@@ -1177,10 +1192,20 @@ private void updateFsyncTaskIfNecessary() {
11771192
}
11781193

11791194
private void rescheduleRefreshTasks() {
1180-
try {
1181-
refreshTask.close();
1182-
} finally {
1183-
refreshTask = new AsyncRefreshTask(this);
1195+
synchronized (refreshMutex) {
1196+
if (shardLevelRefreshEnabled) {
1197+
try {
1198+
stopShardLevelRefreshTasks();
1199+
} finally {
1200+
startShardLevelRefreshTasks();
1201+
}
1202+
} else {
1203+
try {
1204+
stopIndexLevelRefreshTask();
1205+
} finally {
1206+
startIndexLevelRefreshTask();
1207+
}
1208+
}
11841209
}
11851210
}
11861211

@@ -1304,7 +1329,7 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
13041329
* Gets the refresh interval seen by the index service. Index setting overrides takes the highest precedence.
13051330
* @return the refresh interval.
13061331
*/
1307-
private TimeValue getRefreshInterval() {
1332+
public TimeValue getRefreshInterval() {
13081333
if (getIndexSettings().isExplicitRefresh()) {
13091334
return getIndexSettings().getRefreshInterval();
13101335
}
@@ -1584,4 +1609,60 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
15841609
return clearedAtLeastOne;
15851610
}
15861611

1612+
/**
1613+
* Called when the refresh level changes from index level to shard level or vice versa. Under {@code refreshMutex}, a changed value stops the refresh task(s) of the old mode and starts those of the new mode; an unchanged value is a no-op.
1614+
*/
1615+
public void onRefreshLevelChange(boolean newShardLevelRefreshVal) {
1616+
synchronized (refreshMutex) {
1617+
if (this.shardLevelRefreshEnabled != newShardLevelRefreshVal) {
1618+
boolean prevShardLevelRefreshVal = this.shardLevelRefreshEnabled;
1619+
this.shardLevelRefreshEnabled = newShardLevelRefreshVal;
1620+
// The refresh mode has changed (index level -> shard level, or shard level -> index level); the new mode is always started, even if stopping the old one throws (finally).
1621+
logger.info("refresh tasks rescheduled oldVal={} newVal={}", prevShardLevelRefreshVal, newShardLevelRefreshVal);
1622+
if (newShardLevelRefreshVal) {
1623+
try {
1624+
stopIndexLevelRefreshTask();
1625+
} finally {
1626+
startShardLevelRefreshTasks();
1627+
}
1628+
} else {
1629+
try {
1630+
stopShardLevelRefreshTasks();
1631+
} finally {
1632+
startIndexLevelRefreshTask();
1633+
}
1634+
}
1635+
}
1636+
}
1637+
}
1638+
1639+
// Closes the index-level refresh task and clears the reference to it.
private void stopIndexLevelRefreshTask() {
1640+
// The refresh task is expected to be non-null at this point (only checked when assertions are enabled).
1641+
assert Objects.nonNull(refreshTask);
1642+
refreshTask.close();
1643+
refreshTask = null;
1644+
}
1645+
1646+
// Starts the shard-level refresh task on every shard currently held in this.shards.
private void startShardLevelRefreshTasks() {
1647+
for (IndexShard shard : this.shards.values()) {
1648+
shard.startRefreshTask();
1649+
}
1650+
}
1651+
1652+
// Stops the shard-level refresh task on every shard currently held in this.shards.
private void stopShardLevelRefreshTasks() {
1653+
for (IndexShard shard : this.shards.values()) {
1654+
shard.stopRefreshTask();
1655+
}
1656+
}
1657+
1658+
// Creates the single index-level refresh task for this index service.
private void startIndexLevelRefreshTask() {
1659+
// The refresh task is expected to be null at this point (only checked when assertions are enabled).
1660+
assert Objects.isNull(refreshTask);
1661+
refreshTask = new AsyncRefreshTask(this);
1662+
}
1663+
1664+
// Visible for testing: returns the current value of the shardLevelRefreshEnabled flag.
1665+
boolean isShardLevelRefreshEnabled() {
1666+
return shardLevelRefreshEnabled;
1667+
}
15871668
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+68-2
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.opensearch.common.CheckedRunnable;
8383
import org.opensearch.common.Nullable;
8484
import org.opensearch.common.SetOnce;
85+
import org.opensearch.common.annotation.ExperimentalApi;
8586
import org.opensearch.common.annotation.PublicApi;
8687
import org.opensearch.common.collect.Tuple;
8788
import org.opensearch.common.concurrent.GatedCloseable;
@@ -95,6 +96,7 @@
9596
import org.opensearch.common.settings.Settings;
9697
import org.opensearch.common.unit.TimeValue;
9798
import org.opensearch.common.util.BigArrays;
99+
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
98100
import org.opensearch.common.util.concurrent.AbstractRunnable;
99101
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
100102
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
@@ -367,6 +369,9 @@ Runnable getGlobalCheckpointSyncer() {
367369
private final ShardMigrationState shardMigrationState;
368370
private DiscoveryNodes discoveryNodes;
369371
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
372+
private final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled;
373+
private final Supplier<TimeValue> refreshInterval;
374+
private volatile AsyncShardRefreshTask refreshTask;
370375

371376
public IndexShard(
372377
final ShardRouting shardRouting,
@@ -398,7 +403,10 @@ public IndexShard(
398403
final RemoteStoreSettings remoteStoreSettings,
399404
boolean seedRemote,
400405
final DiscoveryNodes discoveryNodes,
401-
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
406+
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
407+
final boolean shardLevelRefreshEnabled,
408+
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
409+
final Supplier<TimeValue> refreshInterval
402410
) throws IOException {
403411
super(shardRouting.shardId(), indexSettings);
404412
assert shardRouting.initializing();
@@ -501,6 +509,11 @@ public boolean shouldCache(Query query) {
501509
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
502510
this.discoveryNodes = discoveryNodes;
503511
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
512+
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
513+
this.refreshInterval = refreshInterval;
514+
if (shardLevelRefreshEnabled) {
515+
startRefreshTask();
516+
}
504517
}
505518

506519
/**
@@ -2099,7 +2112,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
20992112
} finally {
21002113
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
21012114
// Also closing refreshListeners to prevent us from accumulating any more listeners
2102-
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
2115+
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, refreshTask);
21032116

21042117
if (deleted && engine != null && isPrimaryMode()) {
21052118
// Translog Clean up
@@ -5484,4 +5497,57 @@ public ShardIngestionState getIngestionState() {
54845497
IngestionEngine ingestionEngine = (IngestionEngine) engine;
54855498
return ingestionEngine.getIngestionState();
54865499
}
5500+
5501+
/**
 * Async task that calls {@code scheduledRefresh()} on a single {@link IndexShard}, using the
 * {@code ThreadPool.Names.REFRESH} thread pool. The interval is read once from the enclosing
 * shard's {@code refreshInterval} supplier when the task is constructed.
 * NOTE(review): a later interval change presumably requires stopping and restarting the task - confirm against callers.
 */
@ExperimentalApi
5502+
public final class AsyncShardRefreshTask extends AbstractAsyncTask {
5503+
5504+
// NOTE(review): this is a non-static inner class, so the enclosing IndexShard is already reachable; this field looks redundant - confirm.
private final IndexShard indexShard;
5505+
private final Logger logger;
5506+
5507+
public AsyncShardRefreshTask(IndexShard indexShard) {
5508+
// refreshInterval and fixedRefreshIntervalSchedulingEnabled are fields of the enclosing IndexShard instance.
super(indexShard.logger, indexShard.threadPool, refreshInterval.get(), true, fixedRefreshIntervalSchedulingEnabled);
5509+
this.logger = indexShard.logger;
5510+
this.indexShard = indexShard;
5511+
// Requests scheduling as soon as the task is constructed.
rescheduleIfNecessary();
5512+
}
5513+
5514+
// Keep rescheduling only while the shard is not CLOSED and the index metadata state is OPEN.
@Override
5515+
protected boolean mustReschedule() {
5516+
return indexShard.state != IndexShardState.CLOSED
5517+
&& indexShard.indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN;
5518+
}
5519+
5520+
// The work performed on each run: a scheduled refresh of the shard.
@Override
5521+
protected void runInternal() {
5522+
indexShard.scheduledRefresh();
5523+
}
5524+
5525+
@Override
5526+
protected String getThreadPool() {
5527+
return ThreadPool.Names.REFRESH;
5528+
}
5529+
5530+
@Override
5531+
public String toString() {
5532+
return "shard_refresh";
5533+
}
5534+
}
5535+
5536+
// Creates this shard's refresh task; the AsyncShardRefreshTask constructor calls rescheduleIfNecessary() itself.
public void startRefreshTask() {
5537+
// The refresh task is expected to be null at this point (only checked when assertions are enabled).
5538+
assert Objects.isNull(refreshTask);
5539+
refreshTask = new AsyncShardRefreshTask(this);
5540+
}
5541+
5542+
// Closes this shard's refresh task and clears the reference to it.
public void stopRefreshTask() {
5543+
// The refresh task is expected to be non-null at this point (only checked when assertions are enabled).
5544+
assert Objects.nonNull(refreshTask);
5545+
refreshTask.close();
5546+
refreshTask = null;
5547+
}
5548+
5549+
// Visible for testing: returns the current shard-level refresh task, or null when none is set (e.g. after stopRefreshTask()).
5550+
public AsyncShardRefreshTask getRefreshTask() {
5551+
return refreshTask;
5552+
}
54875553
}

0 commit comments

Comments
 (0)