Skip to content

Commit c083fcd

Browse files
authored
[23-74] FlowRecoverSpec (#6571)
* [23-74]`FlowRecoverSpec` * Changes to `async` TestKit
1 parent b1f6e2d commit c083fcd

File tree

1 file changed

+47
-50
lines changed

1 file changed

+47
-50
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowRecoverSpec.cs

+47-50
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System.Linq;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Akka.Streams.Dsl;
1112
using Akka.Streams.TestKit;
1213
using Akka.Streams.Util;
@@ -31,72 +32,68 @@ public FlowRecoverSpec(ITestOutputHelper helper) : base(helper)
3132
private static readonly TestException Ex = new TestException("test");
3233

3334
[Fact]
34-
public void A_Recover_must_recover_when_there_is_a_handler()
35+
public async Task A_Recover_must_recover_when_there_is_a_handler()
3536
{
36-
this.AssertAllStagesStopped(() =>
37-
{
38-
Source.From(Enumerable.Range(1, 4)).Select(x =>
39-
{
40-
if (x == 3)
41-
throw Ex;
42-
return x;
43-
})
44-
.Recover(_ => Option<int>.Create(0))
45-
.RunWith(this.SinkProbe<int>(), Materializer)
46-
.RequestNext(1)
47-
.RequestNext(2)
48-
.RequestNext(0)
49-
.Request(1)
50-
.ExpectComplete();
51-
37+
await this.AssertAllStagesStoppedAsync(async() => {
38+
await Source.From(Enumerable.Range(1, 4)).Select(x =>
39+
{
40+
if (x == 3)
41+
throw Ex;
42+
return x;
43+
})
44+
.Recover(_ => Option<int>.Create(0))
45+
.RunWith(this.SinkProbe<int>(), Materializer)
46+
.RequestNext(1)
47+
.RequestNext(2)
48+
.RequestNext(0)
49+
.Request(1)
50+
.ExpectCompleteAsync();
5251
}, Materializer);
5352
}
5453

5554
[Fact]
56-
public void A_Recover_must_failed_stream_if_handler_is_not_for_such_exception_type()
55+
public async Task A_Recover_must_failed_stream_if_handler_is_not_for_such_exception_type()
5756
{
58-
this.AssertAllStagesStopped(() =>
59-
{
60-
Source.From(Enumerable.Range(1, 3)).Select(x =>
61-
{
62-
if (x == 2)
63-
throw Ex;
64-
return x;
65-
})
66-
.Recover(_ => Option<int>.None)
67-
.RunWith(this.SinkProbe<int>(), Materializer)
68-
.RequestNext(1)
69-
.Request(1)
70-
.ExpectError().Should().Be(Ex);
57+
await this.AssertAllStagesStoppedAsync(() => {
58+
Source.From(Enumerable.Range(1, 3)).Select(x =>
59+
{
60+
if (x == 2)
61+
throw Ex;
62+
return x;
63+
})
64+
.Recover(_ => Option<int>.None)
65+
.RunWith(this.SinkProbe<int>(), Materializer)
66+
.RequestNext(1)
67+
.Request(1)
68+
.ExpectError().Should().Be(Ex);
69+
return Task.CompletedTask;
7170
}, Materializer);
7271
}
7372

7473
[Fact]
75-
public void A_Recover_must_not_influence_stream_when_there_is_no_exception()
74+
public async Task A_Recover_must_not_influence_stream_when_there_is_no_exception()
7675
{
77-
this.AssertAllStagesStopped(() =>
78-
{
79-
Source.From(Enumerable.Range(1, 3))
80-
.Select(x => x)
81-
.Recover(_ => Option<int>.Create(0))
82-
.RunWith(this.SinkProbe<int>(), Materializer)
83-
.Request(3)
84-
.ExpectNext( 1, 2, 3)
85-
.ExpectComplete();
76+
await this.AssertAllStagesStoppedAsync(async() => {
77+
await Source.From(Enumerable.Range(1, 3))
78+
.Select(x => x)
79+
.Recover(_ => Option<int>.Create(0))
80+
.RunWith(this.SinkProbe<int>(), Materializer)
81+
.Request(3)
82+
.ExpectNext(1, 2, 3)
83+
.ExpectCompleteAsync();
8684
}, Materializer);
8785
}
8886

8987
[Fact]
90-
public void A_Recover_must_finish_stream_if_it_is_empty()
88+
public async Task A_Recover_must_finish_stream_if_it_is_empty()
9189
{
92-
this.AssertAllStagesStopped(() =>
93-
{
94-
Source.Empty<int>()
95-
.Select(x => x)
96-
.Recover(_ => Option<int>.Create(0))
97-
.RunWith(this.SinkProbe<int>(), Materializer)
98-
.Request(1)
99-
.ExpectComplete();
90+
await this.AssertAllStagesStoppedAsync(async() => {
91+
await Source.Empty<int>()
92+
.Select(x => x)
93+
.Recover(_ => Option<int>.Create(0))
94+
.RunWith(this.SinkProbe<int>(), Materializer)
95+
.Request(1)
96+
.ExpectCompleteAsync();
10097
}, Materializer);
10198
}
10299
}

0 commit comments

Comments
 (0)