Skip to content

Commit 952d0ff

Browse files
harden ReliableDeliveryShardingSpecs (#6750)
1 parent bf9c1ce commit 952d0ff

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage(
8686
$"producer-{_idCount}");
8787

8888
// expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2"
89-
await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(25)).ToListAsync();
89+
await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync();
9090
}
9191

9292
[Fact]
@@ -128,12 +128,13 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_
128128
$"p2-{_idCount}");
129129

130130
// expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2"
131-
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(25)).ToListAsync();
131+
var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync();
132132

133133
var producerIds = endMessages.Cast<Collected>().SelectMany(c => c.ProducerIds).ToList();
134134
producerIds
135135
.Should().BeEquivalentTo($"p1-{_idCount}-entity-0", $"p1-{_idCount}-entity-1", $"p1-{_idCount}-entity-2",
136136
$"p2-{_idCount}-entity-0", $"p2-{_idCount}-entity-1", $"p2-{_idCount}-entity-2");
137+
137138
}
138139

139140
[Fact]
@@ -413,7 +414,7 @@ public async Task
413414
}
414415

415416
// redeliver also when no more messages are sent to the entity
416-
await consumerProbes[1].GracefulStop(RemainingOrDefault);
417+
Sys.Stop(consumerProbes[1]); // don't wait for termination
417418

418419
var delivery4b = await consumerProbes[2].ExpectMsgAsync<ConsumerController.Delivery<Job>>();
419420
delivery4b.Message.Should().BeEquivalentTo(new Job("msg-4"));

src/core/Akka.Tests/Delivery/ReliableDeliverySpecs.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public async Task ReliableDelivery_must_allow_replacement_of_destination()
133133

134134
var consumerEndProbe2 = CreateTestProbe();
135135
var consumerController2 = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None), $"consumerController2-{_idCount}");
136-
var testConsumer2 = Sys.ActorOf(TestConsumer.PropsFor(DefaultConsumerDelay, 42, consumerEndProbe2.Ref, consumerController2), $"destination2-{_idCount}");
136+
var testConsumer2 = Sys.ActorOf(TestConsumer.PropsFor(DefaultConsumerDelay, 42, consumerEndProbe2.Ref, consumerController2, supportsRestarts:true), $"destination2-{_idCount}");
137137

138138
consumerController2.Tell(new ConsumerController.RegisterToProducerController<Job>(producerController));
139139

src/core/Akka.Tests/Delivery/TestConsumer.cs

+14-6
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,17 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers
3434

3535
private readonly ILoggingAdapter _log = Context.GetLogger();
3636
private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty;
37+
private readonly bool _supportRestarts = false;
3738
private int _messageCount = 0;
3839

3940
public TestConsumer(TimeSpan delay, Func<SomeAsyncJob, bool> endCondition, IActorRef endReplyTo,
40-
IActorRef consumerController)
41+
IActorRef consumerController, bool supportRestarts = false)
4142
{
4243
Delay = delay;
4344
EndCondition = endCondition;
4445
EndReplyTo = endReplyTo;
4546
ConsumerController = consumerController;
47+
_supportRestarts = supportRestarts;
4648

4749
Active();
4850
}
@@ -73,12 +75,18 @@ private void Active()
7375
_log.Info("processed [{0}] [msg: {1}] from [{2}]", job.SeqNr, job.Msg.Payload, job.ProducerId);
7476
job.ConfirmTo.Tell(global::Akka.Delivery.ConsumerController.Confirmed.Instance);
7577

76-
if (EndCondition(job))
78+
if (EndCondition(job) && (_messageCount > 0 || _supportRestarts))
7779
{
7880
_log.Debug("End at [{0}]", job.SeqNr);
7981
EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1));
8082
Context.Stop(Self);
8183
}
84+
else if (!_supportRestarts && EndCondition(job))
85+
{
86+
// BugFix: TestConsumer was recreated by a message sent by the Sharding system, but the EndCondition was already met
87+
// and we don't want to send another Collected that is missing some of the figures. Ignore.
88+
89+
}
8290
else
8391
{
8492
_processed = cleanProcessed.Add(nextMsg);
@@ -181,12 +189,12 @@ public static ConsumerController.SequencedMessage<Job> SequencedMessage(string p
181189

182190
private static Func<SomeAsyncJob, bool> ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr;
183191

184-
public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController) =>
185-
Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController));
192+
public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) =>
193+
Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts));
186194

187195
public static Props PropsFor(TimeSpan delay, Func<SomeAsyncJob, bool> endCondition, IActorRef endReplyTo,
188-
IActorRef consumerController) =>
189-
Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController));
196+
IActorRef consumerController, bool supportsRestarts = false) =>
197+
Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts));
190198

191199
public ITimerScheduler Timers { get; set; } = null!;
192200
}

0 commit comments

Comments
 (0)