diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 74ee1622182..6a71ca201da 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -86,7 +86,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage( $"producer-{_idCount}"); // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" - await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(25)).ToListAsync(); + await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync(); } [Fact] @@ -128,12 +128,13 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage_ $"p2-{_idCount}"); // expecting 3 end messages, one for each entity: "entity-0", "entity-1", "entity-2" - var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(25)).ToListAsync(); + var endMessages = await consumerEndProbe.ReceiveNAsync(3, TimeSpan.FromSeconds(5)).ToListAsync(); var producerIds = endMessages.Cast().SelectMany(c => c.ProducerIds).ToList(); producerIds .Should().BeEquivalentTo($"p1-{_idCount}-entity-0", $"p1-{_idCount}-entity-1", $"p1-{_idCount}-entity-2", $"p2-{_idCount}-entity-0", $"p2-{_idCount}-entity-1", $"p2-{_idCount}-entity-2"); + } [Fact] @@ -413,7 +414,7 @@ public async Task } // redeliver also when no more messages are sent to the entity - await consumerProbes[1].GracefulStop(RemainingOrDefault); + Sys.Stop(consumerProbes[1]); // don't wait for termination var delivery4b = await consumerProbes[2].ExpectMsgAsync>(); delivery4b.Message.Should().BeEquivalentTo(new Job("msg-4")); diff --git a/src/core/Akka.Tests/Delivery/ReliableDeliverySpecs.cs b/src/core/Akka.Tests/Delivery/ReliableDeliverySpecs.cs index cac683e1f94..16e4ff169ba 100644 --- a/src/core/Akka.Tests/Delivery/ReliableDeliverySpecs.cs +++ b/src/core/Akka.Tests/Delivery/ReliableDeliverySpecs.cs @@ -133,7 +133,7 @@ public async Task ReliableDelivery_must_allow_replacement_of_destination() var consumerEndProbe2 = CreateTestProbe(); var consumerController2 = Sys.ActorOf(ConsumerController.Create(Sys, Option.None), $"consumerController2-{_idCount}"); - var testConsumer2 = Sys.ActorOf(TestConsumer.PropsFor(DefaultConsumerDelay, 42, consumerEndProbe2.Ref, consumerController2), $"destination2-{_idCount}"); + var testConsumer2 = Sys.ActorOf(TestConsumer.PropsFor(DefaultConsumerDelay, 42, consumerEndProbe2.Ref, consumerController2, supportsRestarts:true), $"destination2-{_idCount}"); consumerController2.Tell(new ConsumerController.RegisterToProducerController(producerController)); diff --git a/src/core/Akka.Tests/Delivery/TestConsumer.cs b/src/core/Akka.Tests/Delivery/TestConsumer.cs index bd704478907..18def6ce752 100644 --- a/src/core/Akka.Tests/Delivery/TestConsumer.cs +++ b/src/core/Akka.Tests/Delivery/TestConsumer.cs @@ -34,15 +34,17 @@ public sealed class TestConsumer : ReceiveActor, IWithTimers private readonly ILoggingAdapter _log = Context.GetLogger(); private ImmutableHashSet<(string, long)> _processed = ImmutableHashSet<(string, long)>.Empty; + private readonly bool _supportRestarts = false; private int _messageCount = 0; public TestConsumer(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController) + IActorRef consumerController, bool supportRestarts = false) { Delay = delay; EndCondition = endCondition; EndReplyTo = endReplyTo; ConsumerController = consumerController; + _supportRestarts = supportRestarts; Active(); } @@ -73,12 +75,18 @@ private void Active() _log.Info("processed [{0}] [msg: {1}] from [{2}]", job.SeqNr, job.Msg.Payload, job.ProducerId); job.ConfirmTo.Tell(global::Akka.Delivery.ConsumerController.Confirmed.Instance); - if (EndCondition(job)) + if (EndCondition(job) && (_messageCount > 0 || _supportRestarts)) { _log.Debug("End at [{0}]", job.SeqNr); EndReplyTo.Tell(new Collected(_processed.Select(c => c.Item1).ToImmutableHashSet(), _messageCount + 1)); Context.Stop(Self); } + else if (!_supportRestarts && EndCondition(job)) + { + // BugFix: TestConsumer was recreated by a message sent by the Sharding system, but the EndCondition was already met + // and we don't want to send another Collected that is missing some of the figures. Ignore. + + } else { _processed = cleanProcessed.Add(nextMsg); @@ -181,12 +189,12 @@ public static ConsumerController.SequencedMessage SequencedMessage(string p private static Func ConsumerEndCondition(long seqNr) => msg => msg.SeqNr >= seqNr; - public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController) => - Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController)); + public static Props PropsFor(TimeSpan delay, long seqNr, IActorRef endReplyTo, IActorRef consumerController, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, ConsumerEndCondition(seqNr), endReplyTo, consumerController, supportsRestarts)); public static Props PropsFor(TimeSpan delay, Func endCondition, IActorRef endReplyTo, - IActorRef consumerController) => - Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController)); + IActorRef consumerController, bool supportsRestarts = false) => + Props.Create(() => new TestConsumer(delay, endCondition, endReplyTo, consumerController, supportsRestarts)); public ITimerScheduler Timers { get; set; } = null!; }