Skip to content

Commit c20e28f

Browse files
eabaAaronontheweb
andauthored
[8-74]FlowConcatAllSpec (#6549)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent b07771a commit c20e28f

File tree

1 file changed

+32
-31
lines changed

1 file changed

+32
-31
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs

+32-31
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Akka.Streams.Dsl.Internal;
1515
using Akka.TestKit;
1616
using Reactive.Streams;
17+
using System.Threading.Tasks;
1718

1819
// ReSharper disable InvokeAsExtensionMethod
1920

@@ -32,17 +33,16 @@ public FlowConcatAllSpec(ITestOutputHelper helper) : base(helper)
3233
private static readonly TestException TestException = new TestException("test");
3334

3435
[Fact]
35-
public void ConcatAll_must_work_in_the_happy_case()
36+
public async Task ConcatAll_must_work_in_the_happy_case()
3637
{
37-
this.AssertAllStagesStopped(() =>
38-
{
39-
var s1 = Source.From(new[] {1, 2});
40-
var s2 = Source.From(new int[] {});
41-
var s3 = Source.From(new[] {3});
42-
var s4 = Source.From(new[] {4, 5, 6});
43-
var s5 = Source.From(new[] {7, 8, 9, 10});
38+
await this.AssertAllStagesStoppedAsync(() => {
39+
var s1 = Source.From(new[] { 1, 2 });
40+
var s2 = Source.From(new int[] { });
41+
var s3 = Source.From(new[] { 3 });
42+
var s4 = Source.From(new[] { 4, 5, 6 });
43+
var s5 = Source.From(new[] { 7, 8, 9, 10 });
4444

45-
var main = Source.From(new[] {s1, s2, s3, s4, s5});
45+
var main = Source.From(new[] { s1, s2, s3, s4, s5 });
4646

4747
var subscriber = this.CreateManualSubscriberProbe<int>();
4848
main.ConcatMany(s => s).To(Sink.FromSubscriber(subscriber)).Run(Materializer);
@@ -53,6 +53,7 @@ public void ConcatAll_must_work_in_the_happy_case()
5353

5454
subscription.Request(1);
5555
subscriber.ExpectComplete();
56+
return Task.CompletedTask;
5657
}, Materializer);
5758
}
5859

@@ -75,10 +76,9 @@ public void ConcatAll_must_work_together_with_SplitWhen()
7576
subscriber.ExpectComplete();}
7677

7778
[Fact]
78-
public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error()
79+
public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error()
7980
{
80-
this.AssertAllStagesStopped(() =>
81-
{
81+
await this.AssertAllStagesStoppedAsync(() => {
8282
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
8383
var subscriber = this.CreateManualSubscriberProbe<int>();
8484
Source.FromPublisher(publisher)
@@ -99,14 +99,14 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_s
9999
upstream.SendError(TestException);
100100
subscriber.ExpectError().Should().Be(TestException);
101101
subUpstream.ExpectCancellation();
102+
return Task.CompletedTask;
102103
}, Materializer);
103104
}
104105

105106
[Fact]
106-
public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error()
107+
public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error()
107108
{
108-
this.AssertAllStagesStopped(() =>
109-
{
109+
await this.AssertAllStagesStoppedAsync(() => {
110110
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
111111
var subscriber = this.CreateManualSubscriberProbe<int>();
112112
Source.FromPublisher(publisher)
@@ -130,18 +130,18 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_open
130130

131131
subscriber.ExpectError().Should().Be(TestException);
132132
subUpstream.ExpectCancellation();
133+
return Task.CompletedTask;
133134
}, Materializer);
134135
}
135136

136137
[Fact]
137-
public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error()
138+
public async Task ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error()
138139
{
139-
this.AssertAllStagesStopped(() =>
140-
{
140+
await this.AssertAllStagesStoppedAsync(() => {
141141
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
142142
var subscriber = this.CreateManualSubscriberProbe<int>();
143143
Source.FromPublisher(publisher)
144-
.ConcatMany<Source<int,NotUsed>,int,NotUsed>(x => { throw TestException; })
144+
.ConcatMany<Source<int, NotUsed>, int, NotUsed>(x => { throw TestException; })
145145
.To(Sink.FromSubscriber(subscriber))
146146
.Run(Materializer);
147147

@@ -155,14 +155,14 @@ public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_str
155155
upstream.SendNext(substreamFlow);
156156
subscriber.ExpectError().Should().Be(TestException);
157157
upstream.ExpectCancellation();
158+
return Task.CompletedTask;
158159
}, Materializer);
159160
}
160161

161162
[Fact]
162-
public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error()
163+
public async Task ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error()
163164
{
164-
this.AssertAllStagesStopped(() =>
165-
{
165+
await this.AssertAllStagesStoppedAsync(() => {
166166
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
167167
var subscriber = this.CreateManualSubscriberProbe<int>();
168168
Source.FromPublisher(publisher)
@@ -183,14 +183,14 @@ public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream
183183
subUpstream.SendError(TestException);
184184
subscriber.ExpectError().Should().Be(TestException);
185185
upstream.ExpectCancellation();
186+
return Task.CompletedTask;
186187
}, Materializer);
187188
}
188189

189190
[Fact]
190-
public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream()
191+
public async Task ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream()
191192
{
192-
this.AssertAllStagesStopped(() =>
193-
{
193+
await this.AssertAllStagesStoppedAsync(() => {
194194
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
195195
var subscriber = this.CreateManualSubscriberProbe<int>();
196196
Source.FromPublisher(publisher)
@@ -212,14 +212,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and
212212

213213
subUpstream.ExpectCancellation();
214214
upstream.ExpectCancellation();
215+
return Task.CompletedTask;
215216
}, Materializer);
216217
}
217218

218219
[Fact]
219-
public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream()
220+
public async Task ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream()
220221
{
221-
this.AssertAllStagesStopped(() =>
222-
{
222+
await this.AssertAllStagesStoppedAsync(() => {
223223
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
224224
var subscriber = this.CreateManualSubscriberProbe<int>();
225225
Source.FromPublisher(publisher)
@@ -243,14 +243,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substrea
243243

244244
subUpstream.ExpectCancellation();
245245
upstream.ExpectCancellation();
246+
return Task.CompletedTask;
246247
}, Materializer);
247248
}
248249

249250
[Fact]
250-
public void ConcatAll_must_pass_along_early_cancellation()
251+
public async Task ConcatAll_must_pass_along_early_cancellation()
251252
{
252-
this.AssertAllStagesStopped(() =>
253-
{
253+
await this.AssertAllStagesStoppedAsync(() => {
254254
var up = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
255255
var down = this.CreateManualSubscriberProbe<int>();
256256

@@ -264,6 +264,7 @@ public void ConcatAll_must_pass_along_early_cancellation()
264264
up.Subscribe(flowSubscriber);
265265
var upSub = up.ExpectSubscription();
266266
upSub.ExpectCancellation();
267+
return Task.CompletedTask;
267268
}, Materializer);
268269
}
269270
}

0 commit comments

Comments
 (0)