diff --git a/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs b/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs index 07bd4d7d141..568708f77f0 100644 --- a/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs @@ -8,6 +8,7 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.Event; @@ -273,37 +274,34 @@ public GraphStageLogicSpec(ITestOutputHelper output) : base(output, Config) } [Fact] - public void A_GraphStageLogic_must_read_N_and_emit_N_before_completing() + public async Task A_GraphStageLogic_must_read_N_and_emit_N_before_completing() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 10)) - .Via(new ReadNEmitN(2)) - .RunWith(this.SinkProbe(), Materializer) - .Request(10) - .ExpectNext( 1, 2) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 10)) + .Via(new ReadNEmitN(2)) + .RunWith(this.SinkProbe(), Materializer) + .Request(10) + .ExpectNext(1, 2) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent() + public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 5)) - .Via(new ReadNEmitN(6)) - .RunWith(this.SinkProbe(), Materializer) - .Request(10) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 5)) + .Via(new ReadNEmitN(6)) + .RunWith(this.SinkProbe(), Materializer) + .Request(10) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent() + public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var error = new ArgumentException("Don't argue like that!"); Source.From(Enumerable.Range(1, 5)) .Select(x => @@ -316,49 +314,47 @@ public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_befo .RunWith(this.SinkProbe(), Materializer) .Request(10) .ExpectError().Should().Be(error); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen() + public async Task A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 5)) - .Via(new ReadNEmitRestOnComplete(6)) - .RunWith(this.SinkProbe(), Materializer) - .Request(10) - .ExpectNext( 1, 2, 3, 4, 5) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 5)) + .Via(new ReadNEmitRestOnComplete(6)) + .RunWith(this.SinkProbe(), Materializer) + .Request(10) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_emit_all_things_before_completing() + public async Task A_GraphStageLogic_must_emit_all_things_before_completing() { - this.AssertAllStagesStopped(() => - { - Source.Empty() - .Via(new Emit1234().Named("testStage")) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1) - //emitting with callback gives nondeterminism whether 2 or 3 will be pushed first - .ExpectNextUnordered(2, 3) - .ExpectNext(4) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.Empty() + .Via(new Emit1234().Named("testStage")) + .RunWith(this.SinkProbe(), Materializer) + .Request(5) + .ExpectNext(1) + //emitting with callback gives nondeterminism whether 2 or 3 will be pushed first + .ExpectNextUnordered(2, 3) + .ExpectNext(4) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages() + public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var flow = Flow.Create().Via(new Emit1234()).Via(new Emit5678()); var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow); - Source.Empty() + await Source.Empty() .Via(g) .RunWith(this.SinkProbe(), Materializer) .Request(9) @@ -370,19 +366,18 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fu //emitting with callback gives nondeterminism whether 6 or 7 will be pushed first .ExpectNextUnordered(6, 7) .ExpectNext(8) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages() + public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var flow = Flow.Create().Via(new Emit1234()).Via(new PassThrough()).Via(new Emit5678()); var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow); - Source.Empty() + await Source.Empty() .Via(g) .RunWith(this.SinkProbe(), Materializer) .Request(9) @@ -394,20 +389,20 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_ //emitting with callback gives nondeterminism whether 6 or 7 will be pushed first .ExpectNextUnordered(6, 7) .ExpectNext(8) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_GraphStageLogic_must_emit_properly_after_empty_iterable() + public async Task A_GraphStageLogic_must_emit_properly_after_empty_iterable() { - this.AssertAllStagesStopped(() => - { - Source.FromGraph(new EmitEmptyIterable()) - .RunWith(Sink.Seq(), Materializer) - .Result.Should() - .HaveCount(1) - .And.OnlyContain(x => x == 42); + await this.AssertAllStagesStoppedAsync(() => { + Source.FromGraph(new EmitEmptyIterable()) + .RunWith(Sink.Seq(), Materializer) + .Result.Should() + .HaveCount(1) + .And.OnlyContain(x => x == 42); + return Task.CompletedTask; }, Materializer); } @@ -425,17 +420,15 @@ public void A_GraphStageLogic_must_support_logging_in_custom_graphstage() } [Fact] - public void A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order() + public async Task A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var g = new LifecycleStage(TestActor); - Source.Single(1).Via(g).RunWith(Sink.Ignore(), Materializer); - ExpectMsg("preStart"); - ExpectMsg("pulled"); - ExpectMsg("postStop"); - + await Source.Single(1).Via(g).RunWith(Sink.Ignore(), Materializer); + await ExpectMsgAsync("preStart"); + await ExpectMsgAsync("pulled"); + await ExpectMsgAsync("postStop"); }, Materializer); } @@ -492,14 +485,14 @@ public void A_GraphStageLogic_must_not_double_terminate_a_single_stage() { WithBaseBuilderSetup( new GraphStage>[] {new DoubleTerminateStage(TestActor), new PassThrough()}, - interpreter => + async interpreter => { interpreter.Complete(interpreter.Connections[0]); interpreter.Cancel(interpreter.Connections[1], SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); interpreter.Execute(2); - ExpectMsg("postStop2"); - ExpectNoMsg(0); + await ExpectMsgAsync("postStop2"); + await ExpectNoMsgAsync(0); interpreter.IsCompleted.Should().BeFalse(); interpreter.IsSuspended.Should().BeFalse();