Skip to content

Commit 4e6817d

Browse files
authored
[53-74] LazyFlowSpec (#6600)
* [53-74] `LazyFlowSpec` * Changes to `async/await`
1 parent 95c1266 commit 4e6817d

File tree

1 file changed

+64
-70
lines changed

1 file changed

+64
-70
lines changed

src/core/Akka.Streams.Tests/Dsl/LazyFlowSpec.cs

+64-70
Original file line numberDiff line numberDiff line change
@@ -34,183 +34,176 @@ public LazyFlowSpec(ITestOutputHelper helper)
3434
private static readonly Exception Ex = new TestException("");
3535

3636
private static readonly Task<Flow<int, int, NotUsed>> FlowF = Task.FromResult(Flow.Create<int>());
37-
37+
3838
[Fact]
39-
public void A_LazyFlow_must_work_in_happy_case()
39+
public async Task A_LazyFlow_must_work_in_happy_case()
4040
{
41-
this.AssertAllStagesStopped(() =>
42-
{
43-
Func<Task<Flow<int, string, NotUsed>>> MapF(int e) => () =>
44-
Task.FromResult(Flow.FromFunction<int, string>(i => (i * e).ToString()));
41+
await this.AssertAllStagesStoppedAsync(async() => {
42+
Func<Task<Flow<int, string, NotUsed>>> MapF(int e) => () =>
43+
Task.FromResult(Flow.FromFunction<int, string>(i => (i * e).ToString()));
4544

4645
var probe = Source.From(Enumerable.Range(2, 10))
4746
.Via(Flow.LazyInitAsync(MapF(2)))
4847
.RunWith(this.SinkProbe<string>(), Materializer);
4948
probe.Request(100);
50-
Enumerable.Range(2, 10).Select(i => (i * 2).ToString()).ForEach(i => probe.ExpectNext(i));
49+
foreach(var i in Enumerable.Range(2, 10).Select(i => (i * 2).ToString()))
50+
{
51+
await probe.ExpectNextAsync(i);
52+
}
5153
}, Materializer);
5254
}
5355

5456
[Fact]
55-
public void A_LazyFlow_must_work_with_slow_flow_init()
57+
public async Task A_LazyFlow_must_work_with_slow_flow_init()
5658
{
57-
this.AssertAllStagesStopped(() =>
58-
{
59+
await this.AssertAllStagesStoppedAsync(async() => {
5960
var p = new TaskCompletionSource<Flow<int, int, NotUsed>>();
6061
var sourceProbe = this.CreateManualPublisherProbe<int>();
6162
var flowProbe = Source.FromPublisher(sourceProbe)
6263
.Via(Flow.LazyInitAsync(() => p.Task))
6364
.RunWith(this.SinkProbe<int>(), Materializer);
6465

65-
var sourceSub = sourceProbe.ExpectSubscription();
66+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
6667
flowProbe.Request(1);
67-
sourceSub.ExpectRequest(1);
68+
await sourceSub.ExpectRequestAsync(1);
6869
sourceSub.SendNext(0);
69-
sourceSub.ExpectRequest(1);
70-
sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
70+
await sourceSub.ExpectRequestAsync(1);
71+
await sourceProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
7172

7273
p.SetResult(Flow.Create<int>());
7374
flowProbe.Request(99);
74-
flowProbe.ExpectNext(0);
75-
Enumerable.Range(1, 10).ForEach(i =>
76-
{
75+
await flowProbe.ExpectNextAsync(0);
76+
foreach(var i in Enumerable.Range(0, 10))
77+
{
7778
sourceSub.SendNext(i);
78-
flowProbe.ExpectNext(i);
79-
});
79+
await flowProbe.ExpectNextAsync(i);
80+
}
8081
sourceSub.SendComplete();
8182
}, Materializer);
8283
}
8384

8485
[Fact]
85-
public void A_LazyFlow_must_complete_when_there_was_no_elements_in_stream()
86+
public async Task A_LazyFlow_must_complete_when_there_was_no_elements_in_stream()
8687
{
87-
this.AssertAllStagesStopped(() =>
88-
{
89-
var probe = Source.Empty<int>()
90-
.Via(Flow.LazyInitAsync(() => FlowF))
91-
.RunWith(this.SinkProbe<int>(), Materializer);
92-
probe.Request(1).ExpectComplete();
88+
await this.AssertAllStagesStoppedAsync(async() => {
89+
var probe = Source.Empty<int>()
90+
.Via(Flow.LazyInitAsync(() => FlowF))
91+
.RunWith(this.SinkProbe<int>(), Materializer);
92+
await probe.Request(1).ExpectCompleteAsync();
9393
}, Materializer);
9494
}
9595

9696
[Fact]
97-
public void A_LazyFlow_must_complete_normally_when_upstream_completes_BEFORE_the_stage_has_switched_to_the_inner_flow()
97+
public async Task A_LazyFlow_must_complete_normally_when_upstream_completes_BEFORE_the_stage_has_switched_to_the_inner_flow()
9898
{
99-
this.AssertAllStagesStopped(() =>
100-
{
99+
await this.AssertAllStagesStoppedAsync(async() => {
101100
var promise = new TaskCompletionSource<Flow<int, int, NotUsed>>();
102101
var (pub, sub) = this.SourceProbe<int>()
103102
.ViaMaterialized(Flow.LazyInitAsync(() => promise.Task), Keep.Left)
104103
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
105104
.Run(Materializer);
106105

107106
sub.Request(1);
108-
pub.SendNext(1).SendComplete();
107+
await pub.SendNext(1).SendCompleteAsync();
109108
promise.SetResult(Flow.Create<int>());
110-
sub.ExpectNext(1).ExpectComplete();
109+
await sub.ExpectNext(1).ExpectCompleteAsync();
111110
}, Materializer);
112111
}
113112

114113
[Fact]
115-
public void A_LazyFlow_must_complete_normally_when_upstream_completes_AFTER_the_stage_has_switched_to_the_inner_flow()
114+
public async Task A_LazyFlow_must_complete_normally_when_upstream_completes_AFTER_the_stage_has_switched_to_the_inner_flow()
116115
{
117-
this.AssertAllStagesStopped(() =>
118-
{
119-
var (pub, sub) = this.SourceProbe<int>()
120-
.ViaMaterialized(Flow.LazyInitAsync(() => Task.FromResult(Flow.Create<int>())), Keep.Left)
121-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
122-
.Run(Materializer);
116+
await this.AssertAllStagesStoppedAsync(async() => {
117+
var (pub, sub) = this.SourceProbe<int>()
118+
.ViaMaterialized(Flow.LazyInitAsync(() => Task.FromResult(Flow.Create<int>())), Keep.Left)
119+
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
120+
.Run(Materializer);
123121

124122
sub.Request(1);
125-
pub.SendNext(1);
126-
sub.ExpectNext(1);
127-
pub.SendComplete();
128-
sub.ExpectComplete();
123+
await pub.SendNextAsync(1);
124+
await sub.ExpectNextAsync(1);
125+
await pub.SendCompleteAsync();
126+
await sub.ExpectCompleteAsync();
129127
}, Materializer);
130128
}
131129

132130
[Fact]
133-
public void A_LazyFlow_must_fail_gracefully_when_flow_factory_method_failed()
131+
public async Task A_LazyFlow_must_fail_gracefully_when_flow_factory_method_failed()
134132
{
135-
this.AssertAllStagesStopped(() =>
136-
{
133+
await this.AssertAllStagesStoppedAsync(async() => {
137134
var sourceProbe = this.CreateManualPublisherProbe<int>();
138135
var probe = Source.FromPublisher(sourceProbe)
139136
.Via(Flow.LazyInitAsync<int, int, NotUsed>(() => throw Ex))
140137
.RunWith(this.SinkProbe<int>(), Materializer);
141138

142-
var sourceSub = sourceProbe.ExpectSubscription();
139+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
143140
probe.Request(1);
144-
sourceSub.ExpectRequest(1);
141+
await sourceSub.ExpectRequestAsync(1);
145142
sourceSub.SendNext(0);
146-
sourceSub.ExpectCancellation();
143+
await sourceSub.ExpectCancellationAsync();
147144
probe.ExpectError().Should().Be(Ex);
148145
}, Materializer);
149146
}
150147

151148
[Fact]
152-
public void A_LazyFlow_must_fail_gracefully_when_upstream_failed()
149+
public async Task A_LazyFlow_must_fail_gracefully_when_upstream_failed()
153150
{
154-
this.AssertAllStagesStopped(() =>
155-
{
151+
await this.AssertAllStagesStoppedAsync(async() => {
156152
var sourceProbe = this.CreateManualPublisherProbe<int>();
157153
var probe = Source.FromPublisher(sourceProbe)
158154
.Via(Flow.LazyInitAsync(() => FlowF))
159155
.RunWith(this.SinkProbe<int>(), Materializer);
160156

161-
var sourceSub = sourceProbe.ExpectSubscription();
162-
sourceSub.ExpectRequest(1);
157+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
158+
await sourceSub.ExpectRequestAsync(1);
163159
sourceSub.SendNext(0);
164-
probe.Request(1).ExpectNext(0);
160+
await probe.Request(1).ExpectNextAsync(0);
165161
sourceSub.SendError(Ex);
166162
probe.ExpectError().Should().Be(Ex);
167163
}, Materializer);
168164
}
169165

170166
[Fact]
171-
public void A_LazyFlow_must_fail_gracefully_when_factory_task_failed()
167+
public async Task A_LazyFlow_must_fail_gracefully_when_factory_task_failed()
172168
{
173-
this.AssertAllStagesStopped(() =>
174-
{
169+
await this.AssertAllStagesStoppedAsync(async() => {
175170
var sourceProbe = this.CreateManualPublisherProbe<int>();
176171
var flowprobe = Source.FromPublisher(sourceProbe)
177172
.Via(Flow.LazyInitAsync(() => Task.FromException<Flow<int, int, NotUsed>>(Ex)))
178173
.RunWith(this.SinkProbe<int>(), Materializer);
179174

180-
var sourceSub = sourceProbe.ExpectSubscription();
181-
sourceSub.ExpectRequest(1);
175+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
176+
await sourceSub.ExpectRequestAsync(1);
182177
sourceSub.SendNext(0);
183178
var error = flowprobe.Request(1).ExpectError().As<AggregateException>();
184179
error.Flatten().InnerException.Should().Be(Ex);
185180
}, Materializer);
186181
}
187182

188183
[Fact]
189-
public void A_LazyFlow_must_cancel_upstream_when_the_downstream_is_cancelled()
184+
public async Task A_LazyFlow_must_cancel_upstream_when_the_downstream_is_cancelled()
190185
{
191-
this.AssertAllStagesStopped(() =>
192-
{
186+
await this.AssertAllStagesStoppedAsync(async() => {
193187
var sourceProbe = this.CreateManualPublisherProbe<int>();
194188
var probe = Source.FromPublisher(sourceProbe)
195189
.Via(Flow.LazyInitAsync(() => FlowF))
196190
.RunWith(this.SinkProbe<int>(), Materializer);
197191

198-
var sourceSub = sourceProbe.ExpectSubscription();
192+
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
199193
probe.Request(1);
200-
sourceSub.ExpectRequest(1);
194+
await sourceSub.ExpectRequestAsync(1);
201195
sourceSub.SendNext(0);
202-
sourceSub.ExpectRequest(1);
203-
probe.ExpectNext(0);
196+
await sourceSub.ExpectRequestAsync(1);
197+
await probe.ExpectNextAsync(0);
204198
probe.Cancel();
205-
sourceSub.ExpectCancellation();
199+
await sourceSub.ExpectCancellationAsync();
206200
}, Materializer);
207201
}
208202

209203
[Fact]
210-
public void A_LazyFlow_must_fail_correctly_when_factory_throw_error()
204+
public async Task A_LazyFlow_must_fail_correctly_when_factory_throw_error()
211205
{
212-
this.AssertAllStagesStopped(() =>
213-
{
206+
await this.AssertAllStagesStoppedAsync(() => {
214207
const string msg = "fail!";
215208
var matFail = new TestException(msg);
216209

@@ -220,6 +213,7 @@ public void A_LazyFlow_must_fail_correctly_when_factory_throw_error()
220213
.Invoking(source => source.Run(Materializer));
221214

222215
result.Should().Throw<TestException>().WithMessage(msg);
216+
return Task.CompletedTask;
223217
}, Materializer);
224218
}
225219
}

0 commit comments

Comments
 (0)