diff --git a/src/core/Akka.Streams.Tests/Dsl/SubstreamSubscriptionTimeoutSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SubstreamSubscriptionTimeoutSpec.cs index 07d96aa0675..209dad2791c 100644 --- a/src/core/Akka.Streams.Tests/Dsl/SubstreamSubscriptionTimeoutSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/SubstreamSubscriptionTimeoutSpec.cs @@ -7,6 +7,7 @@ using System; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.Implementation; using Akka.Streams.TestKit; @@ -40,24 +41,23 @@ public SubstreamSubscriptionTimeoutSpec(ITestOutputHelper helper) : base(Config, } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void GroupBy_and_SplitWhen_must_timeout_and_cancel_substream_publisher_when_no_one_subscribes_to_them_after_some_time() + public async Task GroupBy_and_SplitWhen_must_timeout_and_cancel_substream_publisher_when_no_one_subscribes_to_them_after_some_time() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async () => { var subscriber = this.CreateManualSubscriberProbe<(int, Source)>(); var publisherProbe = this.CreatePublisherProbe(); Source.FromPublisher(publisherProbe) - .GroupBy(3, x => x%3) - .Lift(x => x%3) + .GroupBy(3, x => x % 3) + .Lift(x => x % 3) .RunWith(Sink.FromSubscriber(subscriber), Materializer); - var downstreamSubscription = subscriber.ExpectSubscription(); + var downstreamSubscription = await subscriber.ExpectSubscriptionAsync(); downstreamSubscription.Request(100); - publisherProbe.SendNext(1); - publisherProbe.SendNext(2); - publisherProbe.SendNext(3); - + await publisherProbe.SendNextAsync(1); + await publisherProbe.SendNextAsync(2); + await publisherProbe.SendNextAsync(3); + /* * Why this spec is skipped: in the event that subscriber.ExpectSubscription() or (subscriber.ExpectNext() * + s1SubscriberProbe.ExpectSubscription()) exceeds 300ms, the next call to subscriber.ExpectNext will @@ -65,40 +65,44 @@ public void GroupBy_and_SplitWhen_must_timeout_and_cancel_substream_publisher_wh * it does validate that the underlying cancellation does work! */ - var s1 = subscriber.ExpectNext().Item2; + var item = await subscriber.ExpectNextAsync(); + var s1 = item.Item2; // should not break normal usage var s1SubscriberProbe = this.CreateManualSubscriberProbe(); s1.RunWith(Sink.FromSubscriber(s1SubscriberProbe), Materializer); - var s1Subscription = s1SubscriberProbe.ExpectSubscription(); + var s1Subscription = await s1SubscriberProbe.ExpectSubscriptionAsync(); s1Subscription.Request(100); - s1SubscriberProbe.ExpectNext().Should().Be(1); + var next = await s1SubscriberProbe.ExpectNextAsync(); + next.Should().Be(1); - var s2 = subscriber.ExpectNext().Item2; + item = await subscriber.ExpectNextAsync(); + var s2 = item.Item2; // should not break normal usage var s2SubscriberProbe = this.CreateManualSubscriberProbe(); s2.RunWith(Sink.FromSubscriber(s2SubscriberProbe), Materializer); - var s2Subscription = s2SubscriberProbe.ExpectSubscription(); + var s2Subscription = await s2SubscriberProbe.ExpectSubscriptionAsync(); s2Subscription.Request(100); - s2SubscriberProbe.ExpectNext().Should().Be(2); + next = await s2SubscriberProbe.ExpectNextAsync(); + next.Should().Be(2); - var s3 = subscriber.ExpectNext().Item2; + item = await subscriber.ExpectNextAsync(); + var s3 = item.Item2; // sleep long enough for it to be cleaned up - Thread.Sleep(1500); + await Task.Delay(1500); // Must be a Sink.seq, otherwise there is a race due to the concat in the `lift` implementation Action action = () => s3.RunWith(Sink.Seq(), Materializer).Wait(RemainingOrDefault); action.Should().Throw(); - publisherProbe.SendComplete(); + await publisherProbe.SendCompleteAsync(); }, Materializer); } [Fact] - public void GroupBy_and_SplitWhen_must_timeout_and_stop_groupBy_parent_actor_if_none_of_the_substreams_are_actually_consumed() + public async Task GroupBy_and_SplitWhen_must_timeout_and_stop_groupBy_parent_actor_if_none_of_the_substreams_are_actually_consumed() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var subscriber = this.CreateManualSubscriberProbe<(int, Source)>(); var publisherProbe = this.CreatePublisherProbe(); Source.FromPublisher(publisherProbe) @@ -106,21 +110,21 @@ public void GroupBy_and_SplitWhen_must_timeout_and_stop_groupBy_parent_actor_if_ .Lift(x => x % 2).RunWith(Sink.FromSubscriber(subscriber), Materializer); - var downstreamSubscription = subscriber.ExpectSubscription(); + var downstreamSubscription = await subscriber.ExpectSubscriptionAsync(); downstreamSubscription.Request(100); - publisherProbe.SendNext(1); - publisherProbe.SendNext(2); - publisherProbe.SendNext(3); - publisherProbe.SendComplete(); + await publisherProbe.SendNextAsync(1); + await publisherProbe.SendNextAsync(2); + await publisherProbe.SendNextAsync(3); + await publisherProbe.SendCompleteAsync(); - subscriber.ExpectNext(); - subscriber.ExpectNext(); + await subscriber.ExpectNextAsync(); + await subscriber.ExpectNextAsync(); }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void GroupBy_and_SplitWhen_must_not_timeout_and_cancel_substream_publisher_when_they_have_been_subscribed_to() + public async Task GroupBy_and_SplitWhen_must_not_timeout_and_cancel_substream_publisher_when_they_have_been_subscribed_to() { var subscriber = this.CreateManualSubscriberProbe<(int, Source)>(); var publisherProbe = this.CreatePublisherProbe(); @@ -128,31 +132,39 @@ public void GroupBy_and_SplitWhen_must_not_timeout_and_cancel_substream_publishe .GroupBy(2, x => x % 2) .Lift(x => x % 2).RunWith(Sink.FromSubscriber(subscriber), Materializer); - var downstreamSubscription = subscriber.ExpectSubscription(); + var downstreamSubscription = await subscriber.ExpectSubscriptionAsync(); downstreamSubscription.Request(10); - publisherProbe.SendNext(1); - publisherProbe.SendNext(2); + await publisherProbe.SendNextAsync(1); + await publisherProbe.SendNextAsync(2); - var s1 = subscriber.ExpectNext().Item2; + var item = await subscriber.ExpectNextAsync(); + var s1 = item.Item2; var s1SubscriberProbe = this.CreateManualSubscriberProbe(); s1.RunWith(Sink.FromSubscriber(s1SubscriberProbe), Materializer); - var s1Subscription = s1SubscriberProbe.ExpectSubscription(); + var s1Subscription = await s1SubscriberProbe.ExpectSubscriptionAsync(); s1Subscription.Request(1); - s1SubscriberProbe.ExpectNext().Should().Be(1); + var s1Subscriber = await s1SubscriberProbe.ExpectNextAsync(); + s1Subscriber.Should().Be(1); - var s2 = subscriber.ExpectNext().Item2; + item = await subscriber.ExpectNextAsync(); + var s2 = item.Item2; var s2SubscriberProbe = this.CreateManualSubscriberProbe(); s2.RunWith(Sink.FromSubscriber(s2SubscriberProbe), Materializer); - var s2Subscription = s2SubscriberProbe.ExpectSubscription(); - Thread.Sleep(1500); + var s2Subscription = await s2SubscriberProbe.ExpectSubscriptionAsync(); + await Task.Delay(1500); s2Subscription.Request(100); - s2SubscriberProbe.ExpectNext().Should().Be(2); + + var s2Subscriber = await s2SubscriberProbe.ExpectNextAsync(); + s2Subscriber.Should().Be(2); + s1Subscription.Request(100); - publisherProbe.SendNext(3); - publisherProbe.SendNext(4); - s1SubscriberProbe.ExpectNext().Should().Be(3); - s2SubscriberProbe.ExpectNext().Should().Be(4); + await publisherProbe.SendNextAsync(3); + await publisherProbe.SendNextAsync(4); + var s1S = await s1SubscriberProbe.ExpectNextAsync(); + s1S.Should().Be(3); + var s2S = await s2SubscriberProbe.ExpectNextAsync(); + s2S.Should().Be(4); } } }