Skip to content

Commit 96cd8db

Browse files
feature: Akka.Delivery - reliable point-to-point delivery + message chunking (#6720)
* feature: Akka.Delivery Added core Akka.Delivery classes: * `ProducerController` * `ConsumerController` * `ChunkedMessage` * fixed missing curly brace * enabled `#nullable enable` on new APIs * checking in API changes with nullability enabled * fixed some XML-DOC errors * added Akka.Delivery protobuf definitions * added Akka.Cluster.Sharding components * finished integrating Akka.Cluster.Sharding.Delivery tests * fixed XML-DOC error * added type manifest data to ReliableDelivery protos * working on generics serialization * fix compilation error * have sequenced message serialization down * fleshing out more serialized types * added chunk detection to MessageSent * completed serializer * moved reliable delivery serializer to Akka.Cluster * fixed serialization namespaces * defining ReliableDeliverySerializerSpecs * finished all serialization specs * added API approvals * added event-sourced durable producer queue * finished event sourced queue implementation - added API approvals * cleanedup persistence code * fixed some EventSourcedProducerQueue bugs * finished all EventSourcedProducerQueue specs * fix DocFx issue * added reliable delivery specs on top of event-sourced queue * increased timeouts reviewed the logs - the tests just took longer to run on AzDo than my development machine. Everything was still operating normally but SeqNr 42 hadn't been hit yet by any of the consumers at the time the test terminated (they were at 37 / 38) * reformat * adding DurableShardingSpec * fixed durable state bug in ShardingProducerControllerImpl * finished final spec * updating ShoppingCart sample * made Akka.Remote "Failed to write message to the transport" more explicit * adding ShardingEnvelope to ClusterMessageSerializer * finished `ShardingEnvelope` serialization support * fixed Uri encoding error for sharded consumer controllers * fixed code sample * added API approvals * fix serialization error during chunking * added test to validate error with chunking and acks * disable chunking support for Akka.Cluster.Sharding * fixed config loading error * fixed compilation error * fixes and final API approvals
1 parent 899e62f commit 96cd8db

File tree

64 files changed

+13172
-100
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+13172
-100
lines changed

build.fsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,12 @@ Target "Protobuf" <| fun _ ->
439439
let protoFiles = [
440440
("WireFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
441441
("ContainerFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
442-
("ContainerFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
443442
("SystemMessageFormats.proto", "/src/core/Akka.Remote/Serialization/Proto/");
444443
("ClusterMessages.proto", "/src/core/Akka.Cluster/Serialization/Proto/");
445444
("ClusterClientMessages.proto", "/src/contrib/cluster/Akka.Cluster.Tools/Client/Serialization/Proto/");
446445
("DistributedPubSubMessages.proto", "/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/Proto/");
447446
("ClusterShardingMessages.proto", "/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/Proto/");
447+
("ReliableDelivery.proto", "/src/core/Akka.Cluster/Serialization/Proto/");
448448
("TestConductorProtocol.proto", "/src/core/Akka.Remote.TestKit/Proto/");
449449
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/");
450450
("StreamRefMessages.proto", "/src/core/Akka.Streams/Serialization/Proto/");

src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public override void AroundPreStart()
124124
/// <param name="receive">TBD</param>
125125
/// <param name="message">TBD</param>
126126
/// <returns>TBD</returns>
127-
protected override bool AroundReceive(Receive receive, object message)
127+
protected internal override bool AroundReceive(Receive receive, object message)
128128
{
129129
if (_isInitialized)
130130
{

src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public override void AroundPreStart()
6363
/// <param name="receive">TBD</param>
6464
/// <param name="message">TBD</param>
6565
/// <returns>TBD</returns>
66-
protected override bool AroundReceive(Receive receive, object message)
66+
protected internal override bool AroundReceive(Receive receive, object message)
6767
{
6868
if (_isInitialized)
6969
{

src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10+
<Compile Include="..\..\..\core\Akka.Tests\Delivery\TestConsumer.cs">
11+
<Link>Delivery\TestConsumer.cs</Link>
12+
</Compile>
13+
<Compile Include="..\..\..\core\Akka.Tests\Delivery\TestProducer.cs">
14+
<Link>Delivery\TestProducer.cs</Link>
15+
</Compile>
1016
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\AsyncWriteProxyEx.cs" Link="AsyncWriteProxyEx.cs" />
1117
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\MemoryJournalShared.cs" Link="MemoryJournalShared.cs" />
1218
<Compile Include="..\Akka.Cluster.Sharding.Tests.MultiNode\MemorySnapshotStoreShared.cs" Link="MemorySnapshotStoreShared.cs" />

src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingMessageSerializerSpec.cs

+17-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@
88
using System;
99
using System.Collections.Immutable;
1010
using Akka.Actor;
11+
using Akka.Actor.Dsl;
12+
using Akka.Cluster.Configuration;
1113
using Akka.Cluster.Sharding.Internal;
1214
using Akka.Cluster.Sharding.Serialization;
1315
using Akka.Cluster.Tools.Singleton;
1416
using Akka.Configuration;
17+
using Akka.Delivery;
1518
using Akka.Serialization;
1619
using Akka.TestKit;
20+
using Akka.TestKit.TestActors;
1721
using Akka.Util.Internal;
1822
using FluentAssertions;
1923
using Google.Protobuf;
@@ -30,7 +34,7 @@ public class ClusterShardingMessageSerializerSpec : AkkaSpec
3034
private IActorRef regionProxy2;
3135

3236
private static Config SpecConfig =>
33-
ClusterSingletonManager.DefaultConfig().WithFallback(ClusterSharding.DefaultConfig());
37+
ClusterSingletonManager.DefaultConfig().WithFallback(ClusterSharding.DefaultConfig()).WithFallback(ClusterConfigFactory.Default());
3438

3539
public ClusterShardingMessageSerializerSpec() : base(SpecConfig)
3640
{
@@ -47,7 +51,7 @@ private void CheckSerialization(object obj)
4751
serializer.Should().BeOfType<ClusterShardingMessageSerializer>();
4852
var blob = serializer.ToBinary(obj);
4953
var reference = serializer.FromBinary(blob, serializer.Manifest(obj));
50-
reference.Should().Be(obj);
54+
reference.Should().BeEquivalentTo(obj);
5155
}
5256

5357
[Fact]
@@ -211,5 +215,16 @@ public void ClusterShardingMessageSerializer_must_serialize_ShardRegionQuery()
211215
ImmutableHashSet.Create("14", "15")
212216
));
213217
}
218+
219+
[Fact]
220+
public void ClusterShardingMessageSerializer_must_serialize_ShardingEnvelope()
221+
{
222+
var producer = Sys.ActorOf(BlackHoleActor.Props, "fakeProducer");
223+
CheckSerialization(new ShardingEnvelope("entity-1", 11));
224+
CheckSerialization(new ShardingEnvelope("entity-1", new ConsumerController.SequencedMessage<string>("p1", 11, "msg-1", true, false, producer)));
225+
CheckSerialization(new ShardingEnvelope("entity-1", new ConsumerController.SequencedMessage<TestJob>("p1", 11, new TestJob("foo1"), true, false, producer)));
226+
}
227+
228+
private record TestJob(string Job);
214229
}
215230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="DurableShardingSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading.Tasks;
10+
using Akka.Actor;
11+
using Akka.Actor.Dsl;
12+
using Akka.Cluster.Sharding.Delivery;
13+
using Akka.Configuration;
14+
using Akka.Delivery;
15+
using Akka.Event;
16+
using Akka.Persistence.Delivery;
17+
using Akka.TestKit;
18+
using Xunit;
19+
using Xunit.Abstractions;
20+
using FluentAssertions;
21+
using static Akka.Tests.Delivery.TestConsumer;
22+
23+
namespace Akka.Cluster.Sharding.Tests.Delivery;
24+
25+
public class DurableShardingSpec : AkkaSpec
26+
{
27+
public static readonly Config Config = @"
28+
akka.actor.provider = cluster
29+
akka.remote.dot-netty.tcp.port = 0
30+
akka.reliable-delivery.consumer-controller.flow-control-window = 20
31+
akka.persistence.journal.plugin = ""akka.persistence.journal.inmem""
32+
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.inmem""
33+
";
34+
35+
public DurableShardingSpec(ITestOutputHelper output) : base(Config, output)
36+
{
37+
// TODO: add journal operations subscriptions, once that's properly supported in Akka.Persistence
38+
}
39+
40+
private int _idCount;
41+
42+
private string ProducerId => $"p-{_idCount}";
43+
44+
private int NextId()
45+
{
46+
return _idCount++;
47+
}
48+
49+
private async Task JoinCluster()
50+
{
51+
var cluster = Cluster.Get(Sys);
52+
await cluster.JoinAsync(cluster.SelfAddress);
53+
await AwaitAssertAsync(() => Assert.True(cluster.IsUp));
54+
}
55+
56+
[Fact]
57+
public async Task ReliableDelivery_with_sharding_and_durable_queue_must_load_initial_state_and_resend_unconfirmed()
58+
{
59+
await JoinCluster();
60+
NextId();
61+
62+
var consumerProbe = CreateTestProbe();
63+
var sharding = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", s =>
64+
ShardingConsumerController.Create<Job>(c =>
65+
Props.Create(() => new Consumer(c, consumerProbe)),
66+
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
67+
HashCodeMessageExtractor.Create(10,
68+
o =>
69+
{
70+
if (o is ShardingEnvelope se)
71+
return se.EntityId;
72+
return string.Empty;
73+
}, o =>
74+
{
75+
if (o is ShardingEnvelope se)
76+
return se.Message;
77+
return o;
78+
}));
79+
80+
var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
81+
var shardingProducerController =
82+
Sys.ActorOf(
83+
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
84+
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
85+
var producerProbe = CreateTestProbe();
86+
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
87+
88+
for (var i = 1; i <= 4; i++)
89+
{
90+
(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>()).SendNextTo.Tell(
91+
new ShardingEnvelope("entity-1", new Job($"msg-{i}")));
92+
// TODO: need journal operations queries here to verify that the message was persisted
93+
}
94+
95+
var delivery1 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
96+
delivery1.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
97+
// TODO: need journal operations queries here to verify that the Confirmed was persisted
98+
99+
var delivery2 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
100+
delivery2.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
101+
// TODO: need journal operations queries here to verify that the Confirmed was persisted
102+
103+
await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();
104+
105+
// let the initial messages reach the ShardingConsumerController before stopping ShardingProducerController
106+
var delivery3 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
107+
delivery3.Msg.Should().Be(new Job("msg-3"));
108+
delivery3.SeqNr.Should().Be(3);
109+
110+
await Task.Delay(1000);
111+
112+
Sys.Log.Info("Stopping [{0}]", shardingProducerController);
113+
Watch(shardingProducerController);
114+
Sys.Stop(shardingProducerController);
115+
await ExpectTerminatedAsync(shardingProducerController);
116+
117+
var shardingProducerController2 =
118+
Sys.ActorOf(
119+
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
120+
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController2-{_idCount}");
121+
shardingProducerController2.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
122+
123+
// delivery3 and delivery4 are still from old shardingProducerController, that were queued in ConsumerController
124+
delivery3.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
125+
// that confirmation goes to old dead shardingProducerController, and therefore not stored
126+
// TODO: need journal operations queries here to verify that the Confirmed WAS NOT persisted
127+
128+
var delivery4 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
129+
delivery4.Msg.Should().Be(new Job("msg-4"));
130+
delivery4.SeqNr.Should().Be(4);
131+
delivery4.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
132+
// that confirmation goes to old dead shardingProducerController, and therefore not stored
133+
// TODO: need journal operations queries here to verify that the Confirmed WAS NOT persisted
134+
135+
// now the unconfirmed are re-delivered
136+
var redelivery3 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
137+
redelivery3.Msg.Should().Be(new Job("msg-3"));
138+
redelivery3.SeqNr.Should().Be(1); // new ProducerController and there starting at 1
139+
redelivery3.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
140+
// TODO: need journal operations queries here to verify that the Confirmed was persisted
141+
142+
var redelivery4 = await consumerProbe.ExpectMsgAsync<JobDelivery>();
143+
redelivery4.Msg.Should().Be(new Job("msg-4"));
144+
redelivery4.SeqNr.Should().Be(2);
145+
redelivery4.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
146+
// TODO: need journal operations queries here to verify that the Confirmed was persisted
147+
148+
var next5 = await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>();
149+
next5.SendNextTo.Tell(new ShardingEnvelope("entity-1", new Job("msg-5")));
150+
// TODO: need journal operations queries here to verify that the message was persisted
151+
152+
153+
// the consumer controller may have stopped after msg-5, so allow for resend on timeout (10-15s)
154+
var delivery5 = await consumerProbe.ExpectMsgAsync<JobDelivery>(TimeSpan.FromSeconds(20));
155+
delivery5.Msg.Should().Be(new Job("msg-5"));
156+
delivery5.SeqNr.Should().Be(3); // 3, instead of 5, because SeqNr reset upon ProducerController restart
157+
delivery5.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
158+
// TODO: need journal operations queries here to verify that the Confirmed was persisted
159+
}
160+
161+
[Fact]
162+
public async Task ReliableDelivery_with_sharding_and_durable_queue_must_reply_to_MessageWithConfirmation_after_storage()
163+
{
164+
await JoinCluster();
165+
NextId();
166+
167+
var consumerProbe = CreateTestProbe();
168+
var sharding = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", s =>
169+
ShardingConsumerController.Create<Job>(c =>
170+
Props.Create(() => new Consumer(c, consumerProbe)),
171+
ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys),
172+
HashCodeMessageExtractor.Create(10,
173+
o =>
174+
{
175+
if (o is ShardingEnvelope se)
176+
return se.EntityId;
177+
return string.Empty;
178+
}, o =>
179+
{
180+
if (o is ShardingEnvelope se)
181+
return se.Message;
182+
return o;
183+
}));
184+
185+
var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
186+
var shardingProducerController =
187+
Sys.ActorOf(
188+
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
189+
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
190+
var producerProbe = CreateTestProbe();
191+
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
192+
193+
var replyProbe = CreateTestProbe();
194+
(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>())
195+
.AskNextTo(new ShardingProducerController.MessageWithConfirmation<Job>("entity-1", new Job("msg-1"),
196+
replyProbe.Ref));
197+
await replyProbe.ExpectMsgAsync<Done>();
198+
199+
(await producerProbe.ExpectMsgAsync<ShardingProducerController.RequestNext<Job>>())
200+
.AskNextTo(new ShardingProducerController.MessageWithConfirmation<Job>("entity-2", new Job("msg-2"),
201+
replyProbe.Ref));
202+
await replyProbe.ExpectMsgAsync<Done>();
203+
}
204+
205+
private class Consumer : ReceiveActor
206+
{
207+
private readonly TestProbe _consumerProbe;
208+
private readonly IActorRef _consumerController;
209+
private readonly IActorRef _deliveryAdapter;
210+
211+
public Consumer(IActorRef consumerController, TestProbe consumerProbe)
212+
{
213+
_consumerController = consumerController;
214+
_consumerProbe = consumerProbe;
215+
216+
var self = Self;
217+
_deliveryAdapter = Context.ActorOf(
218+
act =>
219+
{
220+
act.Receive<ConsumerController.Delivery<Job>>((delivery, ctx) =>
221+
{
222+
self.Forward(new JobDelivery(delivery.Message, delivery.ConfirmTo, delivery.ProducerId,
223+
delivery.SeqNr));
224+
});
225+
}, "delivery-adapter");
226+
227+
Receive<JobDelivery>(job => { _consumerProbe.Ref.Tell(job); });
228+
}
229+
230+
protected override void PreStart()
231+
{
232+
_consumerController.Tell(new ConsumerController.Start<Job>(_deliveryAdapter));
233+
}
234+
}
235+
}

0 commit comments

Comments
 (0)