Skip to content

Commit b78ce49

Browse files
authored
[56-74] MaybeSourceSpec (#6604)
* [56-74] `MaybeSourceSpec` * Changes to `async`
1 parent 2e1a137 commit b78ce49

File tree

1 file changed

+29
-31
lines changed

1 file changed

+29
-31
lines changed

src/core/Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs

+29-31
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ public MaybeSourceSpec(ITestOutputHelper output) : base(output)
2929
}
3030

3131
[Fact(DisplayName = "The Maybe Source must complete materialized promise with None when stream cancels")]
32-
public void CompleteMaterializedPromiseWithNoneWhenCancelled()
32+
public async Task CompleteMaterializedPromiseWithNoneWhenCancelled()
3333
{
34-
this.AssertAllStagesStopped(() =>
35-
{
34+
await this.AssertAllStagesStoppedAsync(async() => {
3635
var neverSource = Source.Maybe<int>();
3736
var pubSink = Sink.AsPublisher<int>(false);
3837

@@ -42,11 +41,11 @@ public void CompleteMaterializedPromiseWithNoneWhenCancelled()
4241

4342
var c = this.CreateManualSubscriberProbe<int>();
4443
neverPub.Subscribe(c);
45-
var subs = c.ExpectSubscription();
44+
var subs = await c.ExpectSubscriptionAsync();
4645

4746
subs.Request(1000);
48-
c.ExpectNoMsg(100.Milliseconds());
49-
47+
await c.ExpectNoMsgAsync(100.Milliseconds());
48+
5049
subs.Cancel();
5150

5251
tcs.Task.Wait(3.Seconds()).Should().BeTrue();
@@ -55,39 +54,39 @@ public void CompleteMaterializedPromiseWithNoneWhenCancelled()
5554
}
5655

5756
[Fact(DisplayName = "The Maybe Source must complete materialized promise with 0 when stream cancels with a failure cause")]
58-
public void CompleteMaterializedTaskWithNoneWhenStreamCancelsWithFailure()
57+
public async Task CompleteMaterializedTaskWithNoneWhenStreamCancelsWithFailure()
5958
{
60-
this.AssertAllStagesStopped(() =>
61-
{
62-
var (tcs, killSwitch) = Source.Maybe<int>()
63-
.ViaMaterialized(KillSwitches.Single<int>(), Keep.Both)
64-
.To(Sink.Ignore<int>())
65-
.Run(_materializer);
59+
await this.AssertAllStagesStoppedAsync(() => {
60+
var (tcs, killSwitch) = Source.Maybe<int>()
61+
.ViaMaterialized(KillSwitches.Single<int>(), Keep.Both)
62+
.To(Sink.Ignore<int>())
63+
.Run(_materializer);
6664

6765
var boom = new TestException("Boom");
6866
killSwitch.Abort(boom);
6967
// Could make sense to fail it with the propagated exception instead but that breaks
7068
// the assumptions in the CoupledTerminationFlowSpec
7169
tcs.Task.Wait(3.Seconds()).Should().BeTrue();
7270
tcs.Task.Result.Should().Be(0);
71+
return Task.CompletedTask;
7372
}, _materializer);
7473
}
7574

7675
[Fact(DisplayName = "The Maybe Source must allow external triggering of empty completion")]
77-
public void AllowExternalTriggeringOfEmptyCompletion()
76+
public async Task AllowExternalTriggeringOfEmptyCompletion()
7877
{
79-
this.AssertAllStagesStopped(() =>
80-
{
78+
await this.AssertAllStagesStoppedAsync(() => {
8179
var neverSource = Source.Maybe<int>().Where(_ => false);
8280
var counterSink = Sink.Aggregate<int, int>(0, (acc, _) => acc + 1);
8381
var (neverPromise, counterFuture) = neverSource
8482
.ToMaterialized(counterSink, Keep.Both)
8583
.Run(_materializer);
86-
84+
8785
// external cancellation
8886
neverPromise.TrySetResult(0).Should().BeTrue();
8987
counterFuture.Wait(3.Seconds()).Should().BeTrue();
9088
counterFuture.Result.Should().Be(0);
89+
return Task.CompletedTask;
9190
}, _materializer);
9291
}
9392

@@ -96,54 +95,52 @@ public void AllowExternalTriggeringOfEmptyCompletion()
9695
[Fact(
9796
DisplayName = "The Maybe Source must allow external triggering of empty completion when there was no demand",
9897
Skip = "Not working, check Maybe<T> source.")]
99-
public void AllowExternalTriggerOfEmptyCompletionWhenNoDemand()
98+
public async Task AllowExternalTriggerOfEmptyCompletionWhenNoDemand()
10099
{
101-
this.AssertAllStagesStopped(() =>
102-
{
100+
await this.AssertAllStagesStoppedAsync(async() => {
103101
var probe = this.CreateSubscriberProbe<int>();
104102
var promise = Source
105103
.Maybe<int>()
106104
.To(Sink.FromSubscriber(probe))
107105
.Run(_materializer);
108-
106+
109107
// external cancellation
110-
probe.EnsureSubscription();
108+
await probe.EnsureSubscriptionAsync();
111109
promise.TrySetResult(0).Should().BeTrue();
112-
probe.ExpectComplete();
110+
await probe.ExpectCompleteAsync();
113111
}, _materializer);
114112
}
115113

116114
[Fact(DisplayName = "The Maybe Source must allow external triggering of non-empty completion")]
117-
public void AllowExternalTriggerNonEmptyCompletion()
115+
public async Task AllowExternalTriggerNonEmptyCompletion()
118116
{
119-
this.AssertAllStagesStopped(() =>
120-
{
117+
await this.AssertAllStagesStoppedAsync(() => {
121118
var neverSource = Source.Maybe<int>();
122119
var counterSink = Sink.First<int>();
123120

124121
var (neverPromise, counterFuture) = neverSource
125122
.ToMaterialized(counterSink, Keep.Both)
126123
.Run(_materializer);
127-
124+
128125
// external cancellation
129126
neverPromise.TrySetResult(6).Should().BeTrue();
130127
counterFuture.Wait(3.Seconds()).Should().BeTrue();
131128
counterFuture.Result.Should().Be(6);
129+
return Task.CompletedTask;
132130
}, _materializer);
133131
}
134132

135133
[Fact(DisplayName = "The Maybe Source must allow external triggering of onError")]
136-
public void AllowExternalTriggerOnError()
134+
public async Task AllowExternalTriggerOnError()
137135
{
138-
this.AssertAllStagesStopped(() =>
139-
{
136+
await this.AssertAllStagesStoppedAsync(() => {
140137
var neverSource = Source.Maybe<int>();
141138
var counterSink = Sink.Aggregate<int, int>(0, (acc, _) => acc + 1);
142139

143140
var (neverPromise, counterFuture) = neverSource
144141
.ToMaterialized(counterSink, Keep.Both)
145142
.Run(_materializer);
146-
143+
147144
// external cancellation
148145
neverPromise.TrySetException(new TestException("Boom")).Should().BeTrue();
149146

@@ -152,6 +149,7 @@ public void AllowExternalTriggerOnError()
152149
.WithInnerException<AggregateException>()
153150
.WithInnerException<TestException>()
154151
.WithMessage("Boom");
152+
return Task.CompletedTask;
155153
}, _materializer);
156154
}
157155

0 commit comments

Comments
 (0)