Skip to content

Commit d925fd5

Browse files
authored
[44-74] GraphMergeSpec (#6591)
* [44-74] `GraphMergeSpec` * Changes to `async` TestKit
1 parent 5d68c3a commit d925fd5

File tree

1 file changed

+37
-42
lines changed

1 file changed

+37
-42
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs

+37-42
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Collections.Generic;
1010
using System.Linq;
1111
using System.Threading;
12+
using System.Threading.Tasks;
1213
using Akka.Streams.Dsl;
1314
using Akka.Streams.TestKit;
1415
using Akka.TestKit;
@@ -46,10 +47,9 @@ public MergeFixture(GraphDsl.Builder<NotUsed> builder) : base(builder)
4647
}
4748

4849
[Fact]
49-
public void A_Merge_must_work_in_the_happy_case()
50+
public async Task A_Merge_must_work_in_the_happy_case()
5051
{
51-
this.AssertAllStagesStopped(() =>
52-
{
52+
await this.AssertAllStagesStoppedAsync(async() => {
5353
// Different input sizes(4 and 6)
5454
var source1 = Source.From(Enumerable.Range(0, 4));
5555
var source2 = Source.From(Enumerable.Range(4, 6));
@@ -63,27 +63,27 @@ public void A_Merge_must_work_in_the_happy_case()
6363
var sink = Sink.FromSubscriber(probe);
6464

6565
b.From(source1).To(m1.In(0));
66-
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x*2)).To(m2.In(0));
67-
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x=>x+1)).To(sink);
66+
b.From(m1.Out).Via(Flow.Create<int>().Select(x => x * 2)).To(m2.In(0));
67+
b.From(m2.Out).Via(Flow.Create<int>().Select(x => x / 2).Select(x => x + 1)).To(sink);
6868
b.From(source2).To(m1.In(1));
6969
b.From(source3).To(m2.In(1));
7070

7171
return ClosedShape.Instance;
7272
})).Run(Materializer);
7373

74-
var subscription = probe.ExpectSubscription();
74+
var subscription = await probe.ExpectSubscriptionAsync();
7575
var collected = new List<int>();
7676
for (var i = 1; i <= 10; i++)
7777
{
7878
subscription.Request(1);
79-
collected.Add(probe.ExpectNext());
79+
collected.Add(await probe.ExpectNextAsync());
8080
}
8181

8282
collected.Where(i => i <= 4).ShouldOnlyContainInOrder(1, 2, 3, 4);
8383
collected.Where(i => i >= 5).ShouldOnlyContainInOrder(5, 6, 7, 8, 9, 10);
8484

8585
collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray());
86-
probe.ExpectComplete();
86+
await probe.ExpectCompleteAsync();
8787
}, Materializer);
8888
}
8989

@@ -109,7 +109,7 @@ public void A_Merge_must_work_with_one_way_merge()
109109
}
110110

111111
[Fact]
112-
public void A_Merge_must_work_with_n_way_merge()
112+
public async Task A_Merge_must_work_with_n_way_merge()
113113
{
114114
var source1 = Source.Single(1);
115115
var source2 = Source.Single(2);
@@ -135,76 +135,71 @@ public void A_Merge_must_work_with_n_way_merge()
135135
return ClosedShape.Instance;
136136
})).Run(Materializer);
137137

138-
var subscription = probe.ExpectSubscription();
138+
var subscription = await probe.ExpectSubscriptionAsync();
139139

140140
var collected = new List<int>();
141141
for (var i = 1; i <= 5; i++)
142142
{
143143
subscription.Request(1);
144-
collected.Add(probe.ExpectNext());
144+
collected.Add(await probe.ExpectNextAsync());
145145
}
146146

147147
collected.Should().BeEquivalentTo(Enumerable.Range(1, 5));
148-
probe.ExpectComplete();
148+
await probe.ExpectCompleteAsync();
149149
}
150150

151151
[Fact]
152-
public void A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
152+
public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
153153
{
154-
this.AssertAllStagesStopped(() =>
155-
{
154+
await this.AssertAllStagesStoppedAsync(async() => {
156155
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
157-
var subscription1 = subscriber1.ExpectSubscription();
156+
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
158157
subscription1.Request(4);
159-
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
158+
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
160159

161160
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
162-
var subscription2 = subscriber2.ExpectSubscription();
161+
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
163162
subscription2.Request(4);
164-
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
163+
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
165164
}, Materializer);
166165
}
167166

168167
[Fact]
169-
public void A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
168+
public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
170169
{
171-
this.AssertAllStagesStopped(() =>
172-
{
170+
await this.AssertAllStagesStoppedAsync(async() => {
173171
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
174-
var subscription1 = subscriber1.ExpectSubscription();
172+
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
175173
subscription1.Request(4);
176-
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
174+
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
177175

178176
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher<int>());
179-
var subscription2 = subscriber2.ExpectSubscription();
177+
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
180178
subscription2.Request(4);
181-
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
179+
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
182180
}, Materializer);
183181
}
184182

185183
[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
186-
public void A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
184+
public async Task A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
187185
{
188-
this.AssertAllStagesStopped(() =>
189-
{
190-
186+
await this.AssertAllStagesStoppedAsync(() => {
187+
return Task.CompletedTask;
191188
}, Materializer);
192189
}
193190

194191
[Fact(Skip = "This is nondeterministic, multiple scenarios can happen")]
195-
public void A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
192+
public async Task A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
196193
{
197-
this.AssertAllStagesStopped(() =>
198-
{
199-
194+
await this.AssertAllStagesStoppedAsync(() => {
195+
return Task.CompletedTask;
200196
}, Materializer);
201197
}
202198

203199
[Fact]
204-
public void A_Merge_must_pass_along_early_cancellation()
200+
public async Task A_Merge_must_pass_along_early_cancellation()
205201
{
206-
this.AssertAllStagesStopped(() =>
207-
{
202+
await this.AssertAllStagesStoppedAsync(async() => {
208203
var up1 = this.CreateManualPublisherProbe<int>();
209204
var up2 = this.CreateManualPublisherProbe<int>();
210205
var down = this.CreateManualSubscriberProbe<int>();
@@ -224,14 +219,14 @@ public void A_Merge_must_pass_along_early_cancellation()
224219
return ClosedShape.Instance;
225220
})).Run(Materializer);
226221

227-
var downstream = down.ExpectSubscription();
222+
var downstream = await down.ExpectSubscriptionAsync();
228223
downstream.Cancel();
229224
up1.Subscribe(t.Item1);
230225
up2.Subscribe(t.Item2);
231-
var upSub1 = up1.ExpectSubscription();
232-
upSub1.ExpectCancellation();
233-
var upSub2 = up2.ExpectSubscription();
234-
upSub2.ExpectCancellation();
226+
var upSub1 = await up1.ExpectSubscriptionAsync();
227+
await upSub1.ExpectCancellationAsync();
228+
var upSub2 = await up2.ExpectSubscriptionAsync();
229+
await upSub2.ExpectCancellationAsync();
235230
}, Materializer);
236231
}
237232
}

0 commit comments

Comments
 (0)