@@ -38,9 +38,9 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
38
38
private static Sink < int , Task < int > > AggregateSink => Sink . Aggregate < int , int > ( 0 , ( sum , i ) => sum + i ) ;
39
39
40
40
[ Fact ]
41
- public void A_Aggregate_must_work_when_using_Source_RunAggregate ( )
41
+ public async Task A_Aggregate_must_work_when_using_Source_RunAggregate ( )
42
42
{
43
- this . AssertAllStagesStopped ( async ( ) =>
43
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
44
44
{
45
45
var task = InputSource . RunAggregate ( 0 , ( sum , i ) => sum + i , Materializer ) ;
46
46
var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -49,9 +49,9 @@ public void A_Aggregate_must_work_when_using_Source_RunAggregate()
49
49
}
50
50
51
51
[ Fact ]
52
- public void A_Aggregate_must_work_when_using_Source_Aggregate ( )
52
+ public async Task A_Aggregate_must_work_when_using_Source_Aggregate ( )
53
53
{
54
- this . AssertAllStagesStopped ( async ( ) =>
54
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
55
55
{
56
56
var task = AggregateSource . RunWith ( Sink . First < int > ( ) , Materializer ) ;
57
57
var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -60,9 +60,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate()
60
60
}
61
61
62
62
[ Fact ]
63
- public void A_Aggregate_must_work_when_using_Sink_Aggregate ( )
63
+ public async Task A_Aggregate_must_work_when_using_Sink_Aggregate ( )
64
64
{
65
- this . AssertAllStagesStopped ( async ( ) =>
65
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
66
66
{
67
67
var task = InputSource . RunWith ( AggregateSink , Materializer ) ;
68
68
var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -71,9 +71,9 @@ public void A_Aggregate_must_work_when_using_Sink_Aggregate()
71
71
}
72
72
73
73
[ Fact ]
74
- public void A_Aggregate_must_work_when_using_Flow_Aggregate ( )
74
+ public async Task A_Aggregate_must_work_when_using_Flow_Aggregate ( )
75
75
{
76
- this . AssertAllStagesStopped ( async ( ) =>
76
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
77
77
{
78
78
var task = InputSource . Via ( AggregateFlow ) . RunWith ( Sink . First < int > ( ) , Materializer ) ;
79
79
var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -82,9 +82,9 @@ public void A_Aggregate_must_work_when_using_Flow_Aggregate()
82
82
}
83
83
84
84
[ Fact ]
85
- public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate ( )
85
+ public async Task A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate ( )
86
86
{
87
- this . AssertAllStagesStopped ( async ( ) =>
87
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
88
88
{
89
89
var task = AggregateSource . Via ( AggregateFlow ) . RunWith ( AggregateSink , Materializer ) ;
90
90
var complete = await task . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -93,10 +93,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate
93
93
}
94
94
95
95
[ Fact ]
96
- public void A_Aggregate_must_propagate_an_error ( )
96
+ public async Task A_Aggregate_must_propagate_an_error ( )
97
97
{
98
- this . AssertAllStagesStopped ( ( ) =>
99
- {
98
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
100
99
var error = new TestException ( "buh" ) ;
101
100
var future = InputSource . Select ( x =>
102
101
{
@@ -109,15 +108,15 @@ public void A_Aggregate_must_propagate_an_error()
109
108
. Should ( ) . Throw < TestException > ( )
110
109
. And . Should ( )
111
110
. Be ( error ) ;
111
+ return Task . CompletedTask ;
112
112
} , Materializer ) ;
113
113
}
114
114
115
115
[ Fact ]
116
- public void
116
+ public async Task
117
117
A_Aggregate_must_complete_task_with_failure_when_the_aggregateing_function_throws_and_the_supervisor_strategy_decides_to_stop ( )
118
118
{
119
- this . AssertAllStagesStopped ( ( ) =>
120
- {
119
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
121
120
var error = new TestException ( "buh" ) ;
122
121
var future = InputSource . RunAggregate ( 0 , ( x , y ) =>
123
122
{
@@ -130,13 +129,14 @@ public void
130
129
. Should ( ) . Throw < TestException > ( )
131
130
. And . Should ( )
132
131
. Be ( error ) ;
132
+ return Task . CompletedTask ;
133
133
} , Materializer ) ;
134
134
}
135
135
136
136
[ Fact ]
137
- public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume ( )
137
+ public async Task A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume ( )
138
138
{
139
- this . AssertAllStagesStopped ( async ( ) =>
139
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
140
140
{
141
141
var error = new Exception ( "boom" ) ;
142
142
var aggregate = Sink . Aggregate ( 0 , ( int x , int y ) =>
@@ -155,9 +155,9 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
155
155
}
156
156
157
157
[ Fact ]
158
- public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart ( )
158
+ public async Task A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart ( )
159
159
{
160
- this . AssertAllStagesStopped ( async ( ) =>
160
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
161
161
{
162
162
var error = new Exception ( "boom" ) ;
163
163
var aggregate = Sink . Aggregate ( 0 , ( int x , int y ) =>
@@ -176,9 +176,9 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
176
176
}
177
177
178
178
[ Fact ]
179
- public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream ( )
179
+ public async Task A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream ( )
180
180
{
181
- this . AssertAllStagesStopped ( async ( ) =>
181
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
182
182
{
183
183
var task = Source . From ( Enumerable . Empty < int > ( ) )
184
184
. RunAggregate ( 0 , ( acc , element ) => acc + element , Materializer ) ;
0 commit comments