Skip to content

Commit 68c358c

Browse files
committed
Make Remote Publication a dynamic setting
Signed-off-by: Shivansh Arora <[email protected]>
1 parent 330b249 commit 68c358c

File tree

10 files changed

+48
-31
lines changed

10 files changed

+48
-31
lines changed

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
3939
import org.opensearch.cluster.metadata.Metadata;
4040
import org.opensearch.cluster.node.DiscoveryNode;
41+
import org.opensearch.common.settings.ClusterSettings;
4142
import org.opensearch.common.settings.Settings;
4243
import org.opensearch.common.util.io.IOUtils;
4344
import org.opensearch.gateway.remote.ClusterMetadataManifest;
@@ -54,6 +55,7 @@
5455

5556
import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
5657
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
58+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
5759
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
5860

5961
/**
@@ -87,8 +89,8 @@ public CoordinationState(
8789
DiscoveryNode localNode,
8890
PersistedStateRegistry persistedStateRegistry,
8991
ElectionStrategy electionStrategy,
90-
Settings settings
91-
) {
92+
Settings settings,
93+
ClusterSettings clusterSettings) {
9294
this.localNode = localNode;
9395

9496
// persisted state registry
@@ -105,10 +107,10 @@ public CoordinationState(
105107
.getLastAcceptedConfiguration();
106108
this.publishVotes = new VoteCollection();
107109
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108-
// ToDo: revisit this check while making the setting dynamic
109110
this.isRemotePublicationEnabled = isRemoteStateEnabled
110111
&& REMOTE_PUBLICATION_SETTING.get(settings)
111-
&& localNode.isRemoteStatePublicationEnabled();
112+
&& localNode.isRemoteStatePublicationConfigured();
113+
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
112114
}
113115

114116
public boolean isRemotePublicationEnabled() {
@@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() {
651653
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
652654
}
653655

656+
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
657+
if (remotePublicationSetting == false) {
658+
this.isRemotePublicationEnabled = false;
659+
} else {
660+
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
661+
}
662+
663+
}
664+
654665
/**
655666
* Pluggable persistence layer for {@link CoordinationState}.
656667
*

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
186186
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
187187
private final NodeHealthService nodeHealthService;
188188
private final PersistedStateRegistry persistedStateRegistry;
189+
private final RemoteClusterStateService remoteClusterStateService;
189190
private final RemoteStoreNodeService remoteStoreNodeService;
191+
private final ClusterSettings clusterSettings;
190192

191193
/**
192194
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -310,6 +312,8 @@ public Coordinator(
310312
this.persistedStateRegistry = persistedStateRegistry;
311313
this.localNodeCommissioned = true;
312314
this.remoteStoreNodeService = remoteStoreNodeService;
315+
this.remoteClusterStateService = remoteClusterStateService;
316+
this.clusterSettings = clusterSettings;
313317
}
314318

315319
private ClusterFormationState getClusterFormationState() {
@@ -858,7 +862,7 @@ boolean publicationInProgress() {
858862
@Override
859863
protected void doStart() {
860864
synchronized (mutex) {
861-
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
865+
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings));
862866
peerFinder.setCurrentTerm(getCurrentTerm());
863867
configuredHostsResolver.start();
864868
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
@@ -903,9 +907,9 @@ public DiscoveryStats stats() {
903907
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
904908
}
905909
});
906-
if (coordinationState.get().isRemotePublicationEnabled()) {
907-
stats.add(publicationHandler.getFullDownloadStats());
908-
stats.add(publicationHandler.getDiffDownloadStats());
910+
if (remoteClusterStateService != null) {
911+
stats.add(remoteClusterStateService.getFullDownloadStats());
912+
stats.add(remoteClusterStateService.getDiffDownloadStats());
909913
}
910914
clusterStateStats.setPersistenceStats(stats);
911915
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -508,10 +508,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi
508508

509509
assert existingNodes.isEmpty() == false;
510510
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
511-
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
511+
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
512512
.findFirst();
513513

514-
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
514+
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
515515
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
516516
}
517517
}

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
178178
);
179179
}
180180

181-
public PersistedStateStats getFullDownloadStats() {
182-
return remoteClusterStateService.getFullDownloadStats();
183-
}
184-
185-
public PersistedStateStats getDiffDownloadStats() {
186-
return remoteClusterStateService.getDiffDownloadStats();
187-
}
188-
189181
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
190182
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
191183
ClusterState incomingState;
@@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
356348
) {
357349
if (isRemotePublicationEnabled == true) {
358350
if (allNodesRemotePublicationEnabled.get() == false) {
359-
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
351+
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
360352
allNodesRemotePublicationEnabled.set(true);
361353
}
362354
}
@@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
374366
return publicationContext;
375367
}
376368

377-
private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
369+
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
378370
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
379371
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
380372
// if a node is non-remote then created local publication context
381-
if (node.isRemoteStatePublicationEnabled() == false) {
373+
if (node.isRemoteStatePublicationConfigured() == false) {
382374
return false;
383375
}
384376
}

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ public boolean isRemoteStoreNode() {
518518
* Returns whether remote cluster state publication is enabled on this node
519519
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
520520
*/
521-
public boolean isRemoteStatePublicationEnabled() {
521+
public boolean isRemoteStatePublicationConfigured() {
522522
return this.getAttributes()
523523
.keySet()
524524
.stream()

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
112112
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
113113
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
114+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
114115
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
115116

116117
/**
@@ -132,7 +133,7 @@ public class RemoteClusterStateService implements Closeable {
132133
REMOTE_PUBLICATION_SETTING_KEY,
133134
false,
134135
Property.NodeScope,
135-
Property.Final
136+
Property.Dynamic
136137
);
137138

138139
/**
@@ -232,7 +233,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
232233
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
233234
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
234235
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
235-
private final boolean isPublicationEnabled;
236+
private boolean isPublicationEnabled;
236237
private final String remotePathPrefix;
237238

238239
private final RemoteClusterStateCache remoteClusterStateCache;
@@ -273,9 +274,10 @@ public RemoteClusterStateService(
273274
this.remoteStateStats = new RemotePersistenceStats();
274275
this.namedWriteableRegistry = namedWriteableRegistry;
275276
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
276-
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
277+
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
277278
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
278279
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
280+
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
279281
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
280282
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
281283
repositoriesService,
@@ -1109,6 +1111,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
11091111
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
11101112
}
11111113

1114+
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
1115+
if (remotePublicationSetting == false) {
1116+
this.isPublicationEnabled = false;
1117+
} else {
1118+
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
1119+
}
1120+
}
1121+
11121122
// Package private for unit test
11131123
RemoteRoutingTableService getRemoteRoutingTableService() {
11141124
return this.remoteRoutingTableService;

server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState(
12831283
DiscoveryNode localNode,
12841284
Settings settings
12851285
) {
1286-
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
1286+
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
12871287
}
12881288

12891289
public static ClusterState clusterState(

server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() {
302302
localNode,
303303
persistedStateRegistry,
304304
ElectionStrategy.DEFAULT_INSTANCE,
305-
Settings.EMPTY
306-
);
305+
Settings.EMPTY,
306+
null);
307307

308308
final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);
309309

server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class MockNode {
9494
);
9595
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
9696
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
97-
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
97+
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY, null);
9898
}
9999

100100
final DiscoveryNode localNode;

test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ static class ClusterNode {
150150
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
151151

152152
this.electionStrategy = electionStrategy;
153-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
153+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
154154
}
155155

156156
void reboot() {
@@ -189,7 +189,7 @@ void reboot() {
189189
localNode.getVersion()
190190
);
191191

192-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
192+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
193193
}
194194

195195
void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {

0 commit comments

Comments
 (0)