-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Added spec to demonstrate failed snapshot-only recovery #6822
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Arkatufus
merged 2 commits into
akkadotnet:dev
from
Aaronontheweb:added-failed-snapshot-only-recovery-spec
Jun 29, 2023
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\contrib\serializers\Akka.Serialization.Hyperion\Akka.Serialization.Hyperion.csproj" /> | ||
<ProjectReference Include="..\Akka.Persistence.TestKit.Xunit2\Akka.Persistence.TestKit.Xunit2.csproj" /> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Referenced the Akka.Persistence.TestKit so I could more easily simulate journal / snapshot store failures |
||
<ProjectReference Include="..\Akka.Persistence\Akka.Persistence.csproj" /> | ||
<ProjectReference Include="..\Akka.Remote\Akka.Remote.csproj" /> | ||
<ProjectReference Include="..\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" /> | ||
|
150 changes: 150 additions & 0 deletions
150
src/core/Akka.Persistence.Tests/FailedSnapshotStoreRecoverySpec.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="FailedSnapshotStoreRecoverySpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
using Akka.Event; | ||
using Akka.Persistence.TestKit; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Persistence.Tests; | ||
|
||
/// <summary> | ||
/// Scenario: actor that uses ONLY the SnapshotStore fails to recover - what happens? | ||
/// </summary> | ||
public class FailedSnapshotStoreRecoverySpec : PersistenceTestKit | ||
{ | ||
public enum FailureMode | ||
{ | ||
Explicit, | ||
Timeout | ||
} | ||
|
||
public static readonly Config Config = ConfigurationFactory.ParseString(@" | ||
# need to set recovery timeout to 1s | ||
akka.persistence.journal-plugin-fallback.recovery-event-timeout = 1s | ||
"); | ||
|
||
public FailedSnapshotStoreRecoverySpec(ITestOutputHelper output) : base(Config, output:output){} | ||
|
||
private record Save(string Data); | ||
|
||
private record Fetch(); | ||
|
||
private record SnapshotSaved(); | ||
private record RecoveryCompleted(); | ||
|
||
public sealed class PersistentActor : UntypedPersistentActor | ||
{ | ||
public PersistentActor(string persistenceId, IActorRef targetActor) | ||
{ | ||
PersistenceId = persistenceId; | ||
_targetActor = targetActor; | ||
} | ||
|
||
private readonly ILoggingAdapter _log = Context.GetLogger(); | ||
private readonly IActorRef _targetActor; | ||
|
||
public override string PersistenceId { get; } | ||
public string CurrentData { get; set; } = "none"; | ||
protected override void OnCommand(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case Save s: | ||
{ | ||
CurrentData = s.Data; | ||
SaveSnapshot(CurrentData); | ||
Sender.Tell("ack"); | ||
break; | ||
} | ||
case Fetch: | ||
{ | ||
Sender.Tell(CurrentData); | ||
break; | ||
} | ||
case SaveSnapshotSuccess success: | ||
{ | ||
_log.Info("Snapshot saved"); | ||
_targetActor.Tell(new SnapshotSaved()); | ||
break; | ||
} | ||
case SaveSnapshotFailure failure: | ||
{ | ||
_log.Error(failure.Cause, "Snapshot failed"); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
protected override void OnRecover(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case SnapshotOffer { Snapshot: string str }: | ||
{ | ||
CurrentData = str; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
protected override void OnReplaySuccess() | ||
{ | ||
_targetActor.Tell(new RecoveryCompleted()); | ||
} | ||
|
||
protected override void OnRecoveryFailure(Exception reason, object message = null) | ||
{ | ||
_log.Error(reason, "Recovery failed"); | ||
base.OnRecoveryFailure(reason, message); | ||
} | ||
} | ||
|
||
[Theory(DisplayName = "PersistentActor using Snapshots only must fail if snapshots are irrecoverable")] | ||
[InlineData(FailureMode.Explicit)] | ||
[InlineData(FailureMode.Timeout)] | ||
public async Task PersistentActor_using_Snapshots_only_must_fail_if_snapshots_irrecoverable(FailureMode mode) | ||
{ | ||
// arrange | ||
var probe = CreateTestProbe(); | ||
var actor = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); | ||
await probe.ExpectMsgAsync<RecoveryCompleted>(); | ||
actor.Tell(new Save("a"), probe); | ||
await probe.ExpectMsgAsync("ack"); | ||
await probe.ExpectMsgAsync<SnapshotSaved>(); | ||
await actor.GracefulStop(RemainingOrDefault); | ||
|
||
Task SelectBehavior(SnapshotStoreLoadBehavior behavior) | ||
{ | ||
switch (mode) | ||
{ | ||
case FailureMode.Timeout: | ||
return behavior.FailWithDelay(TimeSpan.FromMinutes(1)); | ||
case FailureMode.Explicit: | ||
default: | ||
return behavior.Fail(); | ||
} | ||
} | ||
|
||
// act | ||
await WithSnapshotLoad(SelectBehavior, async () => | ||
{ | ||
await WithinAsync(RemainingOrDefault, async () => | ||
{ | ||
var actor2 = Sys.ActorOf(Props.Create(() => new PersistentActor("p1", probe.Ref))); | ||
Watch(actor2); | ||
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(150)); | ||
await ExpectTerminatedAsync(actor2); | ||
}); | ||
}); | ||
|
||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added dual-targeting to the Akka.Persistence.TestKit (
net6
)