diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowIteratorSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowIteratorSpec.cs index 3b7a0b38b7f..6cefc43be77 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowIteratorSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowIteratorSpec.cs @@ -9,6 +9,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Akka.Pattern; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -40,7 +41,7 @@ protected override Source CreateSource(int elements) => Source.From(Enumerable.Range(1, elements)); [Fact] - public void A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throws() + public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throws() { var iterable = Enumerable.Range(1, 3).Select(x => { @@ -51,37 +52,37 @@ public void A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throw var p = Source.From(iterable).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(1); - c.ExpectNext(1); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNextAsync(1); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); EventFilter.Exception() .And.Exception("not two").ExpectOne(() => sub.Request(2)); var error = c.ExpectError().InnerException; error.Message.Should().Be("not two"); sub.Request(2); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_OnError_when_Source_construction_throws() + public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_Source_construction_throws() { var p = Source.From(new ThrowEnumerable()).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); c.ExpectSubscriptionAndError().Message.Should().Be("no good iterator"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_OnError_when_MoveNext_throws() + public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_MoveNext_throws() { var p = Source.From(new ThrowEnumerable(false)).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); var error = c.ExpectSubscriptionAndError().InnerException; error.Message.Should().Be("no next"); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } private sealed class ThrowEnumerable : IEnumerable @@ -145,158 +146,151 @@ protected AbstractFlowIteratorSpec(ITestOutputHelper helper) : base(helper) protected abstract Source CreateSource(int elements); [Fact] - public void A_Flow_based_on_an_iterable_must_produce_elements() + public async Task A_Flow_based_on_an_iterable_must_produce_elements() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = CreateSource(3).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); - + p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(1); - c.ExpectNext(1); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNextAsync(1); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); sub.Request(3); - c.ExpectNext(2) + await c.ExpectNext(2) .ExpectNext(3) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_complete_empty() + public async Task A_Flow_based_on_an_iterable_must_complete_empty() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = CreateSource(0).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - c.ExpectSubscriptionAndComplete(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectSubscriptionAndCompleteAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_elements_with_multiple_subscribers() + public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_multiple_subscribers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = CreateSource(3).RunWith(Sink.AsPublisher(true), Materializer); var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); - + p.Subscribe(c1); p.Subscribe(c2); - var sub1 = c1.ExpectSubscription(); - var sub2 = c2.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub1.Request(1); sub2.Request(2); - c1.ExpectNext(1); - c2.ExpectNext(1); - c2.ExpectNext(2); - c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - c2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c1.ExpectNextAsync(1); + await c2.ExpectNextAsync(1); + await c2.ExpectNextAsync(2); + await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + await c2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); sub1.Request(2); sub2.Request(2); - c1.ExpectNext(2); - c1.ExpectNext(3); - c2.ExpectNext(3); - c1.ExpectComplete(); - c2.ExpectComplete(); + await c1.ExpectNextAsync(2); + await c1.ExpectNextAsync(3); + await c2.ExpectNextAsync(3); + await c1.ExpectCompleteAsync(); + await c2.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_elements_to_later_subscriber() + public async Task A_Flow_based_on_an_iterable_must_produce_elements_to_later_subscriber() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = CreateSource(3).RunWith(Sink.AsPublisher(true), Materializer); var c1 = this.CreateManualSubscriberProbe(); var c2 = this.CreateManualSubscriberProbe(); - + p.Subscribe(c1); - var sub1 = c1.ExpectSubscription(); + var sub1 = await c1.ExpectSubscriptionAsync(); sub1.Request(1); - c1.ExpectNext(1, TimeSpan.FromSeconds(60)); - c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c1.ExpectNextAsync(1, TimeSpan.FromSeconds(60)); + await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); p.Subscribe(c2); - var sub2 = c2.ExpectSubscription(); + var sub2 = await c2.ExpectSubscriptionAsync(); sub2.Request(3); //element 1 is already gone - c2.ExpectNext(2) + await c2.ExpectNext(2) .ExpectNext(3) - .ExpectComplete(); + .ExpectCompleteAsync(); sub1.Request(3); - c1.ExpectNext(2) + await c1.ExpectNext(2) .ExpectNext(3) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_elements_with_one_transformation_step() + public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_one_transformation_step() { - this.AssertAllStagesStopped(() => - { - var p = CreateSource(3) - .Select(x => x*2) - .RunWith(Sink.AsPublisher(false), Materializer); + await this.AssertAllStagesStoppedAsync(async () => { + var p = CreateSource(3) + .Select(x => x * 2) + .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectNext(2) + await c.ExpectNext(2) .ExpectNext(4) .ExpectNext(6) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_produce_elements_with_two_transformation_steps() + public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_two_transformation_steps() { - this.AssertAllStagesStopped(() => - { - var p = CreateSource(4) - .Where(x => x%2 == 0) - .Select(x => x*2) - .RunWith(Sink.AsPublisher(false), Materializer); + await this.AssertAllStagesStoppedAsync(async () => { + var p = CreateSource(4) + .Where(x => x % 2 == 0) + .Select(x => x * 2) + .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectNext(4) + await c.ExpectNext(4) .ExpectNext(8) - .ExpectComplete(); + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Flow_based_on_an_iterable_must_not_produce_after_cancel() + public async Task A_Flow_based_on_an_iterable_must_not_produce_after_cancel() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = CreateSource(3).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(1); - c.ExpectNext(1); + await c.ExpectNextAsync(1); sub.Cancel(); sub.Request(2); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); }, Materializer); } }