From 804f0380e9e416ae238e58851f68259e26aea971 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 28 Jun 2023 15:33:30 -0500 Subject: [PATCH 1/2] Added spec to demonstrate failed snapshot-only recovery Tests whether a persistent actor will properly fail during a failed snapshot recovery or during a timed out one. This is all without ever having saved an event to the journal, so the `MaxSeqNo` is set to `0`. --- .../Akka.Persistence.TestKit.csproj | 2 +- .../Akka.Persistence.Tests.csproj | 1 + .../FailedSnapshotStoreRecoverySpec.cs | 147 ++++++++++++++++++ 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs diff --git a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj index aeb2b29ff07..76a37ecadaf 100644 --- a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj +++ b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj @@ -3,7 +3,7 @@ Akka.Persistence.TestKit TestKit for writing tests for Akka.NET Persistance module. - $(NetStandardLibVersion) + $(NetStandardLibVersion);$(NetLibVersion) $(AkkaPackageTags);testkit;persistance true diff --git a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj index c5fbe3c114c..0cc4319cc79 100644 --- a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj +++ b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs b/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs new file mode 100644 index 00000000000..449634e7602 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs @@ -0,0 +1,147 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Event; +using Akka.Persistence.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Tests; + +/// +/// Scenario: actor that uses ONLY the SnapshotStore fails to recover - what happens? +/// +public class FailedSnapshotStoreRecoverySpec : PersistenceTestKit +{ + public enum FailureMode + { + Explicit, + Timeout + } + + public static readonly Config Config = ConfigurationFactory.ParseString(@" + # need to set recovery timeout to 1s + akka.persistence.journal-plugin-fallback.recovery-event-timeout = 1s + "); + + public FailedSnapshotStoreRecoverySpec(ITestOutputHelper output) : base(Config, output:output){} + + private record Save(string Data); + + private record Fetch(); + + private record SnapshotSaved(); + private record RecoveryCompleted(); + + public sealed class PersistentActor : UntypedPersistentActor + { + public PersistentActor(string persistenceId, IActorRef targetActor) + { + PersistenceId = persistenceId; + _targetActor = targetActor; + } + + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly IActorRef _targetActor; + + public override string PersistenceId { get; } + public string CurrentData { get; set; } = "none"; + protected override void OnCommand(object message) + { + switch (message) + { + case Save s: + { + CurrentData = s.Data; + SaveSnapshot(CurrentData); + Sender.Tell("ack"); + break; + } + case Fetch: + { + Sender.Tell(CurrentData); + break; + } + case SaveSnapshotSuccess success: + { + _log.Info("Snapshot saved"); + _targetActor.Tell(new SnapshotSaved()); + break; + } + case SaveSnapshotFailure failure: + { + _log.Error(failure.Cause, "Snapshot failed"); + break; + } + } + } + + protected override void OnRecover(object message) + { + switch (message) + { + case SnapshotOffer { Snapshot: string str }: + { + CurrentData = str; + break; + } + } + } + + protected override void OnReplaySuccess() + { + _targetActor.Tell(new RecoveryCompleted()); + } + + protected override void OnRecoveryFailure(Exception reason, object message = null) + { + _log.Error(reason, "Recovery failed"); + base.OnRecoveryFailure(reason, message); + } + } + + [Theory(DisplayName = "PersistentActor using Snapshots only must fail if snapshots are irrecoverable")] + [InlineData(FailureMode.Explicit)] + [InlineData(FailureMode.Timeout)] + public async Task PersistentActor_using_Snapshots_only_must_fail_if_snapshots_irrecoverable(FailureMode mode) + { + // arrange + var probe = CreateTestProbe(); + var actor = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); + await probe.ExpectMsgAsync(); + actor.Tell(new Save("a"), probe); + await probe.ExpectMsgAsync("ack"); + await probe.ExpectMsgAsync(); + await actor.GracefulStop(RemainingOrDefault); + + Task SelectBehavior(SnapshotStoreLoadBehavior behavior) + { + switch (mode) + { + case FailureMode.Timeout: + return behavior.FailWithDelay(TimeSpan.FromMinutes(1)); + case FailureMode.Explicit: + default: + return behavior.Fail(); + } + } + + // act + await WithSnapshotLoad(SelectBehavior, async () => + { + var actor2 = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); + Watch(actor2); + await probe.ExpectNoMsgAsync(); + await ExpectTerminatedAsync(actor2); + }); + + } +} \ No newline at end of file From 51a9a4cfaea0608c30338c1fda8d805251808b02 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 28 Jun 2023 15:38:48 -0500 Subject: [PATCH 2/2] accelerated timeframes --- .../FailedSnapshotStoreRecoverySpec.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs b/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs index 449634e7602..6fa437f75f7 100644 --- a/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs +++ b/src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs @@ -137,10 +137,13 @@ Task SelectBehavior(SnapshotStoreLoadBehavior behavior) // act await WithSnapshotLoad(SelectBehavior, async () => { - var actor2 = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); - Watch(actor2); - await probe.ExpectNoMsgAsync(); - await ExpectTerminatedAsync(actor2); + await WithinAsync(RemainingOrDefault, async () => + { + var actor2 = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); + Watch(actor2); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(150)); + await ExpectTerminatedAsync(actor2); + }); }); }