1
+ //-----------------------------------------------------------------------
2
+ // <copyright file="FailedSnapshotStoreRecoverySpec.cs" company="Akka.NET Project">
3
+ // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4
+ // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5
+ // </copyright>
6
+ //-----------------------------------------------------------------------
7
+
8
+ using System ;
9
+ using System . Threading . Tasks ;
10
+ using Akka . Actor ;
11
+ using Akka . Configuration ;
12
+ using Akka . Event ;
13
+ using Akka . Persistence . TestKit ;
14
+ using Xunit ;
15
+ using Xunit . Abstractions ;
16
+
17
+ namespace Akka . Persistence . Tests ;
18
+
19
+ /// <summary>
20
+ /// Scenario: actor that uses ONLY the SnapshotStore fails to recover - what happens?
21
+ /// </summary>
22
+ public class FailedSnapshotStoreRecoverySpec : PersistenceTestKit
23
+ {
24
+ public enum FailureMode
25
+ {
26
+ Explicit ,
27
+ Timeout
28
+ }
29
+
30
+ public static readonly Config Config = ConfigurationFactory . ParseString ( @"
31
+ # need to set recovery timeout to 1s
32
+ akka.persistence.journal-plugin-fallback.recovery-event-timeout = 1s
33
+ " ) ;
34
+
35
+ public FailedSnapshotStoreRecoverySpec ( ITestOutputHelper output ) : base ( Config , output : output ) { }
36
+
37
+ private record Save ( string Data ) ;
38
+
39
+ private record Fetch ( ) ;
40
+
41
+ private record SnapshotSaved ( ) ;
42
+ private record RecoveryCompleted ( ) ;
43
+
44
+ public sealed class PersistentActor : UntypedPersistentActor
45
+ {
46
+ public PersistentActor ( string persistenceId , IActorRef targetActor )
47
+ {
48
+ PersistenceId = persistenceId ;
49
+ _targetActor = targetActor ;
50
+ }
51
+
52
+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
53
+ private readonly IActorRef _targetActor ;
54
+
55
+ public override string PersistenceId { get ; }
56
+ public string CurrentData { get ; set ; } = "none" ;
57
+ protected override void OnCommand ( object message )
58
+ {
59
+ switch ( message )
60
+ {
61
+ case Save s :
62
+ {
63
+ CurrentData = s . Data ;
64
+ SaveSnapshot ( CurrentData ) ;
65
+ Sender . Tell ( "ack" ) ;
66
+ break ;
67
+ }
68
+ case Fetch :
69
+ {
70
+ Sender . Tell ( CurrentData ) ;
71
+ break ;
72
+ }
73
+ case SaveSnapshotSuccess success :
74
+ {
75
+ _log . Info ( "Snapshot saved" ) ;
76
+ _targetActor . Tell ( new SnapshotSaved ( ) ) ;
77
+ break ;
78
+ }
79
+ case SaveSnapshotFailure failure :
80
+ {
81
+ _log . Error ( failure . Cause , "Snapshot failed" ) ;
82
+ break ;
83
+ }
84
+ }
85
+ }
86
+
87
+ protected override void OnRecover ( object message )
88
+ {
89
+ switch ( message )
90
+ {
91
+ case SnapshotOffer { Snapshot : string str } :
92
+ {
93
+ CurrentData = str ;
94
+ break ;
95
+ }
96
+ }
97
+ }
98
+
99
+ protected override void OnReplaySuccess ( )
100
+ {
101
+ _targetActor . Tell ( new RecoveryCompleted ( ) ) ;
102
+ }
103
+
104
+ protected override void OnRecoveryFailure ( Exception reason , object message = null )
105
+ {
106
+ _log . Error ( reason , "Recovery failed" ) ;
107
+ base . OnRecoveryFailure ( reason , message ) ;
108
+ }
109
+ }
110
+
111
+ [ Theory ( DisplayName = "PersistentActor using Snapshots only must fail if snapshots are irrecoverable" ) ]
112
+ [ InlineData ( FailureMode . Explicit ) ]
113
+ [ InlineData ( FailureMode . Timeout ) ]
114
+ public async Task PersistentActor_using_Snapshots_only_must_fail_if_snapshots_irrecoverable ( FailureMode mode )
115
+ {
116
+ // arrange
117
+ var probe = CreateTestProbe ( ) ;
118
+ var actor = Sys . ActorOf ( Props . Create ( ( ) => new PersistentActor ( "p1" , probe . Ref ) ) ) ;
119
+ await probe . ExpectMsgAsync < RecoveryCompleted > ( ) ;
120
+ actor . Tell ( new Save ( "a" ) , probe ) ;
121
+ await probe . ExpectMsgAsync ( "ack" ) ;
122
+ await probe . ExpectMsgAsync < SnapshotSaved > ( ) ;
123
+ await actor . GracefulStop ( RemainingOrDefault ) ;
124
+
125
+ Task SelectBehavior ( SnapshotStoreLoadBehavior behavior )
126
+ {
127
+ switch ( mode )
128
+ {
129
+ case FailureMode . Timeout :
130
+ return behavior . FailWithDelay ( TimeSpan . FromMinutes ( 1 ) ) ;
131
+ case FailureMode . Explicit :
132
+ default :
133
+ return behavior . Fail ( ) ;
134
+ }
135
+ }
136
+
137
+ // act
138
+ await WithSnapshotLoad ( SelectBehavior , async ( ) =>
139
+ {
140
+ await WithinAsync ( RemainingOrDefault , async ( ) =>
141
+ {
142
+ var actor2 = Sys . ActorOf ( Props . Create ( ( ) => new PersistentActor ( "p1" , probe . Ref ) ) ) ;
143
+ Watch ( actor2 ) ;
144
+ await probe . ExpectNoMsgAsync ( TimeSpan . FromMilliseconds ( 150 ) ) ;
145
+ await ExpectTerminatedAsync ( actor2 ) ;
146
+ } ) ;
147
+ } ) ;
148
+
149
+ }
150
+ }
0 commit comments