Skip to content

Commit 730128a

Browse files
author
Nagaraj G
committed
Fixing stale cache issue post snapshot restore
Signed-off-by: Nagaraj G <[email protected]>
1 parent 835f899 commit 730128a

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed

src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java

+2
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,8 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
870870
}
871871
}
872872
}.toListener());
873+
874+
indexModule.addIndexEventListener(cr);
873875
}
874876
}
875877

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@
7575
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
7676
import org.opensearch.core.action.ActionListener;
7777
import org.opensearch.core.common.Strings;
78+
import org.opensearch.core.index.Index;
79+
import org.opensearch.core.index.shard.ShardId;
7880
import org.opensearch.core.rest.RestStatus;
7981
import org.opensearch.core.xcontent.MediaTypeRegistry;
8082
import org.opensearch.env.Environment;
83+
import org.opensearch.index.shard.IndexEventListener;
84+
import org.opensearch.index.shard.IndexShard;
8185
import org.opensearch.security.auditlog.AuditLog;
8286
import org.opensearch.security.auditlog.config.AuditConfig;
8387
import org.opensearch.security.securityconf.DynamicConfigFactory;
@@ -93,8 +97,9 @@
9397
import org.opensearch.transport.client.Client;
9498

9599
import static org.opensearch.security.support.ConfigConstants.SECURITY_ALLOW_DEFAULT_INIT_USE_CLUSTER_STATE;
100+
import static org.opensearch.security.support.SnapshotRestoreHelper.isSecurityIndexRestoredFromSnapshot;
96101

97-
public class ConfigurationRepository implements ClusterStateListener {
102+
public class ConfigurationRepository implements ClusterStateListener, IndexEventListener {
98103
private static final Logger LOGGER = LogManager.getLogger(ConfigurationRepository.class);
99104

100105
private final String securityIndex;
@@ -668,4 +673,21 @@ public Void run() {
668673
});
669674
}
670675
}
676+
677+
@Override
678+
public void afterIndexShardStarted(IndexShard indexShard) {
679+
final ShardId shardId = indexShard.shardId();
680+
final Index index = shardId.getIndex();
681+
682+
// Check if this is a security index shard
683+
if (securityIndex.equals(index.getName())) {
684+
// Only trigger on primary shard to avoid multiple reloads
685+
if (indexShard.routingEntry() != null && indexShard.routingEntry().primary()) {
686+
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
687+
LOGGER.info("Security index primary shard {} started - " + "config reloading for snapshot restore", shardId);
688+
reloadConfiguration(CType.values());
689+
}
690+
}
691+
}
692+
}
671693
}

src/main/java/org/opensearch/security/support/SnapshotRestoreHelper.java

+22
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,18 @@
3030
import java.security.PrivilegedAction;
3131
import java.util.List;
3232
import java.util.Objects;
33+
import java.util.Optional;
3334

3435
import org.apache.logging.log4j.LogManager;
3536
import org.apache.logging.log4j.Logger;
3637

3738
import org.opensearch.SpecialPermission;
3839
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
3940
import org.opensearch.action.support.PlainActionFuture;
41+
import org.opensearch.cluster.RestoreInProgress;
42+
import org.opensearch.cluster.service.ClusterService;
4043
import org.opensearch.common.util.IndexUtils;
44+
import org.opensearch.core.index.Index;
4145
import org.opensearch.repositories.RepositoriesService;
4246
import org.opensearch.repositories.Repository;
4347
import org.opensearch.security.OpenSearchSecurityPlugin;
@@ -89,6 +93,24 @@ public static SnapshotInfo getSnapshotInfo(RestoreSnapshotRequest restoreRequest
8993
return snapshotInfo;
9094
}
9195

96+
public static boolean isSecurityIndexRestoredFromSnapshot(ClusterService clusterService, Index index, String securityIndex) {
97+
final String threadName = Thread.currentThread().getName();
98+
boolean isSecurityIndexRestoredFromSnapshot = false;
99+
100+
try {
101+
setCurrentThreadName("[" + ThreadPool.Names.GENERIC + "]");
102+
isSecurityIndexRestoredFromSnapshot = Optional.ofNullable(
103+
clusterService.state().<RestoreInProgress>custom(RestoreInProgress.TYPE)
104+
).map(restore -> restore.iterator().hasNext() && restore.iterator().next().indices().contains(securityIndex)).orElse(false);
105+
} catch (Exception e) {
106+
log.warn("Could not determine if index {} was restored from snapshot, assuming new index", index.getName(), e);
107+
} finally {
108+
setCurrentThreadName(threadName);
109+
}
110+
111+
return isSecurityIndexRestoredFromSnapshot;
112+
}
113+
92114
@SuppressWarnings("removal")
93115
private static void setCurrentThreadName(final String name) {
94116
final SecurityManager sm = System.getSecurityManager();

src/test/java/org/opensearch/security/configuration/ConfigurationRepositoryTest.java

+50
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.IOException;
1616
import java.nio.file.Path;
1717
import java.time.Instant;
18+
import java.util.Collections;
1819
import java.util.Set;
1920
import java.util.concurrent.TimeoutException;
2021

@@ -34,17 +35,22 @@
3435
import org.opensearch.cluster.ClusterChangedEvent;
3536
import org.opensearch.cluster.ClusterState;
3637
import org.opensearch.cluster.ClusterStateUpdateTask;
38+
import org.opensearch.cluster.RestoreInProgress;
3739
import org.opensearch.cluster.block.ClusterBlocks;
3840
import org.opensearch.cluster.metadata.IndexMetadata;
3941
import org.opensearch.cluster.metadata.MappingMetadata;
4042
import org.opensearch.cluster.metadata.Metadata;
4143
import org.opensearch.cluster.node.DiscoveryNode;
4244
import org.opensearch.cluster.node.DiscoveryNodes;
45+
import org.opensearch.cluster.routing.ShardRouting;
4346
import org.opensearch.cluster.service.ClusterService;
4447
import org.opensearch.common.Priority;
4548
import org.opensearch.common.settings.Settings;
4649
import org.opensearch.core.action.ActionListener;
50+
import org.opensearch.core.index.Index;
51+
import org.opensearch.core.index.shard.ShardId;
4752
import org.opensearch.core.rest.RestStatus;
53+
import org.opensearch.index.shard.IndexShard;
4854
import org.opensearch.security.DefaultObjectMapper;
4955
import org.opensearch.security.auditlog.AuditLog;
5056
import org.opensearch.security.securityconf.DynamicConfigFactory;
@@ -63,6 +69,7 @@
6369

6470
import org.mockito.ArgumentCaptor;
6571
import org.mockito.Mock;
72+
import org.mockito.Mockito;
6673
import org.mockito.junit.MockitoJUnitRunner;
6774
import org.mockito.stubbing.OngoingStubbing;
6875

@@ -82,9 +89,11 @@
8289
import static org.mockito.Mockito.anyString;
8390
import static org.mockito.Mockito.doAnswer;
8491
import static org.mockito.Mockito.doCallRealMethod;
92+
import static org.mockito.Mockito.doReturn;
8593
import static org.mockito.Mockito.mock;
8694
import static org.mockito.Mockito.never;
8795
import static org.mockito.Mockito.reset;
96+
import static org.mockito.Mockito.spy;
8897
import static org.mockito.Mockito.times;
8998
import static org.mockito.Mockito.verify;
9099
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -569,6 +578,47 @@ public void getConfigurationsFromIndex_SecurityIndexNotInitiallyReady() throws I
569578
assertThat(result.size(), is(CType.values().size()));
570579
}
571580

581+
@Test
582+
public void afterIndexShardStarted_whenSecurityIndexUpdated() throws InterruptedException, TimeoutException {
583+
Settings settings = Settings.builder().build();
584+
ConfigurationRepository configurationRepository = spy(createConfigurationRepository(settings));
585+
IndexShard indexShard = mock(IndexShard.class);
586+
ShardRouting shardRouting = mock(ShardRouting.class);
587+
ShardId shardId = mock(ShardId.class);
588+
Index index = mock(Index.class);
589+
ClusterState mockClusterState = mock(ClusterState.class);
590+
RestoreInProgress mockRestore = mock(RestoreInProgress.class);
591+
RestoreInProgress.Entry mockEntry = mock(RestoreInProgress.Entry.class);
592+
593+
// Setup mock behavior
594+
when(indexShard.shardId()).thenReturn(shardId);
595+
when(shardId.getIndex()).thenReturn(index);
596+
when(index.getName()).thenReturn(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX);
597+
when(indexShard.routingEntry()).thenReturn(shardRouting);
598+
when(clusterService.state()).thenReturn(mockClusterState);
599+
when(mockClusterState.custom(RestoreInProgress.TYPE)).thenReturn(mockRestore);
600+
601+
// when replica shard updated
602+
when(shardRouting.primary()).thenReturn(false);
603+
configurationRepository.afterIndexShardStarted(indexShard);
604+
verify(configurationRepository, never()).reloadConfiguration(any());
605+
606+
// when primary shard updated
607+
doReturn(true).when(configurationRepository).reloadConfiguration(any());
608+
when(shardRouting.primary()).thenReturn(true);
609+
when(mockRestore.iterator()).thenReturn(Collections.singletonList(mockEntry).iterator());
610+
when(mockEntry.indices()).thenReturn(Collections.singletonList(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX));
611+
configurationRepository.afterIndexShardStarted(indexShard);
612+
verify(configurationRepository).reloadConfiguration(CType.values());
613+
614+
// When there is error in checking if restored from snapshot
615+
Mockito.reset(configurationRepository);
616+
when(clusterService.state()).thenThrow(new RuntimeException("ClusterState exception"));
617+
when(shardRouting.primary()).thenReturn(true);
618+
configurationRepository.afterIndexShardStarted(indexShard);
619+
verify(configurationRepository, never()).reloadConfiguration(any());
620+
}
621+
572622
void assertClusterState(final ArgumentCaptor<ClusterStateUpdateTask> clusterStateUpdateTaskCaptor) throws Exception {
573623
final var initializedStateUpdate = clusterStateUpdateTaskCaptor.getValue();
574624
assertThat(initializedStateUpdate.priority(), is(Priority.IMMEDIATE));

0 commit comments

Comments
 (0)