Skip to content

Commit 7174005

Browse files
lenaschoenburggithub-actions[bot]
authored andcommitted
fix: notify new SnapshotReplicationListeners about ongoing replication
We are using `SnapshotReplicationListener`s to transition to inactive when a snapshot replication starts. Snapshot replication can start before any listeners have registered, which means that the listener will only be notified about snapshot replication finishing, triggering a transition to follower without first transitioning to inactive. Here we are keeping track of ongoing snapshot replication to immediately notify listeners when they are registering. (cherry picked from commit 3d1f6c5)
1 parent b4fad15 commit 7174005

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java

+8
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {
127127
private PersistedSnapshot currentSnapshot;
128128

129129
private boolean ongoingTransition = false;
130+
private boolean ongoingSnapshotReplication = false;
130131

131132
@SuppressWarnings("java:S3077") // allow volatile here, health is immutable
132133
private volatile HealthReport health = HealthReport.healthy(this);
@@ -469,6 +470,11 @@ public long setCommitIndex(final long commitIndex) {
469470
public void addSnapshotReplicationListener(
470471
final SnapshotReplicationListener snapshotReplicationListener) {
471472
snapshotReplicationListeners.add(snapshotReplicationListener);
473+
if (ongoingSnapshotReplication) {
474+
// Notify listener immediately if it registered during an ongoing replication.
475+
// This is to prevent missing necessary state transitions.
476+
snapshotReplicationListener.onSnapshotReplicationStarted();
477+
}
472478
}
473479

474480
/**
@@ -482,11 +488,13 @@ public void removeSnapshotReplicationListener(
482488
}
483489

484490
public void notifySnapshotReplicationStarted() {
491+
ongoingSnapshotReplication = true;
485492
snapshotReplicationListeners.forEach(SnapshotReplicationListener::onSnapshotReplicationStarted);
486493
}
487494

488495
public void notifySnapshotReplicationCompleted() {
489496
snapshotReplicationListeners.forEach(l -> l.onSnapshotReplicationCompleted(term));
497+
ongoingSnapshotReplication = false;
490498
}
491499

492500
/**

atomix/cluster/src/test/java/io/atomix/raft/SnapshotReplicationListenerTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.timeout;
2122
import static org.mockito.Mockito.verify;
2223

@@ -59,4 +60,16 @@ public void shouldNotifySnapshotReplicationListener() throws Throwable {
5960
verify(snapshotReplicationListener, timeout(1_000).times(1))
6061
.onSnapshotReplicationCompleted(follower.getTerm());
6162
}
63+
64+
@Test
65+
public void shouldNotifyOnRegisteringListener() {
66+
// given
67+
final var snapshotReplicationListener = mock(SnapshotReplicationListener.class);
68+
final var follower = raftRule.getFollower().orElseThrow();
69+
// then
70+
follower.getContext().notifySnapshotReplicationStarted();
71+
verify(snapshotReplicationListener, never()).onSnapshotReplicationStarted();
72+
follower.getContext().addSnapshotReplicationListener(snapshotReplicationListener);
73+
verify(snapshotReplicationListener).onSnapshotReplicationStarted();
74+
}
6275
}

0 commit comments

Comments
 (0)