Skip to content

Commit 95c1266

Browse files
authored
[54-74] LazySinkSpec (#6601)
* [54-74] `LazySinkSpec` * Changes to `async/await` * clean up
1 parent 2f402be commit 95c1266

File tree

1 file changed

+47
-44
lines changed

1 file changed

+47
-44
lines changed

src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs

+47-44
Original file line numberDiff line numberDiff line change
@@ -44,53 +44,57 @@ private static Func<TMat> Fallback<TMat>()
4444
private static readonly Exception Ex = new TestException("");
4545

4646
[Fact]
47-
public void A_LazySink_must_work_in_the_happy_case()
47+
public async Task A_LazySink_must_work_in_the_happy_case()
4848
{
49-
this.AssertAllStagesStopped(async() =>
49+
await this.AssertAllStagesStoppedAsync(async() =>
5050
{
5151
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
5252
var taskProbe = Source.From(Enumerable.Range(0, 11)).RunWith(lazySink, Materializer);
5353
var probe = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
5454
probe.Value.Request(100);
55-
Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i));
55+
foreach (var i in Enumerable.Range(0, 11))
56+
{
57+
await probe.Value.ExpectNextAsync(i);
58+
}
5659
}, Materializer);
5760
}
5861

5962
[Fact]
60-
public void A_LazySink_must_work_with_slow_sink_init()
63+
public async Task A_LazySink_must_work_with_slow_sink_init()
6164
{
62-
this.AssertAllStagesStopped(async() =>
65+
await this.AssertAllStagesStoppedAsync(async() =>
6366
{
6467
var p = new TaskCompletionSource<Sink<int, TestSubscriber.Probe<int>>>();
6568
var sourceProbe = this.CreateManualPublisherProbe<int>();
6669
var taskProbe = Source.FromPublisher(sourceProbe)
6770
.RunWith(Sink.LazyInitAsync(() => p.Task), Materializer);
6871

69-
var sourceSub = sourceProbe.ExpectSubscription();
70-
sourceSub.ExpectRequest(1);
72+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
73+
await sourceSub.ExpectRequestAsync(1);
7174
sourceSub.SendNext(0);
72-
sourceSub.ExpectRequest(1);
73-
sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
75+
await sourceSub.ExpectRequestAsync(1);
76+
await sourceProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
7477
taskProbe.Wait(TimeSpan.FromMilliseconds(200)).ShouldBeFalse();
7578

7679
p.SetResult(this.SinkProbe<int>());
7780
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
7881
var probe = complete.Value;
7982
probe.Request(100);
80-
probe.ExpectNext(0);
81-
Enumerable.Range(1,10).ForEach(i =>
83+
await probe.ExpectNextAsync(0);
84+
85+
foreach (var i in Enumerable.Range(1, 10))
8286
{
8387
sourceSub.SendNext(i);
84-
probe.ExpectNext(i);
85-
});
88+
await probe.ExpectNextAsync(i);
89+
}
8690
sourceSub.SendComplete();
8791
}, Materializer);
8892
}
8993

9094
[Fact]
91-
public void A_LazySink_must_complete_when_there_was_no_elements_in_stream()
95+
public async Task A_LazySink_must_complete_when_there_was_no_elements_in_stream()
9296
{
93-
this.AssertAllStagesStopped(async() =>
97+
await this.AssertAllStagesStoppedAsync(async() =>
9498
{
9599
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(Sink.Aggregate(0, (int i, int i2) => i + i2)));
96100
var taskProbe = Source.Empty<int>().RunWith(lazySink, Materializer);
@@ -100,56 +104,55 @@ public void A_LazySink_must_complete_when_there_was_no_elements_in_stream()
100104
}
101105

102106
[Fact]
103-
public void A_LazySink_must_complete_normally_when_upstream_is_completed()
107+
public async Task A_LazySink_must_complete_normally_when_upstream_is_completed()
104108
{
105-
this.AssertAllStagesStopped(async() =>
109+
await this.AssertAllStagesStoppedAsync(async() =>
106110
{
107111
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
108112
var taskProbe = Source.Single(1).RunWith(lazySink, Materializer);
109113
var taskResult = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
110-
taskResult.Value.Request(1).ExpectNext(1).ExpectComplete();
114+
await taskResult.Value.Request(1).ExpectNext(1).ExpectCompleteAsync();
111115
}, Materializer);
112116
}
113117

114118
[Fact]
115-
public void A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
119+
public async Task A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
116120
{
117-
this.AssertAllStagesStopped(() =>
118-
{
121+
await this.AssertAllStagesStoppedAsync(async() => {
119122
var sourceProbe = this.CreateManualPublisherProbe<int>();
120123
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(Sink.LazyInitAsync<int, NotUsed>(() => throw Ex), Materializer);
121-
var sourceSub = sourceProbe.ExpectSubscription();
122-
sourceSub.ExpectRequest(1);
124+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
125+
await sourceSub.ExpectRequestAsync(1);
123126
sourceSub.SendNext(0);
124-
sourceSub.ExpectCancellation();
127+
await sourceSub.ExpectCancellationAsync();
125128
taskProbe.Invoking(t => t.Wait()).Should().Throw<TestException>();
126129
}, Materializer);
127130
}
128131

129132
[Fact]
130-
public void A_LazySink_must_fail_gracefully_when_upstream_failed()
133+
public async Task A_LazySink_must_fail_gracefully_when_upstream_failed()
131134
{
132-
this.AssertAllStagesStopped(async() =>
135+
await this.AssertAllStagesStoppedAsync(async() =>
133136
{
134137
var sourceProbe = this.CreateManualPublisherProbe<int>();
135138
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
136139
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);
137140

138-
var sourceSub = sourceProbe.ExpectSubscription();
139-
sourceSub.ExpectRequest(1);
141+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
142+
await sourceSub.ExpectRequestAsync(1);
140143
sourceSub.SendNext(0);
141144
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
142145
var probe = complete.Value;
143-
probe.Request(1).ExpectNext(0);
146+
await probe.Request(1).ExpectNextAsync(0);
144147
sourceSub.SendError(Ex);
145148
probe.ExpectError().Should().Be(Ex);
146149
}, Materializer);
147150
}
148151

149152
[Fact]
150-
public void A_LazySink_must_fail_gracefully_when_factory_task_failed()
153+
public async Task A_LazySink_must_fail_gracefully_when_factory_task_failed()
151154
{
152-
this.AssertAllStagesStopped(() =>
155+
await this.AssertAllStagesStoppedAsync(async() =>
153156
{
154157
var sourceProbe = this.CreateManualPublisherProbe<int>();
155158
var lazySink = Sink.LazyInitAsync(() => Task.FromException<Sink<int, TestSubscriber.Probe<int>>>(Ex));
@@ -159,38 +162,38 @@ public void A_LazySink_must_fail_gracefully_when_factory_task_failed()
159162
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider))
160163
.Run(Materializer);
161164

162-
var sourceSub = sourceProbe.ExpectSubscription();
163-
sourceSub.ExpectRequest(1);
165+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
166+
await sourceSub.ExpectRequestAsync(1);
164167
sourceSub.SendNext(0);
165168
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).Should().Throw<TestException>();
169+
166170
}, Materializer);
167171
}
168172

169173
[Fact]
170-
public void A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled()
174+
public async Task A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled()
171175
{
172-
this.AssertAllStagesStopped(async() =>
176+
await this.AssertAllStagesStoppedAsync(async() =>
173177
{
174178
var sourceProbe = this.CreateManualPublisherProbe<int>();
175179
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
176180
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);
177-
var sourceSub = sourceProbe.ExpectSubscription();
178-
sourceSub.ExpectRequest(1);
181+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
182+
await sourceSub.ExpectRequestAsync(1);
179183
sourceSub.SendNext(0);
180-
sourceSub.ExpectRequest(1);
184+
await sourceSub.ExpectRequestAsync(1);
181185
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
182186
var probe = complete.Value;
183-
probe.Request(1).ExpectNext(0);
187+
await probe.Request(1).ExpectNextAsync(0);
184188
probe.Cancel();
185-
sourceSub.ExpectCancellation();
189+
await sourceSub.ExpectCancellationAsync();
186190
}, Materializer);
187191
}
188192

189193
[Fact]
190-
public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
194+
public async Task A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
191195
{
192-
this.AssertAllStagesStopped(() =>
193-
{
196+
await this.AssertAllStagesStoppedAsync(() => {
194197
var matFail = new TestException("fail!");
195198

196199
var task = Source.Single("whatever")
@@ -205,7 +208,7 @@ public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fa
205208
task.IsFaulted.ShouldBe(true);
206209
task.Exception.ShouldNotBe(null);
207210
task.Exception.Flatten().InnerException.Should().BeEquivalentTo(matFail);
208-
211+
return Task.CompletedTask;
209212
}, Materializer);
210213
}
211214

0 commit comments

Comments
 (0)