diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowForeachSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowForeachSpec.cs index 726b31b560d..8c0d2fde31f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowForeachSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowForeachSpec.cs @@ -7,6 +7,7 @@ using System; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -27,62 +28,60 @@ public FlowForeachSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void A_Foreach_must_call_the_procedure_for_each_element() + public async Task A_Foreach_must_call_the_procedure_for_each_element() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 3)).RunForeach(i => TestActor.Tell(i), Materializer).ContinueWith( - task => - { - if(task.IsCompleted && task.Exception == null) - TestActor.Tell("done"); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 3)).RunForeach(i => + TestActor.Tell(i), Materializer) + .ContinueWith( + task => + { + if (task.IsCompleted && task.Exception == null) + TestActor.Tell("done"); }); - - ExpectMsg(1); - ExpectMsg(2); - ExpectMsg(3); - ExpectMsg("done"); + await ExpectMsgAsync(1); + await ExpectMsgAsync(2); + await ExpectMsgAsync(3); + await ExpectMsgAsync("done"); }, Materializer); } [Fact] - public void A_Foreach_must_complete_the_future_for_an_empty_stream() + public async Task A_Foreach_must_complete_the_future_for_an_empty_stream() { - this.AssertAllStagesStopped(() => - { - Source.Empty().RunForeach(i => TestActor.Tell(i), Materializer).ContinueWith( - task => - { - if (task.IsCompleted && task.Exception == null) - TestActor.Tell("done"); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.Empty().RunForeach(i => + TestActor.Tell(i), Materializer).ContinueWith( + task => + { + if (task.IsCompleted && task.Exception == null) + TestActor.Tell("done"); }); - ExpectMsg("done"); + await ExpectMsgAsync("done"); }, Materializer); } [Fact] - public void A_Foreach_must_yield_the_first_error() + public async Task A_Foreach_must_yield_the_first_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = this.CreateManualPublisherProbe(); Source.FromPublisher(p).RunForeach(i => TestActor.Tell(i), Materializer).ContinueWith(task => { if (task.Exception != null) TestActor.Tell(task.Exception.InnerException); }); - var proc = p.ExpectSubscription(); + var proc = await p.ExpectSubscriptionAsync(); var ex = new TestException("ex"); proc.SendError(ex); - ExpectMsg(ex); + await ExpectMsgAsync(ex); }, Materializer); } [Fact] - public void A_Foreach_must_complete_future_with_failure_when_function_throws() + public async Task A_Foreach_must_complete_future_with_failure_when_function_throws() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var error = new TestException("test"); var future = Source.Single(1).RunForeach(_ => { @@ -93,6 +92,7 @@ public void A_Foreach_must_complete_future_with_failure_when_function_throws() .Should().Throw() .And.Should() .Be(error); + return Task.CompletedTask; }, Materializer); } }