Skip to content

Commit e4ac8fd

Browse files
authored
[33-74] FlowSumSpec (#6580)
1 parent 7e3befa commit e4ac8fd

File tree

1 file changed

+39
-41
lines changed

1 file changed

+39
-41
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSumSpec.cs

+39-41
Original file line numberDiff line numberDiff line change
@@ -44,66 +44,63 @@ private static Flow<int, int, NotUsed> SumFlow
4444
private static Sink<int, Task<int>> SumSink => Sink.Sum<int>((i, i1) => i + i1);
4545

4646
[Fact]
47-
public void A_Sum_must_work_when_using_Source_RunSum()
47+
public async Task A_Sum_must_work_when_using_Source_RunSum()
4848
{
49-
this.AssertAllStagesStopped(() =>
50-
{
49+
await this.AssertAllStagesStoppedAsync(() => {
5150
var t = InputSource.RunSum((i, i1) => i + i1, Materializer);
5251
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
5352
t.Result.Should().Be(Expected);
53+
return Task.CompletedTask;
5454
}, Materializer);
5555
}
5656

5757
[Fact]
58-
public void A_Sum_must_work_when_using_Source_Sum()
58+
public async Task A_Sum_must_work_when_using_Source_Sum()
5959
{
60-
this.AssertAllStagesStopped(() =>
61-
{
60+
await this.AssertAllStagesStoppedAsync(() => {
6261
var t = SumSource.RunWith(Sink.First<int>(), Materializer);
6362
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
6463
t.Result.Should().Be(Expected);
64+
return Task.CompletedTask;
6565
}, Materializer);
6666
}
6767
[Fact]
68-
public void A_Sum_must_work_when_using_Sink_Sum()
68+
public async Task A_Sum_must_work_when_using_Sink_Sum()
6969
{
70-
this.AssertAllStagesStopped(() =>
71-
{
70+
await this.AssertAllStagesStoppedAsync(() => {
7271
var t = InputSource.RunWith(SumSink, Materializer);
7372
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
7473
t.Result.Should().Be(Expected);
75-
74+
return Task.CompletedTask;
7675
}, Materializer);
7776
}
7877

7978
[Fact]
80-
public void A_Sum_must_work_when_using_Flow_Sum()
79+
public async Task A_Sum_must_work_when_using_Flow_Sum()
8180
{
82-
this.AssertAllStagesStopped(() =>
83-
{
81+
await this.AssertAllStagesStoppedAsync(() => {
8482
var t = InputSource.Via(SumFlow).RunWith(Sink.First<int>(), Materializer);
8583
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
8684
t.Result.Should().Be(Expected);
85+
return Task.CompletedTask;
8786
}, Materializer);
8887
}
8988

9089
[Fact]
91-
public void A_Sum_must_work_when_using_Source_Sum_and_Flow_Sum_and_Sink_Sum()
90+
public async Task A_Sum_must_work_when_using_Source_Sum_and_Flow_Sum_and_Sink_Sum()
9291
{
93-
this.AssertAllStagesStopped(() =>
94-
{
92+
await this.AssertAllStagesStoppedAsync(() => {
9593
var t = SumSource.Via(SumFlow).RunWith(SumSink, Materializer);
9694
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
9795
t.Result.Should().Be(Expected);
98-
96+
return Task.CompletedTask;
9997
}, Materializer);
10098
}
10199

102100
[Fact]
103-
public void A_Sum_must_propagate_an_error()
101+
public async Task A_Sum_must_propagate_an_error()
104102
{
105-
this.AssertAllStagesStopped(() =>
106-
{
103+
await this.AssertAllStagesStoppedAsync(() => {
107104
var error = new TestException("test");
108105
var task = InputSource.Select(x =>
109106
{
@@ -113,14 +110,14 @@ public void A_Sum_must_propagate_an_error()
113110
}).RunSum((i, i1) => 0, Materializer);
114111

115112
task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).Should().Throw<TestException>().WithMessage("test");
113+
return Task.CompletedTask;
116114
}, Materializer);
117115
}
118116

119117
[Fact]
120-
public void A_Sum_must_complete_task_with_failure_when_reduce_function_throws_and_the_supervisor_strategy_decides_to_stop()
118+
public async Task A_Sum_must_complete_task_with_failure_when_reduce_function_throws_and_the_supervisor_strategy_decides_to_stop()
121119
{
122-
this.AssertAllStagesStopped(() =>
123-
{
120+
await this.AssertAllStagesStoppedAsync(() => {
124121
var error = new TestException("test");
125122
var task = InputSource.RunSum((x, y) =>
126123
{
@@ -130,13 +127,14 @@ public void A_Sum_must_complete_task_with_failure_when_reduce_function_throws_an
130127
}, Materializer);
131128

132129
task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))).Should().Throw<TestException>().WithMessage("test");
130+
return Task.CompletedTask;
133131
}, Materializer);
134132
}
135133

136134
[Fact]
137-
public void A_Sum_must_resume_with_the_accumulated_state_when_the_reduce_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
135+
public async Task A_Sum_must_resume_with_the_accumulated_state_when_the_reduce_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
138136
{
139-
this.AssertAllStagesStopped(async() =>
137+
await this.AssertAllStagesStoppedAsync(async() =>
140138
{
141139
var error = new Exception("boom");
142140
var sum = Sink.Sum((int x, int y) =>
@@ -155,9 +153,9 @@ public void A_Sum_must_resume_with_the_accumulated_state_when_the_reduce_funtion
155153
}
156154

157155
[Fact]
158-
public void A_Aggregate_must_resume_and_reset_the_state_when_the_reduce_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
156+
public async Task A_Aggregate_must_resume_and_reset_the_state_when_the_reduce_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
159157
{
160-
this.AssertAllStagesStopped(async() =>
158+
await this.AssertAllStagesStoppedAsync(async() =>
161159
{
162160
var error = new Exception("boom");
163161
var sum = Sink.Sum((int x, int y) =>
@@ -176,44 +174,44 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_reduce_funtion_
176174
}
177175

178176
[Fact]
179-
public void A_Sum_must_fail_on_Empty_stream_using_Source_RunSum()
177+
public async Task A_Sum_must_fail_on_Empty_stream_using_Source_RunSum()
180178
{
181-
this.AssertAllStagesStopped(() =>
182-
{
179+
await this.AssertAllStagesStoppedAsync(() => {
183180
var result = Source.Empty<int>().RunSum((i, i1) => i + i1, Materializer);
184181
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
185182
.Should().Throw<NoSuchElementException>()
186183
.And.Message.Should()
187184
.Contain("empty stream");
185+
return Task.CompletedTask;
188186
}, Materializer);
189187
}
190188

191189
[Fact]
192-
public void A_Sum_must_fail_on_Empty_stream_using_Flow_Sum()
190+
public async Task A_Sum_must_fail_on_Empty_stream_using_Flow_Sum()
193191
{
194-
this.AssertAllStagesStopped(() =>
195-
{
196-
var result = Source.Empty<int>()
197-
.Via(SumFlow)
198-
.RunWith(Sink.Aggregate<int, int>(0, (i, i1) => i + i1), Materializer);
192+
await this.AssertAllStagesStoppedAsync(() => {
193+
var result = Source.Empty<int>()
194+
.Via(SumFlow)
195+
.RunWith(Sink.Aggregate<int, int>(0, (i, i1) => i + i1), Materializer);
199196
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
200197
.Should().Throw<NoSuchElementException>()
201198
.And.Message.Should()
202199
.Contain("empty stream");
200+
return Task.CompletedTask;
203201
}, Materializer);
204202
}
205203

206204
[Fact]
207-
public void A_Sum_must_fail_on_Empty_stream_using_Sink_Sum()
205+
public async Task A_Sum_must_fail_on_Empty_stream_using_Sink_Sum()
208206
{
209-
this.AssertAllStagesStopped(() =>
210-
{
211-
var result = Source.Empty<int>()
212-
.RunWith(SumSink, Materializer);
207+
await this.AssertAllStagesStoppedAsync(() => {
208+
var result = Source.Empty<int>()
209+
.RunWith(SumSink, Materializer);
213210
result.Invoking(t => t.Wait(TimeSpan.FromSeconds(3)))
214211
.Should().Throw<NoSuchElementException>()
215212
.And.Message.Should()
216213
.Contain("empty stream");
214+
return Task.CompletedTask;
217215
}, Materializer);
218216
}
219217
}

0 commit comments

Comments
 (0)