diff --git a/src/core/Akka.Streams.Tests/Dsl/ObservableSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ObservableSourceSpec.cs index 88214414cef..fc889c2ec9c 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ObservableSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ObservableSourceSpec.cs @@ -8,6 +8,7 @@ using System; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; @@ -70,35 +71,33 @@ public ObservableSourceSpec(ITestOutputHelper helper) : base(SpecConfig, helper) } [Fact] - public void An_ObservableSource_must_subscribe_to_an_observable() + public async Task An_ObservableSource_must_subscribe_to_an_observable() { - this.AssertAllStagesStopped(() => + await this.AssertAllStagesStoppedAsync(async() => { - var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Complete(); }, _materializer); } [Fact] - public void An_ObservableSource_must_receive_events_from_an_observable() + public async Task An_ObservableSource_must_receive_events_from_an_observable() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); sub.Request(2); @@ -106,35 +105,34 @@ public void An_ObservableSource_must_receive_events_from_an_observable() o.Event(2); o.Event(3); - s.ExpectNext(1); - s.ExpectNext(2); + await s.ExpectNextAsync(1); + await s.ExpectNextAsync(2); s.ExpectNoMsg(); sub.Request(2); - s.ExpectNext(3); + await s.ExpectNextAsync(3); s.ExpectNoMsg(); o.Event(4); - s.ExpectNext(4); + await s.ExpectNextAsync(4); o.Complete(); }, _materializer); } [Fact(Skip = "Buggy")] - public void An_ObservableSource_must_receive_errors_from_an_observable() + public async Task An_ObservableSource_must_receive_errors_from_an_observable() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); sub.Request(2); var e = new Exception("hello"); @@ -143,69 +141,66 @@ public void An_ObservableSource_must_receive_errors_from_an_observable() o.Error(e); o.Event(2); - s.ExpectNext(1); + await s.ExpectNextAsync(1); s.ExpectError().ShouldBe(e); s.ExpectNoMsg(); }, _materializer); } [Fact] - public void An_ObservableSource_must_receive_completion_from_an_observable() + public async Task An_ObservableSource_must_receive_completion_from_an_observable() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); o.Complete(); sub.Request(5); - - s.ExpectComplete(); + + await s.ExpectCompleteAsync(); }, _materializer); } [Fact] - public void An_ObservableSource_must_be_able_to_unsubscribe() + public async Task An_ObservableSource_must_be_able_to_unsubscribe() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); sub.Cancel(); - Thread.Sleep(100); + await Task.Delay(100); o.Subscribed.ShouldBeFalse(); }, _materializer); } [Fact] - public void An_ObservableSource_must_ignore_new_element_on_DropNew_overflow() + public async Task An_ObservableSource_must_ignore_new_element_on_DropNew_overflow() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o, maxBufferCapacity: 2, overflowStrategy: OverflowStrategy.DropNew) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); o.Event(2); @@ -213,8 +208,8 @@ public void An_ObservableSource_must_ignore_new_element_on_DropNew_overflow() sub.Request(3); - s.ExpectNext(1); - s.ExpectNext(2); + await s.ExpectNextAsync(1); + await s.ExpectNextAsync(2); s.ExpectNoMsg(); sub.Cancel(); @@ -222,17 +217,16 @@ public void An_ObservableSource_must_ignore_new_element_on_DropNew_overflow() } [Fact] - public void An_ObservableSource_must_drop_oldest_element_on_DropHead_overflow() + public async Task An_ObservableSource_must_drop_oldest_element_on_DropHead_overflow() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o, maxBufferCapacity: 2, overflowStrategy: OverflowStrategy.DropHead) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); // this should be dropped o.Event(2); @@ -240,8 +234,8 @@ public void An_ObservableSource_must_drop_oldest_element_on_DropHead_overflow() sub.Request(3); - s.ExpectNext(2); - s.ExpectNext(3); + await s.ExpectNextAsync(2); + await s.ExpectNextAsync(3); s.ExpectNoMsg(); sub.Cancel(); @@ -249,17 +243,16 @@ public void An_ObservableSource_must_drop_oldest_element_on_DropHead_overflow() } [Fact] - public void An_ObservableSource_must_drop_newest_element_on_DropTail_overflow() + public async Task An_ObservableSource_must_drop_newest_element_on_DropTail_overflow() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o, maxBufferCapacity: 2, overflowStrategy: OverflowStrategy.DropTail) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); o.Event(2); // this should be dropped @@ -267,8 +260,8 @@ public void An_ObservableSource_must_drop_newest_element_on_DropTail_overflow() sub.Request(3); - s.ExpectNext(1); - s.ExpectNext(3); + await s.ExpectNextAsync(1); + await s.ExpectNextAsync(3); s.ExpectNoMsg(); sub.Cancel(); @@ -276,24 +269,23 @@ public void An_ObservableSource_must_drop_newest_element_on_DropTail_overflow() } [Fact] - public void An_ObservableSource_must_fail_on_Fail_overflow() + public async Task An_ObservableSource_must_fail_on_Fail_overflow() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var o = new TestObservable(); var s = this.CreateManualSubscriberProbe(); Source.FromObservable(o, maxBufferCapacity: 2, overflowStrategy: OverflowStrategy.Fail) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); o.Event(1); o.Event(2); o.Event(3); // this should cause an error sub.Request(3); - + s.ExpectError(); s.ExpectNoMsg();