Skip to content

Commit 8729c00

Browse files
authored
[16-74]FlowInterleaveSpec (#6560)
1 parent 23245ae commit 8729c00

File tree

1 file changed

+28
-27
lines changed

1 file changed

+28
-27
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs

+28-27
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Collections.Generic;
1010
using System.Linq;
1111
using System.Threading;
12+
using System.Threading.Tasks;
1213
using Akka.Streams.Dsl;
1314
using Akka.Streams.TestKit;
1415
using Akka.Util.Internal;
@@ -32,10 +33,9 @@ protected override TestSubscriber.Probe<int> Setup(IPublisher<int> p1, IPublishe
3233
}
3334

3435
[Fact]
35-
public void An_Interleave_for_Flow_must_work_in_the_happy_case()
36+
public async Task An_Interleave_for_Flow_must_work_in_the_happy_case()
3637
{
37-
this.AssertAllStagesStopped(() =>
38-
{
38+
await this.AssertAllStagesStoppedAsync(() => {
3939
var probe = this.CreateManualSubscriberProbe<int>();
4040

4141
Source.From(Enumerable.Range(0, 4))
@@ -52,61 +52,61 @@ public void An_Interleave_for_Flow_must_work_in_the_happy_case()
5252
collected.Add(probe.ExpectNext());
5353
}
5454

55-
collected.Should().BeEquivalentTo(new[] {0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6});
55+
collected.Should().BeEquivalentTo(new[] { 0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6 });
5656
probe.ExpectComplete();
57+
return Task.CompletedTask;
5758
}, Materializer);
5859
}
5960

6061
[Fact]
61-
public void An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream()
62+
public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream()
6263
{
63-
this.AssertAllStagesStopped(() =>
64-
{
64+
await this.AssertAllStagesStoppedAsync(() => {
6565
var probe = this.CreateManualSubscriberProbe<int>();
6666

6767
Source.From(Enumerable.Range(0, 3))
6868
.Interleave(Source.From(Enumerable.Range(3, 3)), 2)
6969
.RunWith(Sink.FromSubscriber(probe), Materializer);
7070

7171
probe.ExpectSubscription().Request(10);
72-
probe.ExpectNext( 0, 1, 3, 4, 2, 5);
72+
probe.ExpectNext(0, 1, 3, 4, 2, 5);
7373
probe.ExpectComplete();
74+
return Task.CompletedTask;
7475
}, Materializer);
7576
}
7677

7778
[Fact]
78-
public void An_Interleave_for_Flow_must_work_with_segmentSize_1()
79+
public async Task An_Interleave_for_Flow_must_work_with_segmentSize_1()
7980
{
80-
this.AssertAllStagesStopped(() =>
81-
{
81+
await this.AssertAllStagesStoppedAsync(() => {
8282
var probe = this.CreateManualSubscriberProbe<int>();
8383

8484
Source.From(Enumerable.Range(0, 3))
8585
.Interleave(Source.From(Enumerable.Range(3, 3)), 1)
8686
.RunWith(Sink.FromSubscriber(probe), Materializer);
8787

8888
probe.ExpectSubscription().Request(10);
89-
probe.ExpectNext( 0, 3, 1, 4, 2, 5);
89+
probe.ExpectNext(0, 3, 1, 4, 2, 5);
9090
probe.ExpectComplete();
91+
return Task.CompletedTask;
9192
}, Materializer);
9293
}
9394

9495
[Fact]
95-
public void An_Interleave_for_Flow_must_not_work_with_segmentSize_0()
96+
public async Task An_Interleave_for_Flow_must_not_work_with_segmentSize_0()
9697
{
97-
this.AssertAllStagesStopped(() =>
98-
{
98+
await this.AssertAllStagesStoppedAsync(() => {
9999
var source = Source.From(Enumerable.Range(0, 3));
100100
source.Invoking(s => s.Interleave(Source.From(Enumerable.Range(3, 3)), 0))
101101
.Should().Throw<ArgumentException>();
102+
return Task.CompletedTask;
102103
}, Materializer);
103104
}
104105

105106
[Fact]
106-
public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements()
107+
public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements()
107108
{
108-
this.AssertAllStagesStopped(() =>
109-
{
109+
await this.AssertAllStagesStoppedAsync(() => {
110110
var probe = this.CreateManualSubscriberProbe<int>();
111111
Source.From(Enumerable.Range(0, 3))
112112
.Interleave(Source.From(Enumerable.Range(3, 13)), 10)
@@ -126,14 +126,14 @@ public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_st
126126
Enumerable.Range(21, 5).ForEach(i => probe2.ExpectNext(i));
127127
Enumerable.Range(11, 10).ForEach(i => probe2.ExpectNext(i));
128128
probe2.ExpectComplete();
129+
return Task.CompletedTask;
129130
}, Materializer);
130131
}
131132

132133
[Fact]
133-
public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
134+
public async Task An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
134135
{
135-
this.AssertAllStagesStopped(() =>
136-
{
136+
await this.AssertAllStagesStoppedAsync(() => {
137137
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
138138
var subscription1 = subscriber1.ExpectSubscription();
139139
subscription1.Request(4);
@@ -146,14 +146,14 @@ public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_
146146
subscription2.Request(4);
147147
Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i));
148148
subscriber2.ExpectComplete();
149+
return Task.CompletedTask;
149150
}, Materializer);
150151
}
151152

152153
[Fact]
153-
public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
154+
public async Task An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
154155
{
155-
this.AssertAllStagesStopped(() =>
156-
{
156+
await this.AssertAllStagesStoppedAsync(() => {
157157
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
158158
var subscription1 = subscriber1.ExpectSubscription();
159159
subscription1.Request(4);
@@ -166,6 +166,7 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_
166166
subscription2.Request(4);
167167
Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i));
168168
subscriber2.ExpectComplete();
169+
return Task.CompletedTask;
169170
}, Materializer);
170171
}
171172

@@ -209,10 +210,9 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_failed_and_one_non
209210
}
210211

211212
[Fact]
212-
public void An_Interleave_for_Flow_must_pass_along_early_cancellation()
213+
public async Task An_Interleave_for_Flow_must_pass_along_early_cancellation()
213214
{
214-
this.AssertAllStagesStopped(() =>
215-
{
215+
await this.AssertAllStagesStoppedAsync(() => {
216216
var up1 = this.CreateManualPublisherProbe<int>();
217217
var up2 = this.CreateManualPublisherProbe<int>();
218218
var down = this.CreateManualSubscriberProbe<int>();
@@ -230,6 +230,7 @@ public void An_Interleave_for_Flow_must_pass_along_early_cancellation()
230230
up2.Subscribe(graphSubscriber2);
231231
up1.ExpectSubscription().ExpectCancellation();
232232
up2.ExpectSubscription().ExpectCancellation();
233+
return Task.CompletedTask;
233234
}, Materializer);
234235
}
235236
}

0 commit comments

Comments
 (0)