Skip to content

Commit c1cbb7f

Browse files
authored
[22-74] FlowPrefixAndTailSpec (#6570)
* [22-74] `FlowPrefixAndTailSpec` * Changes to `async` TestKit
1 parent 1a8d495 commit c1cbb7f

File tree

1 file changed

+75
-85
lines changed

1 file changed

+75
-85
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowPrefixAndTailSpec.cs

+75-85
Original file line numberDiff line numberDiff line change
@@ -39,41 +39,38 @@ private static
3939

4040

4141
[Fact]
42-
public void PrefixAndTail_must_work_on_empty_input()
42+
public async Task PrefixAndTail_must_work_on_empty_input()
4343
{
44-
this.AssertAllStagesStopped(() =>
45-
{
44+
await this.AssertAllStagesStoppedAsync(async() => {
4645
var futureSink = NewHeadSink;
4746
var fut = Source.Empty<int>().PrefixAndTail(10).RunWith(futureSink, Materializer);
4847
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
4948
var tailFlow = fut.Result.Item2;
5049
var tailSubscriber = this.CreateManualSubscriberProbe<int>();
5150
tailFlow.To(Sink.FromSubscriber(tailSubscriber)).Run(Materializer);
52-
tailSubscriber.ExpectSubscriptionAndComplete();
51+
await tailSubscriber.ExpectSubscriptionAndCompleteAsync();
5352
}, Materializer);
5453
}
5554

5655
[Fact]
57-
public void PrefixAndTail_must_work_on_short_inputs()
56+
public async Task PrefixAndTail_must_work_on_short_inputs()
5857
{
59-
this.AssertAllStagesStopped(() =>
60-
{
58+
await this.AssertAllStagesStoppedAsync(async() => {
6159
var futureSink = NewHeadSink;
62-
var fut = Source.From(new [] {1,2,3}).PrefixAndTail(10).RunWith(futureSink, Materializer);
60+
var fut = Source.From(new[] { 1, 2, 3 }).PrefixAndTail(10).RunWith(futureSink, Materializer);
6361
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
64-
fut.Result.Item1.Should().BeEquivalentTo(new[] {1, 2, 3});
62+
fut.Result.Item1.Should().BeEquivalentTo(new[] { 1, 2, 3 });
6563
var tailFlow = fut.Result.Item2;
6664
var tailSubscriber = this.CreateManualSubscriberProbe<int>();
6765
tailFlow.To(Sink.FromSubscriber(tailSubscriber)).Run(Materializer);
68-
tailSubscriber.ExpectSubscriptionAndComplete();
66+
await tailSubscriber.ExpectSubscriptionAndCompleteAsync();
6967
}, Materializer);
7068
}
7169

7270
[Fact]
73-
public void PrefixAndTail_must_work_on_longer_inputs()
71+
public async Task PrefixAndTail_must_work_on_longer_inputs()
7472
{
75-
this.AssertAllStagesStopped(() =>
76-
{
73+
await this.AssertAllStagesStoppedAsync(() => {
7774
var futureSink = NewHeadSink;
7875
var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(5).RunWith(futureSink, Materializer);
7976
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -85,14 +82,14 @@ public void PrefixAndTail_must_work_on_longer_inputs()
8582
var fut2 = tail.Grouped(6).RunWith(futureSink2, Materializer);
8683
fut2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
8784
fut2.Result.Should().BeEquivalentTo(Enumerable.Range(6, 5));
85+
return Task.CompletedTask;
8886
}, Materializer);
8987
}
9088

9189
[Fact]
92-
public void PrefixAndTail_must_handle_zero_take_count()
90+
public async Task PrefixAndTail_must_handle_zero_take_count()
9391
{
94-
this.AssertAllStagesStopped(() =>
95-
{
92+
await this.AssertAllStagesStoppedAsync(() => {
9693
var futureSink = NewHeadSink;
9794
var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(0).RunWith(futureSink, Materializer);
9895
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -103,14 +100,14 @@ public void PrefixAndTail_must_handle_zero_take_count()
103100
var fut2 = tail.Grouped(11).RunWith(futureSink2, Materializer);
104101
fut2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
105102
fut2.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
103+
return Task.CompletedTask;
106104
}, Materializer);
107105
}
108106

109107
[Fact]
110-
public void PrefixAndTail_must_handle_negative_take_count()
108+
public async Task PrefixAndTail_must_handle_negative_take_count()
111109
{
112-
this.AssertAllStagesStopped(() =>
113-
{
110+
await this.AssertAllStagesStoppedAsync(() => {
114111
var futureSink = NewHeadSink;
115112
var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(-1).RunWith(futureSink, Materializer);
116113
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -121,30 +118,29 @@ public void PrefixAndTail_must_handle_negative_take_count()
121118
var fut2 = tail.Grouped(11).RunWith(futureSink2, Materializer);
122119
fut2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
123120
fut2.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
121+
return Task.CompletedTask;
124122
}, Materializer);
125123
}
126124

127125
[Fact]
128-
public void PrefixAndTail_must_work_if_size_of_tak_is_equal_to_stream_size()
126+
public async Task PrefixAndTail_must_work_if_size_of_tak_is_equal_to_stream_size()
129127
{
130-
this.AssertAllStagesStopped(() =>
131-
{
128+
await this.AssertAllStagesStoppedAsync(async() => {
132129
var futureSink = NewHeadSink;
133-
var fut = Source.From(Enumerable.Range(1,10)).PrefixAndTail(10).RunWith(futureSink, Materializer);
130+
var fut = Source.From(Enumerable.Range(1, 10)).PrefixAndTail(10).RunWith(futureSink, Materializer);
134131
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
135132
fut.Result.Item1.Should().BeEquivalentTo(Enumerable.Range(1, 10));
136133
var tail = fut.Result.Item2;
137134
var subscriber = this.CreateManualSubscriberProbe<int>();
138135
tail.To(Sink.FromSubscriber(subscriber)).Run(Materializer);
139-
subscriber.ExpectSubscriptionAndComplete();
136+
await subscriber.ExpectSubscriptionAndCompleteAsync();
140137
}, Materializer);
141138
}
142139

143140
[Fact]
144-
public void PrefixAndTail_must_throw_if_tail_is_attempted_to_be_materialized_twice()
141+
public async Task PrefixAndTail_must_throw_if_tail_is_attempted_to_be_materialized_twice()
145142
{
146-
this.AssertAllStagesStopped(() =>
147-
{
143+
await this.AssertAllStagesStoppedAsync(async() => {
148144
var futureSink = NewHeadSink;
149145
var fut = Source.From(Enumerable.Range(1, 2)).PrefixAndTail(1).RunWith(futureSink, Materializer);
150146
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -160,15 +156,14 @@ public void PrefixAndTail_must_throw_if_tail_is_attempted_to_be_materialized_twi
160156
subscriber2.ExpectSubscriptionAndError()
161157
.Message.Should()
162158
.Be("Substream Source cannot be materialized more than once");
163-
subscriber1.RequestNext(2).ExpectComplete();
159+
await subscriber1.RequestNext(2).ExpectCompleteAsync();
164160
}, Materializer);
165161
}
166162

167163
[Fact]
168-
public void PrefixAndTail_must_signal_error_if_substream_has_been_not_subscribed_in_time()
164+
public async Task PrefixAndTail_must_signal_error_if_substream_has_been_not_subscribed_in_time()
169165
{
170-
this.AssertAllStagesStopped(() =>
171-
{
166+
await this.AssertAllStagesStoppedAsync(() => {
172167
var ms = 300;
173168

174169
var settings = ActorMaterializerSettings.Create(Sys)
@@ -190,21 +185,21 @@ public void PrefixAndTail_must_signal_error_if_substream_has_been_not_subscribed
190185
subscriber.ExpectSubscriptionAndError()
191186
.Message.Should()
192187
.Be("Substream Source has not been materialized in 00:00:00.3000000");
188+
return Task.CompletedTask;
193189
}, Materializer);
194190
}
195191

196192
[Fact]
197-
public void PrefixAndTail_must_not_fail_the_stream_if_substream_has_not_been_subscribed_in_time_and_configured_subscription_timeout_is_noop()
193+
public async Task PrefixAndTail_must_not_fail_the_stream_if_substream_has_not_been_subscribed_in_time_and_configured_subscription_timeout_is_noop()
198194
{
199-
this.AssertAllStagesStopped(() =>
200-
{
201-
var settings = ActorMaterializerSettings.Create(Sys)
202-
.WithSubscriptionTimeoutSettings(
203-
new StreamSubscriptionTimeoutSettings(
204-
StreamSubscriptionTimeoutTerminationMode.NoopTermination,
205-
TimeSpan.FromMilliseconds(1)));
195+
await this.AssertAllStagesStoppedAsync(async() => {
196+
var settings = ActorMaterializerSettings.Create(Sys)
197+
.WithSubscriptionTimeoutSettings(
198+
new StreamSubscriptionTimeoutSettings(
199+
StreamSubscriptionTimeoutTerminationMode.NoopTermination,
200+
TimeSpan.FromMilliseconds(1)));
206201
var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, settings);
207-
202+
208203
var futureSink = NewHeadSink;
209204
var fut = Source.From(Enumerable.Range(1, 2)).PrefixAndTail(1).RunWith(futureSink, tightTimeoutMaterializer);
210205
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -214,27 +209,26 @@ public void PrefixAndTail_must_not_fail_the_stream_if_substream_has_not_been_sub
214209
Thread.Sleep(200);
215210
fut.Result.Item2.To(Sink.FromSubscriber(subscriber)).Run(tightTimeoutMaterializer);
216211
subscriber.ExpectSubscription().Request(2);
217-
subscriber.ExpectNext(2).ExpectComplete();
212+
await subscriber.ExpectNext(2).ExpectCompleteAsync();
218213
}, Materializer);
219214
}
220215

221216
[Fact]
222-
public void PrefixAndTail_must_shut_down_main_stage_if_substream_is_empty_even_when_not_subscribed()
217+
public async Task PrefixAndTail_must_shut_down_main_stage_if_substream_is_empty_even_when_not_subscribed()
223218
{
224-
this.AssertAllStagesStopped(() =>
225-
{
219+
await this.AssertAllStagesStoppedAsync(() => {
226220
var futureSink = NewHeadSink;
227221
var fut = Source.Single(1).PrefixAndTail(1).RunWith(futureSink, Materializer);
228222
fut.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
229223
fut.Result.Item1.Should().ContainSingle(i => i == 1);
224+
return Task.CompletedTask;
230225
}, Materializer);
231226
}
232227

233228
[Fact]
234-
public void PrefixAndTail_must_handle_OnError_when_no_substream_is_open()
229+
public async Task PrefixAndTail_must_handle_OnError_when_no_substream_is_open()
235230
{
236-
this.AssertAllStagesStopped(() =>
237-
{
231+
await this.AssertAllStagesStoppedAsync(async() => {
238232
var publisher = this.CreateManualPublisherProbe<int>();
239233
var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();
240234

@@ -243,12 +237,12 @@ public void PrefixAndTail_must_handle_OnError_when_no_substream_is_open()
243237
.To(Sink.FromSubscriber(subscriber))
244238
.Run(Materializer);
245239

246-
var upstream = publisher.ExpectSubscription();
247-
var downstream = subscriber.ExpectSubscription();
240+
var upstream = await publisher.ExpectSubscriptionAsync();
241+
var downstream = await subscriber.ExpectSubscriptionAsync();
248242

249243
downstream.Request(1);
250244

251-
upstream.ExpectRequest();
245+
await upstream.ExpectRequestAsync();
252246
upstream.SendNext(1);
253247
upstream.SendError(TestException);
254248

@@ -257,10 +251,9 @@ public void PrefixAndTail_must_handle_OnError_when_no_substream_is_open()
257251
}
258252

259253
[Fact]
260-
public void PrefixAndTail_must_handle_OnError_when_substream_is_open()
254+
public async Task PrefixAndTail_must_handle_OnError_when_substream_is_open()
261255
{
262-
this.AssertAllStagesStopped(() =>
263-
{
256+
await this.AssertAllStagesStoppedAsync(async() => {
264257
var publisher = this.CreateManualPublisherProbe<int>();
265258
var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();
266259

@@ -269,32 +262,31 @@ public void PrefixAndTail_must_handle_OnError_when_substream_is_open()
269262
.To(Sink.FromSubscriber(subscriber))
270263
.Run(Materializer);
271264

272-
var upstream = publisher.ExpectSubscription();
273-
var downstream = subscriber.ExpectSubscription();
265+
var upstream = await publisher.ExpectSubscriptionAsync();
266+
var downstream = await subscriber.ExpectSubscriptionAsync();
274267

275268
downstream.Request(1000);
276269

277-
upstream.ExpectRequest();
270+
await upstream.ExpectRequestAsync();
278271
upstream.SendNext(1);
279272

280-
var t = subscriber.ExpectNext();
273+
var t = await subscriber.ExpectNextAsync();
281274
t.Item1.Should().ContainSingle(i => i == 1);
282275
var tail = t.Item2;
283-
subscriber.ExpectComplete();
276+
await subscriber.ExpectCompleteAsync();
284277

285278
var substreamSubscriber = this.CreateManualSubscriberProbe<int>();
286279
tail.To(Sink.FromSubscriber(substreamSubscriber)).Run(Materializer);
287-
substreamSubscriber.ExpectSubscription();
280+
await substreamSubscriber.ExpectSubscriptionAsync();
288281
upstream.SendError(TestException);
289282
substreamSubscriber.ExpectError().Should().Be(TestException);
290283
}, Materializer);
291284
}
292285

293286
[Fact]
294-
public void PrefixAndTail_must_handle_master_stream_cancellation()
287+
public async Task PrefixAndTail_must_handle_master_stream_cancellation()
295288
{
296-
this.AssertAllStagesStopped(() =>
297-
{
289+
await this.AssertAllStagesStoppedAsync(async() => {
298290
var publisher = this.CreateManualPublisherProbe<int>();
299291
var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();
300292

@@ -303,24 +295,23 @@ public void PrefixAndTail_must_handle_master_stream_cancellation()
303295
.To(Sink.FromSubscriber(subscriber))
304296
.Run(Materializer);
305297

306-
var upstream = publisher.ExpectSubscription();
307-
var downstream = subscriber.ExpectSubscription();
298+
var upstream = await publisher.ExpectSubscriptionAsync();
299+
var downstream = await subscriber.ExpectSubscriptionAsync();
308300

309301
downstream.Request(1);
310302

311-
upstream.ExpectRequest();
303+
await upstream.ExpectRequestAsync();
312304
upstream.SendNext(1);
313305

314306
downstream.Cancel();
315-
upstream.ExpectCancellation();
307+
await upstream.ExpectCancellationAsync();
316308
}, Materializer);
317309
}
318310

319311
[Fact]
320-
public void PrefixAndTail_must_handle_substream_cancellation()
312+
public async Task PrefixAndTail_must_handle_substream_cancellation()
321313
{
322-
this.AssertAllStagesStopped(() =>
323-
{
314+
await this.AssertAllStagesStoppedAsync(async() => {
324315
var publisher = this.CreateManualPublisherProbe<int>();
325316
var subscriber = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();
326317

@@ -329,32 +320,31 @@ public void PrefixAndTail_must_handle_substream_cancellation()
329320
.To(Sink.FromSubscriber(subscriber))
330321
.Run(Materializer);
331322

332-
var upstream = publisher.ExpectSubscription();
333-
var downstream = subscriber.ExpectSubscription();
323+
var upstream = await publisher.ExpectSubscriptionAsync();
324+
var downstream = await subscriber.ExpectSubscriptionAsync();
334325

335326
downstream.Request(1000);
336327

337-
upstream.ExpectRequest();
328+
await upstream.ExpectRequestAsync();
338329
upstream.SendNext(1);
339330

340-
var t = subscriber.ExpectNext();
331+
var t = await subscriber.ExpectNextAsync();
341332
t.Item1.Should().ContainSingle(i => i == 1);
342333
var tail = t.Item2;
343-
subscriber.ExpectComplete();
334+
await subscriber.ExpectCompleteAsync();
344335

345336
var substreamSubscriber = this.CreateManualSubscriberProbe<int>();
346337
tail.To(Sink.FromSubscriber(substreamSubscriber)).Run(Materializer);
347-
substreamSubscriber.ExpectSubscription().Cancel();
338+
(await substreamSubscriber.ExpectSubscriptionAsync()).Cancel();
348339

349-
upstream.ExpectCancellation();
340+
await upstream.ExpectCancellationAsync();
350341
}, Materializer);
351342
}
352343

353344
[Fact]
354-
public void PrefixAndTail_must_pass_along_early_cancellation()
345+
public async Task PrefixAndTail_must_pass_along_early_cancellation()
355346
{
356-
this.AssertAllStagesStopped(() =>
357-
{
347+
await this.AssertAllStagesStoppedAsync(async() => {
358348
var up = this.CreateManualPublisherProbe<int>();
359349
var down = this.CreateManualSubscriberProbe<(IImmutableList<int>, Source<int, NotUsed>)>();
360350

@@ -363,16 +353,16 @@ public void PrefixAndTail_must_pass_along_early_cancellation()
363353
.To(Sink.FromSubscriber(down))
364354
.Run(Materializer);
365355

366-
var downstream = down.ExpectSubscription();
356+
var downstream = await down.ExpectSubscriptionAsync();
367357
downstream.Cancel();
368358
up.Subscribe(flowSubscriber);
369-
var upSub = up.ExpectSubscription();
370-
upSub.ExpectCancellation();
359+
var upSub = await up.ExpectSubscriptionAsync();
360+
await upSub.ExpectCancellationAsync();
371361
}, Materializer);
372362
}
373363

374364
[Fact]
375-
public void PrefixAndTail_must_work_even_if_tail_subscriber_arrives_after_substream_completion()
365+
public async Task PrefixAndTail_must_work_even_if_tail_subscriber_arrives_after_substream_completion()
376366
{
377367
var pub = this.CreateManualPublisherProbe<int>();
378368
var sub = this.CreateManualSubscriberProbe<int>();
@@ -381,7 +371,7 @@ public void PrefixAndTail_must_work_even_if_tail_subscriber_arrives_after_substr
381371
Source.FromPublisher(pub)
382372
.PrefixAndTail(1)
383373
.RunWith(Sink.First<(IImmutableList<int>, Source<int, NotUsed>)>(), Materializer);
384-
var s = pub.ExpectSubscription();
374+
var s = await pub.ExpectSubscriptionAsync();
385375
s.SendNext(0);
386376

387377
f.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
@@ -390,7 +380,7 @@ public void PrefixAndTail_must_work_even_if_tail_subscriber_arrives_after_substr
390380
s.SendComplete();
391381

392382
tailPub.Subscribe(sub);
393-
sub.ExpectSubscriptionAndComplete();
383+
await sub.ExpectSubscriptionAndCompleteAsync();
394384
}
395385
}
396386
}

0 commit comments

Comments
 (0)