From 4fb230d76620f687fb3659d896524fdd3fbfe553 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Sun, 26 Mar 2023 09:19:26 +0100 Subject: [PATCH 1/2] [58-74] `ObservableSinkSpec` --- .../Dsl/ObservableSinkSpec.cs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs index 0b4005cf224..9a997e63583 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Actors; using Akka.Streams.Dsl; @@ -55,10 +56,9 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper) } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once() + public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = new TestObserver(this); var observable = Source.From(new[] { 1, 2, 3 }) .RunWith(Sink.AsObservable(), Materializer); @@ -73,15 +73,14 @@ public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only probe.ExpectEvent(3); probe.ExpectCompleted(); probe.ExpectNoMsg(); - + return Task.CompletedTask; }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_propagate_events_to_all_observers() + public async Task An_ObservableSink_must_propagate_events_to_all_observers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); var observable = Source.From(new[] { 1, 2 }) @@ -99,15 +98,14 @@ public void An_ObservableSink_must_propagate_events_to_all_observers() probe2.ExpectEvent(2); probe2.ExpectCompleted(); probe2.ExpectNoMsg(); - + return Task.CompletedTask; }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public void An_ObservableSink_must_propagate_error_to_all_observers() + public async Task An_ObservableSink_must_propagate_error_to_all_observers() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var e = new Exception("boom"); var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); @@ -122,7 +120,7 @@ public void An_ObservableSink_must_propagate_error_to_all_observers() probe2.ExpectError(e); probe2.ExpectNoMsg(); - + return Task.CompletedTask; }, Materializer); } From 8914285c8f764c72bd8f1ce1bf4ba018e6a50cd0 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Fri, 31 Mar 2023 09:48:41 +0100 Subject: [PATCH 2/2] Changes to `async` TestKit --- .../Dsl/ObservableSinkSpec.cs | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs index 9a997e63583..5ae008cb7fc 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Actors; @@ -14,7 +13,6 @@ using Akka.Streams.TestKit; using Akka.TestKit; using Akka.TestKit.Xunit2.Attributes; -using FluentAssertions.Execution; using Xunit; using Xunit.Abstractions; @@ -38,10 +36,10 @@ public TestObserver(AkkaSpec spec) public void OnError(Exception error) => _probe.Ref.Tell(new OnError(error), ActorRefs.NoSender); public void OnCompleted() => _probe.Ref.Tell(OnComplete.Instance, ActorRefs.NoSender); - public T ExpectEvent(T expected) => (T)_probe.ExpectMsg(x => Equals(x.Element, expected)).Element; - public TError ExpectError(TError error) where TError : Exception => (TError)_probe.ExpectMsg(x => Equals(x.Cause, error)).Cause; - public void ExpectCompleted() => _probe.ExpectMsg(); - public void ExpectNoMsg() => _probe.ExpectNoMsg(); + public async Task ExpectEventAsync(T expected) => (T)(await _probe.ExpectMsgAsync(x => Equals(x.Element, expected))).Element; + public async Task ExpectErrorAsync(TError error) where TError : Exception => (TError)(await _probe.ExpectMsgAsync(x => Equals(x.Cause, error))).Cause; + public async Task ExpectCompletedAsync() => await _probe.ExpectMsgAsync(); + public async Task ExpectNoMsgAsync() => await _probe.ExpectNoMsgAsync(); } #endregion @@ -58,7 +56,7 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper) [LocalFact(SkipLocal = "Racy on Azure DevOps")] public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var probe = new TestObserver(this); var observable = Source.From(new[] { 1, 2, 3 }) .RunWith(Sink.AsObservable(), Materializer); @@ -68,19 +66,18 @@ await this.AssertAllStagesStoppedAsync(() => { d1.ShouldBe(d2); - probe.ExpectEvent(1); - probe.ExpectEvent(2); - probe.ExpectEvent(3); - probe.ExpectCompleted(); - probe.ExpectNoMsg(); - return Task.CompletedTask; + await probe.ExpectEventAsync(1); + await probe.ExpectEventAsync(2); + await probe.ExpectEventAsync(3); + await probe.ExpectCompletedAsync(); + await probe.ExpectNoMsgAsync(); }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] public async Task An_ObservableSink_must_propagate_events_to_all_observers() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); var observable = Source.From(new[] { 1, 2 }) @@ -89,23 +86,22 @@ await this.AssertAllStagesStoppedAsync(() => { var d1 = observable.Subscribe(probe1); var d2 = observable.Subscribe(probe2); - probe1.ExpectEvent(1); - probe1.ExpectEvent(2); - probe1.ExpectCompleted(); - probe1.ExpectNoMsg(); + await probe1.ExpectEventAsync(1); + await probe1.ExpectEventAsync(2); + await probe1.ExpectCompletedAsync(); + await probe1.ExpectNoMsgAsync(); - probe2.ExpectEvent(1); - probe2.ExpectEvent(2); - probe2.ExpectCompleted(); - probe2.ExpectNoMsg(); - return Task.CompletedTask; + await probe2.ExpectEventAsync(1); + await probe2.ExpectEventAsync(2); + await probe2.ExpectCompletedAsync(); + await probe2.ExpectNoMsgAsync(); }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] public async Task An_ObservableSink_must_propagate_error_to_all_observers() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var e = new Exception("boom"); var probe1 = new TestObserver(this); var probe2 = new TestObserver(this); @@ -115,17 +111,16 @@ await this.AssertAllStagesStoppedAsync(() => { var d1 = observable.Subscribe(probe1); var d2 = observable.Subscribe(probe2); - probe1.ExpectError(e); - probe1.ExpectNoMsg(); + await probe1.ExpectErrorAsync(e); + await probe1.ExpectNoMsgAsync(); - probe2.ExpectError(e); - probe2.ExpectNoMsg(); - return Task.CompletedTask; + await probe2.ExpectErrorAsync(e); + await probe2.ExpectNoMsgAsync(); }, Materializer); } [Fact] - public void An_ObservableSink_subscriber_must_be_disposable() + public async Task An_ObservableSink_subscriber_must_be_disposable() { var probe = new TestObserver(this); var tuple = Source.Queue(1, OverflowStrategy.DropHead) @@ -140,21 +135,21 @@ public void An_ObservableSink_subscriber_must_be_disposable() t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance); - probe.ExpectEvent(1); + await probe.ExpectEventAsync(1); t = queue.OfferAsync(2); t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance); - probe.ExpectEvent(2); + await probe.ExpectEventAsync(2); d1.Dispose(); t = queue.OfferAsync(3); t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue(); - probe.ExpectCompleted(); - probe.ExpectNoMsg(); + await probe.ExpectCompletedAsync(); + await probe.ExpectNoMsgAsync(); } } }