14
14
using Akka . Streams . TestKit ;
15
15
using Akka . TestKit ;
16
16
using FluentAssertions ;
17
+ using Akka . TestKit . Extensions ;
17
18
using Xunit ;
18
19
using Xunit . Abstractions ;
20
+ using FluentAssertions . Extensions ;
19
21
20
22
namespace Akka . Streams . Tests . Dsl
21
23
{
@@ -38,50 +40,55 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
38
40
[ Fact ]
39
41
public void A_Aggregate_must_work_when_using_Source_RunAggregate ( )
40
42
{
41
- this . AssertAllStagesStopped ( ( ) =>
43
+ this . AssertAllStagesStopped ( async ( ) =>
42
44
{
43
45
var task = InputSource . RunAggregate ( 0 , ( sum , i ) => sum + i , Materializer ) ;
44
- task . AwaitResult ( ) . Should ( ) . Be ( Expected ) ;
46
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
47
+ complete . Should ( ) . Be ( Expected ) ;
45
48
} , Materializer ) ;
46
49
}
47
50
48
51
[ Fact ]
49
52
public void A_Aggregate_must_work_when_using_Source_Aggregate ( )
50
53
{
51
- this . AssertAllStagesStopped ( ( ) =>
54
+ this . AssertAllStagesStopped ( async ( ) =>
52
55
{
53
56
var task = AggregateSource . RunWith ( Sink . First < int > ( ) , Materializer ) ;
54
- task . AwaitResult ( ) . Should ( ) . Be ( Expected ) ;
57
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
58
+ complete . Should ( ) . Be ( Expected ) ;
55
59
} , Materializer ) ;
56
60
}
57
61
58
62
[ Fact ]
59
63
public void A_Aggregate_must_work_when_using_Sink_Aggregate ( )
60
64
{
61
- this . AssertAllStagesStopped ( ( ) =>
65
+ this . AssertAllStagesStopped ( async ( ) =>
62
66
{
63
67
var task = InputSource . RunWith ( AggregateSink , Materializer ) ;
64
- task . AwaitResult ( ) . Should ( ) . Be ( Expected ) ;
68
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
69
+ complete . Should ( ) . Be ( Expected ) ;
65
70
} , Materializer ) ;
66
71
}
67
72
68
73
[ Fact ]
69
74
public void A_Aggregate_must_work_when_using_Flow_Aggregate ( )
70
75
{
71
- this . AssertAllStagesStopped ( ( ) =>
76
+ this . AssertAllStagesStopped ( async ( ) =>
72
77
{
73
78
var task = InputSource . Via ( AggregateFlow ) . RunWith ( Sink . First < int > ( ) , Materializer ) ;
74
- task . AwaitResult ( ) . Should ( ) . Be ( Expected ) ;
79
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
80
+ complete . Should ( ) . Be ( Expected ) ;
75
81
} , Materializer ) ;
76
82
}
77
83
78
84
[ Fact ]
79
85
public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate ( )
80
86
{
81
- this . AssertAllStagesStopped ( ( ) =>
87
+ this . AssertAllStagesStopped ( async ( ) =>
82
88
{
83
89
var task = AggregateSource . Via ( AggregateFlow ) . RunWith ( AggregateSink , Materializer ) ;
84
- task . AwaitResult ( ) . Should ( ) . Be ( Expected ) ;
90
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
91
+ complete . Should ( ) . Be ( Expected ) ;
85
92
} , Materializer ) ;
86
93
}
87
94
@@ -129,7 +136,7 @@ public void
129
136
[ Fact ]
130
137
public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume ( )
131
138
{
132
- this . AssertAllStagesStopped ( ( ) =>
139
+ this . AssertAllStagesStopped ( async ( ) =>
133
140
{
134
141
var error = new Exception ( "boom" ) ;
135
142
var aggregate = Sink . Aggregate ( 0 , ( int x , int y ) =>
@@ -142,14 +149,15 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
142
149
var task = InputSource . RunWith (
143
150
aggregate . WithAttributes ( ActorAttributes . CreateSupervisionStrategy ( Deciders . ResumingDecider ) ) ,
144
151
Materializer ) ;
145
- task . AwaitResult ( ) . Should ( ) . Be ( Expected - 50 ) ;
152
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
153
+ complete . Should ( ) . Be ( Expected - 50 ) ;
146
154
} , Materializer ) ;
147
155
}
148
156
149
157
[ Fact ]
150
158
public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart ( )
151
159
{
152
- this . AssertAllStagesStopped ( ( ) =>
160
+ this . AssertAllStagesStopped ( async ( ) =>
153
161
{
154
162
var error = new Exception ( "boom" ) ;
155
163
var aggregate = Sink . Aggregate ( 0 , ( int x , int y ) =>
@@ -162,18 +170,20 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
162
170
var task = InputSource . RunWith (
163
171
aggregate . WithAttributes ( ActorAttributes . CreateSupervisionStrategy ( Deciders . RestartingDecider ) ) ,
164
172
Materializer ) ;
165
- task . AwaitResult ( ) . Should ( ) . Be ( Enumerable . Range ( 51 , 50 ) . Sum ( ) ) ;
173
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
174
+ complete . Should ( ) . Be ( Enumerable . Range ( 51 , 50 ) . Sum ( ) ) ;
166
175
} , Materializer ) ;
167
176
}
168
177
169
178
[ Fact ]
170
179
public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream ( )
171
180
{
172
- this . AssertAllStagesStopped ( ( ) =>
181
+ this . AssertAllStagesStopped ( async ( ) =>
173
182
{
174
183
var task = Source . From ( Enumerable . Empty < int > ( ) )
175
184
. RunAggregate ( 0 , ( acc , element ) => acc + element , Materializer ) ;
176
- task . AwaitResult ( ) . ShouldBe ( 0 ) ;
185
+ var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
186
+ complete . Should ( ) . Be ( 0 ) ;
177
187
} , Materializer ) ;
178
188
}
179
189
}
0 commit comments