diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipSpec.cs index 7cbffc47537..e0414b6ff25 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipSpec.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; @@ -28,10 +29,9 @@ public GraphUnzipSpec(ITestOutputHelper helper) : base (helper) } [Fact] - public void A_Unzip_must_unzip_to_two_subscribers() + public async Task A_Unzip_must_unzip_to_two_subscribers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -45,10 +45,10 @@ public void A_Unzip_must_unzip_to_two_subscribers() new KeyValuePair(2, "b"), new KeyValuePair(3, "c") }); - + b.From(source).To(unzip.In); b.From(unzip.Out0) - .Via(Flow.Create().Buffer(16, OverflowStrategy.Backpressure).Select(x => x*2)) + .Via(Flow.Create().Buffer(16, OverflowStrategy.Backpressure).Select(x => x * 2)) .To(Sink.FromSubscriber(c1)); b.From(unzip.Out1) .Via(Flow.Create().Buffer(16, OverflowStrategy.Backpressure)) @@ -57,29 +57,28 @@ public void A_Unzip_must_unzip_to_two_subscribers() return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Request(1); sub2.Request(2); - c1.ExpectNext(1*2); - c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - c2.ExpectNext( "a", "b"); - c2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c1.ExpectNextAsync(1 * 2); + await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + c2.ExpectNext("a", "b"); + await c2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); sub1.Request(3); - c1.ExpectNext( 2*2, 3*2); - c1.ExpectComplete(); + c1.ExpectNext(2 * 2, 3 * 2); + await c1.ExpectCompleteAsync(); sub2.Request(3); - c2.ExpectNext("c"); - c2.ExpectComplete(); + await c2.ExpectNextAsync("c"); + await c2.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Unzip_must_produce_to_right_downstream_even_though_left_downstream_cancels() + public async Task A_Unzip_must_produce_to_right_downstream_even_though_left_downstream_cancels() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -101,21 +100,20 @@ public void A_Unzip_must_produce_to_right_downstream_even_though_left_downstream return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Cancel(); sub2.Request(3); - c2.ExpectNext( "a", "b", "c"); - c2.ExpectComplete(); + c2.ExpectNext("a", "b", "c"); + await c2.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Unzip_must_produce_to_left_downstream_even_though_right_downstream_cancels() + public async Task A_Unzip_must_produce_to_left_downstream_even_though_right_downstream_cancels() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -137,19 +135,20 @@ public void A_Unzip_must_produce_to_left_downstream_even_though_right_downstream return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Cancel(); sub1.Request(3); - c1.ExpectNext( 1, 2, 3); - c1.ExpectComplete(); + c1.ExpectNext(1, 2, 3); + await c1.ExpectCompleteAsync(); + }, Materializer); } [Fact] - public void A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_element_has_been_pushed() + public async Task A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_element_has_been_pushed() { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -170,17 +169,17 @@ public void A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_e return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Request(3); sub1.Request(3); sub2.Cancel(); c1.ExpectNext( 1,2,3); - c1.ExpectComplete(); + await c1.ExpectCompleteAsync(); } [Fact] - public void A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_before_other_sink_has_requested() + public async Task A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_before_other_sink_has_requested() { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -201,20 +200,19 @@ public void A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_befo return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Request(3); sub2.Cancel(); sub1.Request(3); - c1.ExpectNext( 1, 2, 3); - c1.ExpectComplete(); + c1.ExpectNext(1, 2, 3); + await c1.ExpectCompleteAsync(); } [Fact] - public void A_Unzip_must_cancel_upstream_when_downstream_cancel() + public async Task A_Unzip_must_cancel_upstream_when_downstream_cancel() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async () => { var p1 = this.CreateManualPublisherProbe>(); var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -231,30 +229,29 @@ public void A_Unzip_must_cancel_upstream_when_downstream_cancel() return ClosedShape.Instance; })).Run(Materializer); - var p1Sub = p1.ExpectSubscription(); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var p1Sub = await p1.ExpectSubscriptionAsync(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Request(3); sub2.Request(3); - p1.ExpectRequest(p1Sub, 16); + await p1.ExpectRequestAsync(p1Sub, 16); p1Sub.SendNext(new KeyValuePair(1, "a")); - c1.ExpectNext(1); - c2.ExpectNext("a"); + await c1.ExpectNextAsync(1); + await c2.ExpectNextAsync("a"); p1Sub.SendNext(new KeyValuePair(2, "b")); - c1.ExpectNext(2); - c2.ExpectNext("b"); + await c1.ExpectNextAsync(2); + await c2.ExpectNextAsync("b"); sub1.Cancel(); sub2.Cancel(); - p1Sub.ExpectCancellation(); + await p1Sub.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void A_Unzip_must_work_with_Zip() + public async Task A_Unzip_must_work_with_Zip() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe<(int, string)>(); RunnableGraph.FromGraph(GraphDsl.Create(b => @@ -277,12 +274,12 @@ public void A_Unzip_must_work_with_Zip() return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); sub1.Request(5); - c1.ExpectNext((1, "a")); - c1.ExpectNext((2, "b")); - c1.ExpectNext((3, "c")); - c1.ExpectComplete(); + await c1.ExpectNextAsync((1, "a")); + await c1.ExpectNextAsync((2, "b")); + await c1.ExpectNextAsync((3, "c")); + await c1.ExpectCompleteAsync(); }, Materializer); } }