Skip to content

Indices deletion in leader cluster do not get replicated in follower cluster #1556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.autofollow.concurrent_replication_jobs_trigger_size", 3, 1, 10,
Setting.Property.Dynamic, Setting.Property.NodeScope)
// Dynamic, node-scoped boolean setting ("plugins.replication.replicate.delete_index"), disabled by default.
// IndexReplicationTask reads it in two places: when true, a FailedState is handled by stopping replication
// instead of pausing it, and the follower index is deleted during task cleanup once the task state is COMPLETED.
val REPLICATION_REPLICATE_INDEX_DELETION: Setting<Boolean> = Setting.boolSetting("plugins.replication.replicate.delete_index", false,
Setting.Property.Dynamic, Setting.Property.NodeScope)
}

override fun createComponents(client: Client, clusterService: ClusterService, threadPool: ThreadPool,
Expand Down Expand Up @@ -363,7 +365,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE,
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD, REPLICATION_REPLICATE_INDEX_DELETION)
}
override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
clusterService: ClusterService, recoverySettings: RecoverySettings): Map<String, Repository.Factory> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexAction
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
Expand Down Expand Up @@ -104,6 +105,9 @@ import java.util.stream.Collectors
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.replication.ReplicationPlugin
import kotlin.streams.toList
import org.opensearch.cluster.DiffableUtils

Expand Down Expand Up @@ -222,8 +226,13 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
updateMetadata()
}
is FailedState -> {
// Try pausing first if we get Failed state. This returns failed state if pause failed
pauseReplication(state)
// On a Failed state, either stop or pause replication, depending on the
// REPLICATION_REPLICATE_INDEX_DELETION setting. A failed state is returned if the pause or stop itself fails.
if (clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION)) {
stopReplication(state)
} else {
pauseReplication(state)
}
}
else -> {
state
Expand Down Expand Up @@ -682,6 +691,50 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
return MonitoringState
}

/**
 * Issues an internal stop-replication request for [followerIndexName].
 *
 * @param state the failed state that triggered the stop; its failed shards and error message are carried
 * over into the returned state when the stop itself fails.
 * @return [CompletedState] when the stop request is acknowledged, or when a [CancellationException] is
 * raised while stopping (it is logged and ignored); otherwise a new [FailedState] whose message combines
 * the stop failure with the original failure.
 */
private suspend fun stopReplication(state: FailedState): IndexReplicationState {
    return try {
        log.info("Going to initiate stop of $followerIndexName due to deletion of leader index")
        val stopRequest = StopIndexReplicationRequest(followerIndexName)
        val ack = client.suspendExecute(
            replicationMetadata,
            INTERNAL_STOP_REPLICATION_ACTION_TYPE,
            stopRequest,
            defaultContext = true
        )
        // An unacknowledged stop is routed through the generic failure handler below.
        if (!ack.isAcknowledged) {
            throw ReplicationException(
                "Failed to gracefully stop replication after deletion of leader index. " +
                    "Replication tasks may need to be stopped manually and deleted follower index."
            )
        }
        CompletedState
    } catch (cancelled: CancellationException) {
        // Cancellation is deliberately not propagated: the task still reports completion.
        log.error("Encountered CancellationException while stopping $followerIndexName, ignoring it", cancelled)
        CompletedState
    } catch (failure: Exception) {
        log.error("Encountered exception while stopping $followerIndexName", failure)
        FailedState(
            state.failedShards,
            "Stop failed with \"${failure.message}\". Original failure for initiating stop - ${state.errorMsg}"
        )
    }
}

/**
 * Sends a delete-index request for [followerIndexName].
 *
 * Any exception raised while deleting, including the [ReplicationException] thrown when the
 * request is not acknowledged, is logged and then rethrown to the caller.
 */
private suspend fun deleteIndex() {
    try {
        log.info("Deleting the index $followerIndexName due to deletion of leader index")
        val deleteRequest = DeleteIndexRequest(followerIndexName)
        val ack = client.suspendExecute(
            replicationMetadata,
            DeleteIndexAction.INSTANCE,
            deleteRequest,
            defaultContext = true
        )
        if (ack.isAcknowledged) {
            return
        }
        throw ReplicationException(
            "Failed to gracefully delete the follower index after deletion of the leader index. " +
                "Follower index may need to be deleted manually."
        )
    } catch (failure: Exception) {
        log.error("Encountered exception while deleting $followerIndexName", failure)
        throw failure
    }
}

private fun findAllReplicationFailedShardTasks(followerIndexName: String, clusterState: ClusterState)
:Map<ShardId, PersistentTask<ShardReplicationParams>> {
val persistentTasks = clusterState.metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
Expand Down Expand Up @@ -718,6 +771,12 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
client.execute(PauseIndexReplicationAction.INSTANCE,
PauseIndexReplicationRequest(followerIndexName, TASK_CANCELLATION_REASON))
}

// Deleting the follower index if replication is stopped because of leader index deletion
if (clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_REPLICATE_INDEX_DELETION)
&& currentTaskState.state == ReplicationState.COMPLETED) {
deleteIndex()
}
}

/* This is to minimise overhead of calling an additional listener as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
Expand Down Expand Up @@ -243,6 +244,40 @@ class StopReplicationIT: MultiClusterRestTestCase() {
followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT)
}

/**
 * Verifies that, with `plugins.replication.replicate.delete_index` enabled on the follower cluster,
 * deleting a leader index causes its follower index to be removed, while a second replicated
 * follower index is left in place.
 */
fun `test delete follower index when leader index is unavailable`() {
    val followerIndexName2 = "follower_index2"
    val leaderIndexName2 = "leader_index2"
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)

    // Enabling the replication of delete index
    val settings = Settings.builder()
        .put("plugins.replication.replicate.delete_index", true)
        .build()
    val request = ClusterUpdateSettingsRequest()
    request.transientSettings(settings)
    followerClient.cluster().putSettings(request, RequestOptions.DEFAULT)

    // Replicate two independent leader indices so the test can tell a targeted delete from a blanket one.
    createConnectionBetweenClusters(FOLLOWER, LEADER, "source")
    val createIndex1Response = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
    assertThat(createIndex1Response.isAcknowledged).isTrue()
    val createIndex2Response = leaderClient.indices().create(CreateIndexRequest(leaderIndexName2), RequestOptions.DEFAULT)
    assertThat(createIndex2Response.isAcknowledged).isTrue()
    followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
        waitForRestore = true)
    followerClient.startReplication(StartReplicationRequest("source", leaderIndexName2, followerIndexName2),
        waitForRestore = true)

    val deleteResponse = leaderClient.indices().delete(DeleteIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
    // A bare assertThat(...) checks nothing; the terminal isTrue() is what actually performs the assertion.
    assertThat(deleteResponse.isAcknowledged).isTrue()

    // Make sure follower index got deleted after it is deleted from the leader, and it didn't affect any other indexes
    assertBusy({
        Assert.assertFalse(followerClient.indices().exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
    }, 30, TimeUnit.SECONDS)
    Assert.assertTrue(followerClient.indices().exists(GetIndexRequest(followerIndexName2), RequestOptions.DEFAULT))
}

fun `test stop replication with stale replication settings at leader cluster`() {

Assume.assumeFalse(SNAPSHOTS_NOT_ACCESSIBLE_FOR_REMOTE_CLUSTERS, checkifIntegTestRemote())
Expand Down