diff --git a/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs index 162ab389eb0..4a91c96c6b2 100644 --- a/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs @@ -44,53 +44,57 @@ private static Func Fallback() private static readonly Exception Ex = new TestException(""); [Fact] - public void A_LazySink_must_work_in_the_happy_case() + public async Task A_LazySink_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe())); var taskProbe = Source.From(Enumerable.Range(0, 11)).RunWith(lazySink, Materializer); var probe = await taskProbe.ShouldCompleteWithin(RemainingOrDefault); probe.Value.Request(100); - Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i)); + foreach (var i in Enumerable.Range(0, 11)) + { + await probe.Value.ExpectNextAsync(i); + } }, Materializer); } [Fact] - public void A_LazySink_must_work_with_slow_sink_init() + public async Task A_LazySink_must_work_with_slow_sink_init() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var p = new TaskCompletionSource>>(); var sourceProbe = this.CreateManualPublisherProbe(); var taskProbe = Source.FromPublisher(sourceProbe) .RunWith(Sink.LazyInitAsync(() => p.Task), Materializer); - var sourceSub = sourceProbe.ExpectSubscription(); - sourceSub.ExpectRequest(1); + var sourceSub = await sourceProbe.ExpectSubscriptionAsync(); + await sourceSub.ExpectRequestAsync(1); sourceSub.SendNext(0); - sourceSub.ExpectRequest(1); - sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await sourceSub.ExpectRequestAsync(1); + await sourceProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); taskProbe.Wait(TimeSpan.FromMilliseconds(200)).ShouldBeFalse(); p.SetResult(this.SinkProbe()); var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault); var probe = complete.Value; probe.Request(100); - probe.ExpectNext(0); - Enumerable.Range(1,10).ForEach(i => + await probe.ExpectNextAsync(0); + + foreach (var i in Enumerable.Range(1, 10)) { sourceSub.SendNext(i); - probe.ExpectNext(i); - }); + await probe.ExpectNextAsync(i); + } sourceSub.SendComplete(); }, Materializer); } [Fact] - public void A_LazySink_must_complete_when_there_was_no_elements_in_stream() + public async Task A_LazySink_must_complete_when_there_was_no_elements_in_stream() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var lazySink = Sink.LazyInitAsync(() => Task.FromResult(Sink.Aggregate(0, (int i, int i2) => i + i2))); var taskProbe = Source.Empty().RunWith(lazySink, Materializer); @@ -100,56 +104,55 @@ public void A_LazySink_must_complete_when_there_was_no_elements_in_stream() } [Fact] - public void A_LazySink_must_complete_normally_when_upstream_is_completed() + public async Task A_LazySink_must_complete_normally_when_upstream_is_completed() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe())); var taskProbe = Source.Single(1).RunWith(lazySink, Materializer); var taskResult = await taskProbe.ShouldCompleteWithin(RemainingOrDefault); - taskResult.Value.Request(1).ExpectNext(1).ExpectComplete(); + await taskResult.Value.Request(1).ExpectNext(1).ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_LazySink_must_fail_gracefully_when_sink_factory_method_failed() + public async Task A_LazySink_must_fail_gracefully_when_sink_factory_method_failed() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var sourceProbe = this.CreateManualPublisherProbe(); var taskProbe = Source.FromPublisher(sourceProbe).RunWith(Sink.LazyInitAsync(() => throw Ex), Materializer); - var sourceSub = sourceProbe.ExpectSubscription(); - sourceSub.ExpectRequest(1); + var sourceSub = await sourceProbe.ExpectSubscriptionAsync(); + await sourceSub.ExpectRequestAsync(1); sourceSub.SendNext(0); - sourceSub.ExpectCancellation(); + await sourceSub.ExpectCancellationAsync(); taskProbe.Invoking(t => t.Wait()).Should().Throw(); }, Materializer); } [Fact] - public void A_LazySink_must_fail_gracefully_when_upstream_failed() + public async Task A_LazySink_must_fail_gracefully_when_upstream_failed() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var sourceProbe = this.CreateManualPublisherProbe(); var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe())); var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer); - var sourceSub = sourceProbe.ExpectSubscription(); - sourceSub.ExpectRequest(1); + var sourceSub = await sourceProbe.ExpectSubscriptionAsync(); + await sourceSub.ExpectRequestAsync(1); sourceSub.SendNext(0); var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault); var probe = complete.Value; - probe.Request(1).ExpectNext(0); + await probe.Request(1).ExpectNextAsync(0); sourceSub.SendError(Ex); probe.ExpectError().Should().Be(Ex); }, Materializer); } [Fact] - public void A_LazySink_must_fail_gracefully_when_factory_task_failed() + public async Task A_LazySink_must_fail_gracefully_when_factory_task_failed() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async() => { var sourceProbe = this.CreateManualPublisherProbe(); var lazySink = Sink.LazyInitAsync(() => Task.FromException>>(Ex)); @@ -159,38 +162,38 @@ public void A_LazySink_must_fail_gracefully_when_factory_task_failed() .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider)) .Run(Materializer); - var sourceSub = sourceProbe.ExpectSubscription(); - sourceSub.ExpectRequest(1); + var sourceSub = await sourceProbe.ExpectSubscriptionAsync(); + await sourceSub.ExpectRequestAsync(1); sourceSub.SendNext(0); taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).Should().Throw(); + }, Materializer); } [Fact] - public void A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled() + public async Task A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled() { - this.AssertAllStagesStopped(async() => + await this.AssertAllStagesStoppedAsync(async() => { var sourceProbe = this.CreateManualPublisherProbe(); var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe())); var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer); - var sourceSub = sourceProbe.ExpectSubscription(); - sourceSub.ExpectRequest(1); + var sourceSub = await sourceProbe.ExpectSubscriptionAsync(); + await sourceSub.ExpectRequestAsync(1); sourceSub.SendNext(0); - sourceSub.ExpectRequest(1); + await sourceSub.ExpectRequestAsync(1); var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault); var probe = complete.Value; - probe.Request(1).ExpectNext(0); + await probe.Request(1).ExpectNextAsync(0); probe.Cancel(); - sourceSub.ExpectCancellation(); + await sourceSub.ExpectCancellationAsync(); }, Materializer); } [Fact] - public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails() + public async Task A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var matFail = new TestException("fail!"); var task = Source.Single("whatever") @@ -205,7 +208,7 @@ public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fa task.IsFaulted.ShouldBe(true); task.Exception.ShouldNotBe(null); task.Exception.Flatten().InnerException.Should().BeEquivalentTo(matFail); - + return Task.CompletedTask; }, Materializer); }