diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs index eb0a9d02bc5..f907b9a728e 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs @@ -31,13 +31,12 @@ public GraphBroadcastSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void A_Broadcast_must_broadcast_to_other_subscriber() + public async Task A_Broadcast_must_broadcast_to_other_subscriber() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); - RunnableGraph.FromGraph(GraphDsl.Create (b => + RunnableGraph.FromGraph(GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast(2)); var source = Source.From(Enumerable.Range(1, 3)); @@ -51,50 +50,49 @@ public void A_Broadcast_must_broadcast_to_other_subscriber() return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Request(1); sub2.Request(2); - c1.ExpectNext(1).ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - c2.ExpectNext( 1, 2).ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c1.ExpectNext(1).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + await c2.ExpectNext(1, 2).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); sub1.Request(3); - c1.ExpectNext( 2, 3).ExpectComplete(); + await c1.ExpectNext(2, 3).ExpectCompleteAsync(); sub2.Request(3); - c2.ExpectNext(3).ExpectComplete(); + await c2.ExpectNext(3).ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_work_with_one_way_broadcast() + public async Task A_Broadcast_must_work_with_one_way_broadcast() { - this.AssertAllStagesStopped(() => - { - var t = Source.FromGraph(GraphDsl.Create(b => - { - var broadcast = b.Add(new Broadcast(1)); + await this.AssertAllStagesStoppedAsync(() => { + var t = Source.FromGraph(GraphDsl.Create(b => + { + var broadcast = b.Add(new Broadcast(1)); var source = b.Add(Source.From(Enumerable.Range(1, 3))); - + b.From(source).To(broadcast.In); - - return new SourceShape(broadcast.Out(0)); - })).RunAggregate(new List(), (list, i) => - { - list.Add(i); - return list; + + return new SourceShape(broadcast.Out(0)); + })).RunAggregate(new List(), (list, i) => + { + list.Add(i); + return list; }, Materializer); t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t.Result.Should().BeEquivalentTo(new[] {1, 2, 3}); + t.Result.Should().BeEquivalentTo(new[] { 1, 2, 3 }); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_Broadcast_must_work_with_n_way_broadcast() + public async Task A_Broadcast_must_work_with_n_way_broadcast() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var headSink = Sink.First>(); var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, ValueTuple.Create, @@ -116,14 +114,14 @@ public void A_Broadcast_must_work_with_n_way_broadcast() task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); foreach (var list in task.Result) list.Should().BeEquivalentTo(new[] { 1, 2, 3 }); + return Task.CompletedTask; }, Materializer); } [Fact(Skip="We don't have enough overloads for GraphDsl.Create")] - public void A_Broadcast_must_with_22_way_broadcast() + public async Task A_Broadcast_must_with_22_way_broadcast() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { //var headSink = Sink.First>(); //var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, ValueTuple.Create, @@ -162,15 +160,14 @@ public void A_Broadcast_must_with_22_way_broadcast() //task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); //foreach (var list in task.Result) // list.Should().BeEquivalentTo(new[] { 1, 2, 3 }); - + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels() + public async Task A_Broadcast_must_produce_to_other_even_though_downstream_cancels() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); RunnableGraph.FromGraph(GraphDsl.Create(b => @@ -187,20 +184,19 @@ public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels() return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); sub1.Cancel(); - var sub2 = c2.ExpectSubscription(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Request(3); - c2.ExpectNext( 1, 2, 3); - c2.ExpectComplete(); + c2.ExpectNext(1, 2, 3); + await c2.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels() + public async Task A_Broadcast_must_produce_to_downstream_even_though_other_cancels() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); RunnableGraph.FromGraph(GraphDsl.Create(b => @@ -217,20 +213,19 @@ public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels() return ClosedShape.Instance; })).Run(Materializer); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Cancel(); sub1.Request(3); - c1.ExpectNext( 1, 2, 3); - c1.ExpectComplete(); + c1.ExpectNext(1, 2, 3); + await c1.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel() + public async Task A_Broadcast_must_cancel_upstream_when_downstreams_cancel() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p1 = this.CreateManualPublisherProbe(); var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -248,30 +243,29 @@ public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel() return ClosedShape.Instance; })).Run(Materializer); - var bSub = p1.ExpectSubscription(); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var bSub = await p1.ExpectSubscriptionAsync(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Request(3); sub2.Request(3); - p1.ExpectRequest(bSub, 16); + await p1.ExpectRequestAsync(bSub, 16); bSub.SendNext(1); - c1.ExpectNext(1); - c2.ExpectNext(1); + await c1.ExpectNextAsync(1); + await c2.ExpectNextAsync(1); bSub.SendNext(2); - c1.ExpectNext(2); - c2.ExpectNext(2); + await c1.ExpectNextAsync(2); + await c2.ExpectNextAsync(2); sub1.Cancel(); sub2.Cancel(); - bSub.ExpectCancellation(); + await bSub.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_pass_along_early_cancellation() + public async Task A_Broadcast_must_pass_along_early_cancellation() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); @@ -287,22 +281,21 @@ public void A_Broadcast_must_pass_along_early_cancellation() var up = this.CreateManualPublisherProbe(); - var downSub1 = c1.ExpectSubscription(); - var downSub2 = c2.ExpectSubscription(); + var downSub1 = await c1.ExpectSubscriptionAsync(); + var downSub2 = await c2.ExpectSubscriptionAsync(); downSub1.Cancel(); downSub2.Cancel(); up.Subscribe(s); - var upSub = up.ExpectSubscription(); - upSub.ExpectCancellation(); + var upSub = await up.ExpectSubscriptionAsync(); + await upSub.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_AltoTo_must_broadcast() + public async Task A_Broadcast_must_AltoTo_must_broadcast() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = this.SinkProbe(); var p2 = this.SinkProbe(); @@ -315,20 +308,19 @@ public void A_Broadcast_must_AltoTo_must_broadcast() var ps1 = t.Item1; var ps2 = t.Item2; - ps1.Request(6); - ps2.Request(6); - ps1.ExpectNext( 1, 2, 3, 4, 5, 6); - ps2.ExpectNext( 1, 2, 3, 4, 5, 6); - ps1.ExpectComplete(); - ps2.ExpectComplete(); + await ps1.RequestAsync(6); + await ps2.RequestAsync(6); + ps1.ExpectNext(1, 2, 3, 4, 5, 6); + ps2.ExpectNext(1, 2, 3, 4, 5, 6); + await ps1.ExpectCompleteAsync(); + await ps2.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels() + public async Task A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = this.SinkProbe(); var p2 = this.SinkProbe(); @@ -340,11 +332,11 @@ public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels() var ps1 = t.Item1; var ps2 = t.Item2; - - ps2.Request(6); + + await ps2.RequestAsync(6); ps1.Cancel(); - ps2.ExpectNext( 1, 2, 3, 4, 5, 6); - ps2.ExpectComplete(); + ps2.ExpectNext(1, 2, 3, 4, 5, 6); + await ps2.ExpectCompleteAsync(); }, Materializer); } }