Skip to content

Commit f2e81c5

Browse files
Akka.Cluster.Tools.Singleton / Akka.Cluster.Sharding: fix duplicate shards caused by incorrect ClusterSingletonManager HandOver (#7297)
* close #6973 - eliminate duplicate shards Eliminates the source of #6793, which was caused by using the incorrect ordering methodology when it came to determining which `ClusterSingletonManager` to hand-over to during member state transitions. close #6973 close #7196 * fixed build warnings
1 parent f4501e8 commit f2e81c5

File tree

6 files changed

+31
-310
lines changed

6 files changed

+31
-310
lines changed

src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeaseSpec.cs

-5
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ public TestException(string message, Exception innerEx)
7171
: base(message, innerEx)
7272
{
7373
}
74-
75-
protected TestException(SerializationInfo info, StreamingContext context)
76-
: base(info, context)
77-
{
78-
}
7974
}
8075

8176
private readonly Cluster _cluster;

src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart3Spec.cs

-135
This file was deleted.

src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/MemberAgeOrderingSpec.cs

-89
This file was deleted.

src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
7676
.WithDeploy(Deploy.Local);
7777
}
7878

79-
private readonly MemberAgeOrdering _memberAgeComparer;
79+
private readonly IComparer<Member> _memberAgeComparer;
8080
private readonly ClusterSingletonProxySettings _settings;
8181
private readonly Cluster _cluster = Cluster.Get(Context.System);
8282
private readonly Queue<KeyValuePair<object, IActorRef>> _buffer = new(); // queue seems to fit better
@@ -99,9 +99,7 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
9999
_singletonPath = (singletonManagerPath + "/" + settings.SingletonName).Split('/');
100100
_identityId = CreateIdentifyId(_identityCounter);
101101

102-
_memberAgeComparer = settings.ConsiderAppVersion
103-
? MemberAgeOrdering.DescendingWithAppVersion
104-
: MemberAgeOrdering.Descending;
102+
_memberAgeComparer = Member.AgeOrdering;
105103
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);
106104

107105
Receive<ClusterEvent.CurrentClusterState>(s => HandleInitial(s));

src/contrib/cluster/Akka.Cluster.Tools/Singleton/MemberAgeOrdering.cs

-57
This file was deleted.

src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs

+29-20
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public OldestChanged(UniqueAddress oldest)
9292

9393
#endregion
9494

95-
private readonly MemberAgeOrdering _memberAgeComparer;
95+
private readonly IComparer<Member> _memberAgeComparer;
9696
private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System);
9797

9898
/// <summary>
@@ -103,9 +103,7 @@ public OldestChanged(UniqueAddress oldest)
103103
public OldestChangedBuffer(string role, bool considerAppVersion)
104104
{
105105
_role = role;
106-
_memberAgeComparer = considerAppVersion
107-
? MemberAgeOrdering.DescendingWithAppVersion
108-
: MemberAgeOrdering.Descending;
106+
_memberAgeComparer = Member.AgeOrdering;
109107
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);
110108

111109
SetupCoordinatedShutdown();
@@ -147,8 +145,7 @@ private void TrackChanges(Action block)
147145
var before = _membersByAge.FirstOrDefault();
148146
block();
149147
var after = _membersByAge.FirstOrDefault();
150-
151-
// todo: fix neq comparison
148+
152149
if (!Equals(before, after))
153150
_changes = _changes.Enqueue(new OldestChanged(after?.UniqueAddress));
154151
}
@@ -221,21 +218,33 @@ protected override void PostStop()
221218
/// <inheritdoc cref="UntypedActor.OnReceive"/>
222219
protected override void OnReceive(object message)
223220
{
224-
if (message is ClusterEvent.CurrentClusterState state) HandleInitial(state);
225-
else if (message is ClusterEvent.MemberUp up) Add(up.Member);
226-
else if (message is ClusterEvent.MemberRemoved removed) Remove(removed.Member);
227-
else if (message is ClusterEvent.MemberExited exited && exited.Member.UniqueAddress != _cluster.SelfUniqueAddress)
228-
Remove(exited.Member);
229-
else if (message is SelfExiting)
221+
switch (message)
230222
{
231-
Remove(_cluster.ReadView.Self);
232-
Sender.Tell(Done.Instance); // reply to ask
233-
}
234-
else if (message is GetNext && _changes.IsEmpty) Context.BecomeStacked(OnDeliverNext);
235-
else if (message is GetNext) SendFirstChange();
236-
else
237-
{
238-
Unhandled(message);
223+
case ClusterEvent.CurrentClusterState state:
224+
HandleInitial(state);
225+
break;
226+
case ClusterEvent.MemberUp up:
227+
Add(up.Member);
228+
break;
229+
case ClusterEvent.MemberRemoved removed:
230+
Remove(removed.Member);
231+
break;
232+
case ClusterEvent.MemberExited exited when exited.Member.UniqueAddress != _cluster.SelfUniqueAddress:
233+
Remove(exited.Member);
234+
break;
235+
case SelfExiting:
236+
Remove(_cluster.ReadView.Self);
237+
Sender.Tell(Done.Instance); // reply to ask
238+
break;
239+
case GetNext when _changes.IsEmpty:
240+
Context.BecomeStacked(OnDeliverNext);
241+
break;
242+
case GetNext:
243+
SendFirstChange();
244+
break;
245+
default:
246+
Unhandled(message);
247+
break;
239248
}
240249
}
241250

0 commit comments

Comments
 (0)