Skip to content

Commit f5924ea

Browse files
eabaAaronontheweb
andauthored
[5-74]FlowAggregateSpec (#6544)
* [5-74]`FlowAggregateSpec`: Change `AssertAllStagesStopped` to `AssertAllStagesStoppedAsync` * Revert "[5-74]`FlowAggregateSpec`: Change `AssertAllStagesStopped` to `AssertAllStagesStoppedAsync`" This reverts commit 1adfbb4. * [5-74]`FlowAggregateSpec`: Change `AssertAllStagesStopped` to `AssertAllStagesStoppedAsync` --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 29b56ab commit f5924ea

File tree

1 file changed

+22
-22
lines changed

1 file changed

+22
-22
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowAggregateSpec.cs

+22-22
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
3838
private static Sink<int, Task<int>> AggregateSink => Sink.Aggregate<int, int>(0, (sum, i) => sum + i);
3939

4040
[Fact]
41-
public void A_Aggregate_must_work_when_using_Source_RunAggregate()
41+
public async Task A_Aggregate_must_work_when_using_Source_RunAggregate()
4242
{
43-
this.AssertAllStagesStopped(async() =>
43+
await this.AssertAllStagesStoppedAsync(async() =>
4444
{
4545
var task = InputSource.RunAggregate(0, (sum, i) => sum + i, Materializer);
4646
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -49,9 +49,9 @@ public void A_Aggregate_must_work_when_using_Source_RunAggregate()
4949
}
5050

5151
[Fact]
52-
public void A_Aggregate_must_work_when_using_Source_Aggregate()
52+
public async Task A_Aggregate_must_work_when_using_Source_Aggregate()
5353
{
54-
this.AssertAllStagesStopped(async() =>
54+
await this.AssertAllStagesStoppedAsync(async() =>
5555
{
5656
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
5757
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -60,9 +60,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate()
6060
}
6161

6262
[Fact]
63-
public void A_Aggregate_must_work_when_using_Sink_Aggregate()
63+
public async Task A_Aggregate_must_work_when_using_Sink_Aggregate()
6464
{
65-
this.AssertAllStagesStopped(async() =>
65+
await this.AssertAllStagesStoppedAsync(async() =>
6666
{
6767
var task = InputSource.RunWith(AggregateSink, Materializer);
6868
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -71,9 +71,9 @@ public void A_Aggregate_must_work_when_using_Sink_Aggregate()
7171
}
7272

7373
[Fact]
74-
public void A_Aggregate_must_work_when_using_Flow_Aggregate()
74+
public async Task A_Aggregate_must_work_when_using_Flow_Aggregate()
7575
{
76-
this.AssertAllStagesStopped(async() =>
76+
await this.AssertAllStagesStoppedAsync(async() =>
7777
{
7878
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
7979
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -82,9 +82,9 @@ public void A_Aggregate_must_work_when_using_Flow_Aggregate()
8282
}
8383

8484
[Fact]
85-
public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
85+
public async Task A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
8686
{
87-
this.AssertAllStagesStopped(async() =>
87+
await this.AssertAllStagesStoppedAsync(async() =>
8888
{
8989
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
9090
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -93,10 +93,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate
9393
}
9494

9595
[Fact]
96-
public void A_Aggregate_must_propagate_an_error()
96+
public async Task A_Aggregate_must_propagate_an_error()
9797
{
98-
this.AssertAllStagesStopped(() =>
99-
{
98+
await this.AssertAllStagesStoppedAsync(() => {
10099
var error = new TestException("buh");
101100
var future = InputSource.Select(x =>
102101
{
@@ -109,15 +108,15 @@ public void A_Aggregate_must_propagate_an_error()
109108
.Should().Throw<TestException>()
110109
.And.Should()
111110
.Be(error);
111+
return Task.CompletedTask;
112112
}, Materializer);
113113
}
114114

115115
[Fact]
116-
public void
116+
public async Task
117117
A_Aggregate_must_complete_task_with_failure_when_the_aggregateing_function_throws_and_the_supervisor_strategy_decides_to_stop()
118118
{
119-
this.AssertAllStagesStopped(() =>
120-
{
119+
await this.AssertAllStagesStoppedAsync(() => {
121120
var error = new TestException("buh");
122121
var future = InputSource.RunAggregate(0, (x, y) =>
123122
{
@@ -130,13 +129,14 @@ public void
130129
.Should().Throw<TestException>()
131130
.And.Should()
132131
.Be(error);
132+
return Task.CompletedTask;
133133
}, Materializer);
134134
}
135135

136136
[Fact]
137-
public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
137+
public async Task A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
138138
{
139-
this.AssertAllStagesStopped(async() =>
139+
await this.AssertAllStagesStoppedAsync(async() =>
140140
{
141141
var error = new Exception("boom");
142142
var aggregate = Sink.Aggregate(0, (int x, int y) =>
@@ -155,9 +155,9 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
155155
}
156156

157157
[Fact]
158-
public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
158+
public async Task A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
159159
{
160-
this.AssertAllStagesStopped(async() =>
160+
await this.AssertAllStagesStoppedAsync(async() =>
161161
{
162162
var error = new Exception("boom");
163163
var aggregate = Sink.Aggregate(0, (int x, int y) =>
@@ -176,9 +176,9 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
176176
}
177177

178178
[Fact]
179-
public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
179+
public async Task A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
180180
{
181-
this.AssertAllStagesStopped(async() =>
181+
await this.AssertAllStagesStoppedAsync(async() =>
182182
{
183183
var task = Source.From(Enumerable.Empty<int>())
184184
.RunAggregate(0, (acc, element) => acc + element, Materializer);

0 commit comments

Comments
 (0)