
Commit e57af3b

Author: Sachin Kale (committed)
Revert "Extract replicator logic from SegmentReplicationTargetService (#15511)"
This reverts commit 2224d48.
1 parent 54e32f5 commit e57af3b
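
As context for the "This reverts commit 2224d48." note above: a revert commit is produced mechanically by committing the inverse of the original diff. The snippet below is a hypothetical illustration of that operation through the JGit library; the actual revert was most likely made with the plain git CLI (git revert 2224d48), and the RevertExample class name is an assumption, not part of this repository.

import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.revwalk.RevCommit;

import java.io.File;

// Hypothetical illustration of "git revert 2224d48" using JGit.
public class RevertExample {
    public static void main(String[] args) throws Exception {
        try (Git git = Git.open(new File("."))) {
            // Resolve the abbreviated SHA of the commit being undone.
            ObjectId toRevert = git.getRepository().resolve("2224d48");
            // Create a new commit that applies the inverse diff; JGit writes a
            // standard revert message similar to the one shown above.
            RevCommit revertCommit = git.revert().include(toRevert).call();
            System.out.println("Created revert commit " + revertCommit.getName());
        }
    }
}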

File tree

3 files changed: +115 -223 lines changed


server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 114 additions & 36 deletions
@@ -11,7 +11,9 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.CorruptIndexException;
 import org.opensearch.ExceptionsHelper;
+import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.action.support.ChannelActionListener;
 import org.opensearch.cluster.ClusterChangedEvent;
 import org.opensearch.cluster.ClusterStateListener;
@@ -22,6 +24,7 @@
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.CancellableThreads;
+import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.shard.ShardId;
@@ -30,6 +33,7 @@
 import org.opensearch.index.shard.IndexEventListener;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardState;
+import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.recovery.ForceSyncRequest;
@@ -57,7 +61,7 @@
 import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;
 
 /**
- * Service class that handles incoming checkpoints to initiate replication events on replicas.
+ * Service class that orchestrates replication events on replicas.
  *
  * @opensearch.internal
 */
@@ -68,14 +72,17 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
     private final ThreadPool threadPool;
     private final RecoverySettings recoverySettings;
 
+    private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
+
+    private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
+
     private final SegmentReplicationSourceFactory sourceFactory;
 
     protected final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();
 
     private final IndicesService indicesService;
     private final ClusterService clusterService;
     private final TransportService transportService;
-    private final SegmentReplicator replicator;
 
     /**
      * The internal actions
@@ -87,7 +94,6 @@ public static class Actions {
         public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync";
     }
 
-    @Deprecated
     public SegmentReplicationTargetService(
         final ThreadPool threadPool,
         final RecoverySettings recoverySettings,
@@ -107,7 +113,6 @@ public SegmentReplicationTargetService(
         );
     }
 
-    @Deprecated
     public SegmentReplicationTargetService(
         final ThreadPool threadPool,
         final RecoverySettings recoverySettings,
@@ -116,34 +121,14 @@ public SegmentReplicationTargetService(
         final IndicesService indicesService,
         final ClusterService clusterService,
         final ReplicationCollection<SegmentReplicationTarget> ongoingSegmentReplications
-    ) {
-        this(
-            threadPool,
-            recoverySettings,
-            transportService,
-            sourceFactory,
-            indicesService,
-            clusterService,
-            new SegmentReplicator(threadPool)
-        );
-    }
-
-    public SegmentReplicationTargetService(
-        final ThreadPool threadPool,
-        final RecoverySettings recoverySettings,
-        final TransportService transportService,
-        final SegmentReplicationSourceFactory sourceFactory,
-        final IndicesService indicesService,
-        final ClusterService clusterService,
-        final SegmentReplicator replicator
     ) {
         this.threadPool = threadPool;
         this.recoverySettings = recoverySettings;
+        this.onGoingReplications = ongoingSegmentReplications;
         this.sourceFactory = sourceFactory;
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.transportService = transportService;
-        this.replicator = replicator;
 
         transportService.registerRequestHandler(
             Actions.FILE_CHUNK,
@@ -169,7 +154,7 @@ protected void doStart() {
     @Override
     protected void doStop() {
         if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
-            assert replicator.size() == 0 : "Replication collection should be empty on shutdown";
+            assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown";
             clusterService.removeListener(this);
         }
     }
@@ -214,7 +199,7 @@ public void clusterChanged(ClusterChangedEvent event) {
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
         if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
-            replicator.cancel(indexShard.shardId(), "Shard closing");
+            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
             latestReceivedCheckpoint.remove(shardId);
         }
     }
@@ -239,7 +224,7 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
            && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
            && oldRouting.primary() == false
            && newRouting.primary()) {
-            replicator.cancel(indexShard.shardId(), "Shard has been promoted to primary");
+            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
            latestReceivedCheckpoint.remove(indexShard.shardId());
        }
    }
@@ -249,15 +234,17 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
     */
    @Nullable
    public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) {
-        return Optional.ofNullable(replicator.get(shardId)).map(SegmentReplicationTarget::state).orElse(null);
+        return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId))
+            .map(SegmentReplicationTarget::state)
+            .orElse(null);
    }
 
    /**
     * returns SegmentReplicationState of latest completed segment replication events.
     */
    @Nullable
    public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) {
-        return replicator.getCompleted(shardId);
+        return completedReplications.get(shardId);
    }
 
    /**
@@ -270,11 +257,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
    }
 
    public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
-        return replicator.get(replicationId);
+        return onGoingReplications.get(replicationId);
    }
 
    public SegmentReplicationTarget get(ShardId shardId) {
-        return replicator.get(shardId);
+        return onGoingReplications.getOngoingReplicationTarget(shardId);
    }
 
    /**
@@ -298,7 +285,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
            // checkpoint to be replayed once the shard is Active.
            if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
                // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
-                SegmentReplicationTarget ongoingReplicationTarget = replicator.get(replicaShard.shardId());
+                SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
                if (ongoingReplicationTarget != null) {
                    if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
                        logger.debug(
@@ -517,12 +504,28 @@ public SegmentReplicationTarget startReplication(
        final ReplicationCheckpoint checkpoint,
        final SegmentReplicationListener listener
    ) {
-        return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), listener);
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            indexShard,
+            checkpoint,
+            sourceFactory.get(indexShard),
+            listener
+        );
+        startReplication(target);
+        return target;
    }
 
    // pkg-private for integration tests
    void startReplication(final SegmentReplicationTarget target) {
-        replicator.startReplication(target, recoverySettings.activityTimeout());
+        final long replicationId;
+        try {
+            replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout());
+        } catch (ReplicationFailedException e) {
+            // replication already running for shard.
+            target.fail(e, false);
+            return;
+        }
+        logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
+        threadPool.generic().execute(new ReplicationRunner(replicationId));
    }
 
    /**
@@ -547,14 +550,89 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boo
        void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
    }
 
+    /**
+     * Runnable implementation to trigger a replication event.
+     */
+    private class ReplicationRunner extends AbstractRunnable {
+
+        final long replicationId;
+
+        public ReplicationRunner(long replicationId) {
+            this.replicationId = replicationId;
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false);
+        }
+
+        @Override
+        public void doRun() {
+            start(replicationId);
+        }
+    }
+
+    private void start(final long replicationId) {
+        final SegmentReplicationTarget target;
+        try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) {
+            // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
+            // threadpool.
+            if (replicationRef == null) {
+                return;
+            }
+            target = replicationRef.get();
+        }
+        target.startReplication(new ActionListener<>() {
+            @Override
+            public void onResponse(Void o) {
+                logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description()));
+                onGoingReplications.markAsDone(replicationId);
+                if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) {
+                    completedReplications.put(target.shardId(), target.state());
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.debug("Replication failed {}", target.description());
+                if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
+                    onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
+                    return;
+                }
+                onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
+            }
+        });
+    }
+
+    private boolean isStoreCorrupt(SegmentReplicationTarget target) {
+        // ensure target is not already closed. In that case
+        // we can assume the store is not corrupt and that the replication
+        // event completed successfully.
+        if (target.refCount() > 0) {
+            final Store store = target.store();
+            if (store.tryIncRef()) {
+                try {
+                    return store.isMarkedCorrupted();
+                } catch (IOException ex) {
+                    logger.warn("Unable to determine if store is corrupt", ex);
+                    return false;
+                } finally {
+                    store.decRef();
+                }
+            }
+        }
+        // store already closed.
+        return false;
+    }
+
    private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {
 
        // How many bytes we've copied since we last called RateLimiter.pause
        final AtomicLong bytesSinceLastPause = new AtomicLong();
 
        @Override
        public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
-            try (ReplicationRef<SegmentReplicationTarget> ref = replicator.get(request.recoveryId(), request.shardId())) {
+            try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
                final SegmentReplicationTarget target = ref.get();
                final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
                target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
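
For readers skimming the restored code above: the reverted service goes back to tracking each in-flight replication directly in its ReplicationCollection, dispatching a ReplicationRunner on the generic thread pool, and marking the collection entry done or failed once the target completes. The following self-contained sketch illustrates that dispatch pattern in miniature; MiniTarget, MiniReplicationCollection, and ReplicationDispatchSketch are hypothetical stand-ins for illustration only, not OpenSearch classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for ReplicationCollection / SegmentReplicationTarget /
// ReplicationRunner from the diff above; this mirrors only the dispatch shape.
public class ReplicationDispatchSketch {

    static class MiniTarget {
        final String shard;
        MiniTarget(String shard) { this.shard = shard; }
        void copySegments() { /* fetch segment files from the source, then commit */ }
    }

    // Registry of in-flight replications keyed by a generated id,
    // playing the role of onGoingReplications in the service.
    static class MiniReplicationCollection {
        private final AtomicLong idGenerator = new AtomicLong();
        private final Map<Long, MiniTarget> ongoing = new ConcurrentHashMap<>();

        long start(MiniTarget target) {
            long id = idGenerator.incrementAndGet();
            ongoing.put(id, target);
            return id;
        }

        MiniTarget get(long id) { return ongoing.get(id); }

        void markAsDone(long id) { ongoing.remove(id); }

        void fail(long id, Exception e) { ongoing.remove(id); }
    }

    private final MiniReplicationCollection onGoingReplications = new MiniReplicationCollection();
    private final ExecutorService genericPool = Executors.newCachedThreadPool();

    // Shape of startReplication(target): register first, then hand only the id to a
    // runner on the thread pool so the caller never blocks on the file copy itself.
    void startReplication(MiniTarget target) {
        final long replicationId = onGoingReplications.start(target);
        genericPool.execute(() -> {
            MiniTarget registered = onGoingReplications.get(replicationId);
            if (registered == null) {
                return; // cancelled before the runner was scheduled
            }
            try {
                registered.copySegments();
                onGoingReplications.markAsDone(replicationId);
            } catch (Exception e) {
                onGoingReplications.fail(replicationId, e);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ReplicationDispatchSketch sketch = new ReplicationDispatchSketch();
        sketch.startReplication(new MiniTarget("[index][0]"));
        sketch.genericPool.shutdown();
        sketch.genericPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}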
