Skip to content

[58-74] ObservableSinkSpec #6606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 33 additions & 40 deletions src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using FluentAssertions.Execution;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -37,10 +36,10 @@ public TestObserver(AkkaSpec spec)
public void OnError(Exception error) => _probe.Ref.Tell(new OnError(error), ActorRefs.NoSender);
public void OnCompleted() => _probe.Ref.Tell(OnComplete.Instance, ActorRefs.NoSender);

public T ExpectEvent(T expected) => (T)_probe.ExpectMsg<OnNext>(x => Equals(x.Element, expected)).Element;
public TError ExpectError<TError>(TError error) where TError : Exception => (TError)_probe.ExpectMsg<OnError>(x => Equals(x.Cause, error)).Cause;
public void ExpectCompleted() => _probe.ExpectMsg<OnComplete>();
public void ExpectNoMsg() => _probe.ExpectNoMsg();
public async Task<T> ExpectEventAsync(T expected) => (T)(await _probe.ExpectMsgAsync<OnNext>(x => Equals(x.Element, expected))).Element;
public async Task<TError> ExpectErrorAsync<TError>(TError error) where TError : Exception => (TError)(await _probe.ExpectMsgAsync<OnError>(x => Equals(x.Cause, error))).Cause;
public async Task ExpectCompletedAsync() => await _probe.ExpectMsgAsync<OnComplete>();
public async Task ExpectNoMsgAsync() => await _probe.ExpectNoMsgAsync();
}

#endregion
Expand All @@ -55,10 +54,9 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = new TestObserver<int>(this);
var observable = Source.From(new[] { 1, 2, 3 })
.RunWith(Sink.AsObservable<int>(), Materializer);
Expand All @@ -68,20 +66,18 @@ public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only

d1.ShouldBe(d2);

probe.ExpectEvent(1);
probe.ExpectEvent(2);
probe.ExpectEvent(3);
probe.ExpectCompleted();
probe.ExpectNoMsg();

await probe.ExpectEventAsync(1);
await probe.ExpectEventAsync(2);
await probe.ExpectEventAsync(3);
await probe.ExpectCompletedAsync();
await probe.ExpectNoMsgAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_propagate_events_to_all_observers()
public async Task An_ObservableSink_must_propagate_events_to_all_observers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe1 = new TestObserver<int>(this);
var probe2 = new TestObserver<int>(this);
var observable = Source.From(new[] { 1, 2 })
Expand All @@ -90,24 +86,22 @@ public void An_ObservableSink_must_propagate_events_to_all_observers()
var d1 = observable.Subscribe(probe1);
var d2 = observable.Subscribe(probe2);

probe1.ExpectEvent(1);
probe1.ExpectEvent(2);
probe1.ExpectCompleted();
probe1.ExpectNoMsg();

probe2.ExpectEvent(1);
probe2.ExpectEvent(2);
probe2.ExpectCompleted();
probe2.ExpectNoMsg();
await probe1.ExpectEventAsync(1);
await probe1.ExpectEventAsync(2);
await probe1.ExpectCompletedAsync();
await probe1.ExpectNoMsgAsync();

await probe2.ExpectEventAsync(1);
await probe2.ExpectEventAsync(2);
await probe2.ExpectCompletedAsync();
await probe2.ExpectNoMsgAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void An_ObservableSink_must_propagate_error_to_all_observers()
public async Task An_ObservableSink_must_propagate_error_to_all_observers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var e = new Exception("boom");
var probe1 = new TestObserver<int>(this);
var probe2 = new TestObserver<int>(this);
Expand All @@ -117,17 +111,16 @@ public void An_ObservableSink_must_propagate_error_to_all_observers()
var d1 = observable.Subscribe(probe1);
var d2 = observable.Subscribe(probe2);

probe1.ExpectError(e);
probe1.ExpectNoMsg();

probe2.ExpectError(e);
probe2.ExpectNoMsg();
await probe1.ExpectErrorAsync(e);
await probe1.ExpectNoMsgAsync();

await probe2.ExpectErrorAsync(e);
await probe2.ExpectNoMsgAsync();
}, Materializer);
}

[Fact]
public void An_ObservableSink_subscriber_must_be_disposable()
public async Task An_ObservableSink_subscriber_must_be_disposable()
{
var probe = new TestObserver<int>(this);
var tuple = Source.Queue<int>(1, OverflowStrategy.DropHead)
Expand All @@ -142,21 +135,21 @@ public void An_ObservableSink_subscriber_must_be_disposable()
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);

probe.ExpectEvent(1);
await probe.ExpectEventAsync(1);

t = queue.OfferAsync(2);
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);

probe.ExpectEvent(2);
await probe.ExpectEventAsync(2);

d1.Dispose();

t = queue.OfferAsync(3);
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();

probe.ExpectCompleted();
probe.ExpectNoMsg();
await probe.ExpectCompletedAsync();
await probe.ExpectNoMsgAsync();
}
}
}