Skip to content

Commit 6234c39

Browse files
authored
[29-74] FlowSkipWhileSpec (#6576)
* [29-74] `FlowSkipWhileSpec` * Changes to `async` TestKit
1 parent 89dc26d commit 6234c39

File tree

1 file changed

+43
-46
lines changed

1 file changed

+43
-46
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs

+43-46
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System.Linq;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Akka.Streams.Dsl;
1112
using Akka.Streams.Supervision;
1213
using Akka.Streams.TestKit;
@@ -28,68 +29,64 @@ public FlowSkipWhileSpec(ITestOutputHelper helper) : base(helper)
2829
}
2930

3031
[Fact]
31-
public void A_SkipWhile_must_skip_while_predicate_is_true()
32+
public async Task A_SkipWhile_must_skip_while_predicate_is_true()
3233
{
33-
this.AssertAllStagesStopped(() =>
34-
{
35-
Source.From(Enumerable.Range(1, 4))
36-
.SkipWhile(x => x < 3)
37-
.RunWith(this.SinkProbe<int>(), Materializer)
38-
.Request(2)
39-
.ExpectNext( 3, 4)
40-
.ExpectComplete();
34+
await this.AssertAllStagesStoppedAsync(async() => {
35+
await Source.From(Enumerable.Range(1, 4))
36+
.SkipWhile(x => x < 3)
37+
.RunWith(this.SinkProbe<int>(), Materializer)
38+
.Request(2)
39+
.ExpectNext(3, 4)
40+
.ExpectCompleteAsync();
4141
}, Materializer);
4242
}
4343

4444
[Fact]
45-
public void A_SkipWhile_must_complete_the_future_for_an_empty_stream()
45+
public async Task A_SkipWhile_must_complete_the_future_for_an_empty_stream()
4646
{
47-
this.AssertAllStagesStopped(() =>
48-
{
49-
Source.Empty<int>()
50-
.SkipWhile(x => x < 2)
51-
.RunWith(this.SinkProbe<int>(), Materializer)
52-
.Request(1)
53-
.ExpectComplete();
47+
await this.AssertAllStagesStoppedAsync(async() => {
48+
await Source.Empty<int>()
49+
.SkipWhile(x => x < 2)
50+
.RunWith(this.SinkProbe<int>(), Materializer)
51+
.Request(1)
52+
.ExpectCompleteAsync();
5453
}, Materializer);
5554
}
5655

5756
[Fact]
58-
public void A_SkipWhile_must_continue_if_error()
57+
public async Task A_SkipWhile_must_continue_if_error()
5958
{
60-
this.AssertAllStagesStopped(() =>
61-
{
62-
Source.From(Enumerable.Range(1, 4)).SkipWhile(x =>
63-
{
64-
if (x < 3)
65-
return true;
66-
throw new TestException("");
67-
})
68-
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
69-
.RunWith(this.SinkProbe<int>(), Materializer)
70-
.Request(1)
71-
.ExpectComplete();
59+
await this.AssertAllStagesStoppedAsync(async() => {
60+
await Source.From(Enumerable.Range(1, 4)).SkipWhile(x =>
61+
{
62+
if (x < 3)
63+
return true;
64+
throw new TestException("");
65+
})
66+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
67+
.RunWith(this.SinkProbe<int>(), Materializer)
68+
.Request(1)
69+
.ExpectCompleteAsync();
7270
}, Materializer);
7371
}
7472

7573
[Fact]
76-
public void A_SkipWhile_must_restart_with_strategy()
74+
public async Task A_SkipWhile_must_restart_with_strategy()
7775
{
78-
this.AssertAllStagesStopped(() =>
79-
{
80-
Source.From(Enumerable.Range(1, 4)).SkipWhile(x =>
81-
{
82-
if (x == 1 || x == 3)
83-
return true;
84-
if (x == 4)
85-
return false;
86-
throw new TestException("");
87-
})
88-
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
89-
.RunWith(this.SinkProbe<int>(), Materializer)
90-
.Request(1)
91-
.ExpectNext(4)
92-
.ExpectComplete();
76+
await this.AssertAllStagesStoppedAsync(async() => {
77+
await Source.From(Enumerable.Range(1, 4)).SkipWhile(x =>
78+
{
79+
if (x == 1 || x == 3)
80+
return true;
81+
if (x == 4)
82+
return false;
83+
throw new TestException("");
84+
})
85+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider))
86+
.RunWith(this.SinkProbe<int>(), Materializer)
87+
.Request(1)
88+
.ExpectNext(4)
89+
.ExpectCompleteAsync();
9390
}, Materializer);
9491
}
9592
}

0 commit comments

Comments
 (0)