Skip to content

Commit c0dc716

Browse files
authored
Add Cluster.Sharding DData backward compatibility wire format mode (#6775)
* Add Cluster.Sharding DData backward compatibility wire format mode * Add unit test * Update API Verify list
1 parent 8d550bf commit c0dc716

File tree

8 files changed

+73
-3
lines changed

8 files changed

+73
-3
lines changed

src/contrib/cluster/Akka.Cluster.Sharding/Properties/AssemblyInfo.cs

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// associated with an assembly.
1515
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests")]
1616
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
17+
[assembly: InternalsVisibleTo("Akka.DistributedData.Tests")]
1718

1819
// Setting ComVisible to false makes the types in this assembly not visible
1920
// to COM components. If you need to access a type in this assembly from

src/contrib/cluster/Akka.Cluster.Sharding/Serialization/ClusterShardingMessageSerializer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ private static EventSourcedRememberEntitiesShardStore.EntitiesStopped EntitiesSt
446446
}
447447

448448
//
449-
// ShardCoordinator.State
449+
// ShardCoordinator.CoordinatorState
450450
//
451451
private static Proto.Msg.CoordinatorState CoordinatorStateToProto(ShardCoordinator.CoordinatorState state)
452452
{

src/contrib/cluster/Akka.Cluster.Sharding/reference.conf

+4
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ akka.cluster.sharding {
214214
# can become to large if including to many in same message. Limit to
215215
# the same number as the number of ORSet per shard.
216216
max-delta-elements = 5
217+
218+
# Turn on backward compatibility wire format mode that allows Akka.Cluster.Sharding
219+
# v1.5.8 distributed data to communicate with v1.4.x
220+
backward-compatible-wire-format = false
217221
}
218222
# The id of the dispatcher to use for ClusterSharding actors.
219223
# If not specified, the internal dispatcher is used.

src/contrib/cluster/Akka.DistributedData.Tests/Akka.DistributedData.Tests.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11+
<ProjectReference Include="..\Akka.Cluster.Sharding\Akka.Cluster.Sharding.csproj" />
1112
<ProjectReference Include="..\Akka.DistributedData.LightningDB\Akka.DistributedData.LightningDB.csproj" />
1213
<ProjectReference Include="..\Akka.DistributedData\Akka.DistributedData.csproj" />
1314
<ProjectReference Include="..\..\..\core\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="ReplicatedDataSerializerBackCompatSpec.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using Akka.Actor;
9+
using Akka.Cluster.Sharding;
10+
using Akka.DistributedData.Serialization;
11+
using Akka.DistributedData.Serialization.Proto.Msg;
12+
using FluentAssertions;
13+
using Xunit;
14+
using UniqueAddress = Akka.Cluster.UniqueAddress;
15+
using static FluentAssertions.FluentActions;
16+
17+
namespace Akka.DistributedData.Tests.Serialization;
18+
19+
public class ReplicatedDataSerializerBackCompatSpec
20+
{
21+
private readonly ReplicatedDataSerializer _serializer;
22+
private readonly UniqueAddress _address;
23+
24+
public ReplicatedDataSerializerBackCompatSpec()
25+
{
26+
var sys = ActorSystem.Create("test", @"
27+
akka.actor.provider = cluster
28+
akka.cluster.sharding.distributed-data.backward-compatible-wire-format = true");
29+
_serializer = new ReplicatedDataSerializer((ExtendedActorSystem)sys);
30+
_address = Cluster.Cluster.Get(sys).SelfUniqueAddress;
31+
sys.Terminate();
32+
}
33+
34+
[Fact(DisplayName = "DData replicated data serializer should serialize and deserialize correct backward compatible proto message")]
35+
public void SerializeTest()
36+
{
37+
var lwwReg = new LWWRegister<ShardCoordinator.CoordinatorState>(_address, ShardCoordinator.CoordinatorState.Empty);
38+
var bytes = _serializer.ToBinary(lwwReg);
39+
var proto = LWWRegister.Parser.ParseFrom(bytes);
40+
41+
// Serialized type name should be equal to the old v1.4 coordinator state FQCN
42+
proto.TypeInfo.TypeName.Should().Be("Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding");
43+
44+
// Deserializing the same message should succeed
45+
Invoking(() => _serializer.FromBinary(bytes, _serializer.Manifest(lwwReg)))
46+
.Should().NotThrow()
47+
.And.Subject().Should().BeOfType<LWWRegister<ShardCoordinator.CoordinatorState>>();
48+
}
49+
}

src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs

+15-2
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,13 @@ public sealed class ReplicatedDataSerializer : SerializerWithStringManifest
6666

6767
private readonly byte[] _emptyArray = Array.Empty<byte>();
6868

69+
private readonly bool _backwardCompatWireFormat;
70+
6971
public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
7072
{
7173
_ser = new SerializationSupport(system);
74+
_backwardCompatWireFormat =
75+
system.Settings.Config.GetBoolean("akka.cluster.sharding.distributed-data.backward-compatible-wire-format");
7276
}
7377

7478

@@ -735,6 +739,11 @@ private Proto.Msg.LWWRegister LWWToProto<T>(ILWWRegister r)
735739
pLww.State = _ser.OtherMessageToProto(register.Value);
736740
pLww.Timestamp = register.Timestamp;
737741
pLww.TypeInfo = GetTypeDescriptor(r.RegisterType);
742+
743+
// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
744+
if(_backwardCompatWireFormat && pLww.TypeInfo.TypeName == "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding")
745+
pLww.TypeInfo.TypeName = "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding";
746+
738747
return pLww;
739748
}
740749

@@ -766,9 +775,13 @@ private ILWWRegister LWWRegisterFromProto(Proto.Msg.LWWRegister proto)
766775
}
767776
case ValType.Other:
768777
{
778+
// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
779+
var typeName = proto.TypeInfo.TypeName;
780+
if (typeName == "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding")
781+
typeName = "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding";
782+
769783
// runtime type - enter horrible dynamic serialization stuff
770-
771-
var setContentType = Type.GetType(proto.TypeInfo.TypeName);
784+
var setContentType = Type.GetType(typeName);
772785

773786
var setType = LWWRegisterMaker.MakeGenericMethod(setContentType);
774787
return (ILWWRegister)setType.Invoke(this, new object[] { proto });

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
22
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
33
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
4+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
45
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
56
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
67
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")]

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
22
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
33
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
4+
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
45
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
56
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
67
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")]

0 commit comments

Comments
 (0)