Skip to content

Commit 8fbd281

Browse files
authored
[58-74] ObservableSinkSpec (#6606)
* [58-74] `ObservableSinkSpec` * Changes to `async` TestKit
1 parent 5dab14b commit 8fbd281

File tree

1 file changed

+33
-40
lines changed

1 file changed

+33
-40
lines changed

src/core/Akka.Streams.Tests/Dsl/ObservableSinkSpec.cs

+33-40
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9-
using System.Collections.Generic;
9+
using System.Threading.Tasks;
1010
using Akka.Actor;
1111
using Akka.Streams.Actors;
1212
using Akka.Streams.Dsl;
1313
using Akka.Streams.TestKit;
1414
using Akka.TestKit;
1515
using Akka.TestKit.Xunit2.Attributes;
16-
using FluentAssertions.Execution;
1716
using Xunit;
1817
using Xunit.Abstractions;
1918

@@ -37,10 +36,10 @@ public TestObserver(AkkaSpec spec)
3736
public void OnError(Exception error) => _probe.Ref.Tell(new OnError(error), ActorRefs.NoSender);
3837
public void OnCompleted() => _probe.Ref.Tell(OnComplete.Instance, ActorRefs.NoSender);
3938

40-
public T ExpectEvent(T expected) => (T)_probe.ExpectMsg<OnNext>(x => Equals(x.Element, expected)).Element;
41-
public TError ExpectError<TError>(TError error) where TError : Exception => (TError)_probe.ExpectMsg<OnError>(x => Equals(x.Cause, error)).Cause;
42-
public void ExpectCompleted() => _probe.ExpectMsg<OnComplete>();
43-
public void ExpectNoMsg() => _probe.ExpectNoMsg();
39+
public async Task<T> ExpectEventAsync(T expected) => (T)(await _probe.ExpectMsgAsync<OnNext>(x => Equals(x.Element, expected))).Element;
40+
public async Task<TError> ExpectErrorAsync<TError>(TError error) where TError : Exception => (TError)(await _probe.ExpectMsgAsync<OnError>(x => Equals(x.Cause, error))).Cause;
41+
public async Task ExpectCompletedAsync() => await _probe.ExpectMsgAsync<OnComplete>();
42+
public async Task ExpectNoMsgAsync() => await _probe.ExpectNoMsgAsync();
4443
}
4544

4645
#endregion
@@ -55,10 +54,9 @@ public ObservableSinkSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
5554
}
5655

5756
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
58-
public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
57+
public async Task An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only_once()
5958
{
60-
this.AssertAllStagesStopped(() =>
61-
{
59+
await this.AssertAllStagesStoppedAsync(async() => {
6260
var probe = new TestObserver<int>(this);
6361
var observable = Source.From(new[] { 1, 2, 3 })
6462
.RunWith(Sink.AsObservable<int>(), Materializer);
@@ -68,20 +66,18 @@ public void An_ObservableSink_must_allow_the_same_observer_to_be_subscribed_only
6866

6967
d1.ShouldBe(d2);
7068

71-
probe.ExpectEvent(1);
72-
probe.ExpectEvent(2);
73-
probe.ExpectEvent(3);
74-
probe.ExpectCompleted();
75-
probe.ExpectNoMsg();
76-
69+
await probe.ExpectEventAsync(1);
70+
await probe.ExpectEventAsync(2);
71+
await probe.ExpectEventAsync(3);
72+
await probe.ExpectCompletedAsync();
73+
await probe.ExpectNoMsgAsync();
7774
}, Materializer);
7875
}
7976

8077
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
81-
public void An_ObservableSink_must_propagate_events_to_all_observers()
78+
public async Task An_ObservableSink_must_propagate_events_to_all_observers()
8279
{
83-
this.AssertAllStagesStopped(() =>
84-
{
80+
await this.AssertAllStagesStoppedAsync(async() => {
8581
var probe1 = new TestObserver<int>(this);
8682
var probe2 = new TestObserver<int>(this);
8783
var observable = Source.From(new[] { 1, 2 })
@@ -90,24 +86,22 @@ public void An_ObservableSink_must_propagate_events_to_all_observers()
9086
var d1 = observable.Subscribe(probe1);
9187
var d2 = observable.Subscribe(probe2);
9288

93-
probe1.ExpectEvent(1);
94-
probe1.ExpectEvent(2);
95-
probe1.ExpectCompleted();
96-
probe1.ExpectNoMsg();
97-
98-
probe2.ExpectEvent(1);
99-
probe2.ExpectEvent(2);
100-
probe2.ExpectCompleted();
101-
probe2.ExpectNoMsg();
89+
await probe1.ExpectEventAsync(1);
90+
await probe1.ExpectEventAsync(2);
91+
await probe1.ExpectCompletedAsync();
92+
await probe1.ExpectNoMsgAsync();
10293

94+
await probe2.ExpectEventAsync(1);
95+
await probe2.ExpectEventAsync(2);
96+
await probe2.ExpectCompletedAsync();
97+
await probe2.ExpectNoMsgAsync();
10398
}, Materializer);
10499
}
105100

106101
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
107-
public void An_ObservableSink_must_propagate_error_to_all_observers()
102+
public async Task An_ObservableSink_must_propagate_error_to_all_observers()
108103
{
109-
this.AssertAllStagesStopped(() =>
110-
{
104+
await this.AssertAllStagesStoppedAsync(async() => {
111105
var e = new Exception("boom");
112106
var probe1 = new TestObserver<int>(this);
113107
var probe2 = new TestObserver<int>(this);
@@ -117,17 +111,16 @@ public void An_ObservableSink_must_propagate_error_to_all_observers()
117111
var d1 = observable.Subscribe(probe1);
118112
var d2 = observable.Subscribe(probe2);
119113

120-
probe1.ExpectError(e);
121-
probe1.ExpectNoMsg();
122-
123-
probe2.ExpectError(e);
124-
probe2.ExpectNoMsg();
114+
await probe1.ExpectErrorAsync(e);
115+
await probe1.ExpectNoMsgAsync();
125116

117+
await probe2.ExpectErrorAsync(e);
118+
await probe2.ExpectNoMsgAsync();
126119
}, Materializer);
127120
}
128121

129122
[Fact]
130-
public void An_ObservableSink_subscriber_must_be_disposable()
123+
public async Task An_ObservableSink_subscriber_must_be_disposable()
131124
{
132125
var probe = new TestObserver<int>(this);
133126
var tuple = Source.Queue<int>(1, OverflowStrategy.DropHead)
@@ -142,21 +135,21 @@ public void An_ObservableSink_subscriber_must_be_disposable()
142135
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
143136
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);
144137

145-
probe.ExpectEvent(1);
138+
await probe.ExpectEventAsync(1);
146139

147140
t = queue.OfferAsync(2);
148141
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
149142
t.Result.ShouldBe(QueueOfferResult.Enqueued.Instance);
150143

151-
probe.ExpectEvent(2);
144+
await probe.ExpectEventAsync(2);
152145

153146
d1.Dispose();
154147

155148
t = queue.OfferAsync(3);
156149
t.Wait(TimeSpan.FromSeconds(1)).ShouldBeTrue();
157150

158-
probe.ExpectCompleted();
159-
probe.ExpectNoMsg();
151+
await probe.ExpectCompletedAsync();
152+
await probe.ExpectNoMsgAsync();
160153
}
161154
}
162155
}

0 commit comments

Comments
 (0)