Skip to content

Commit 40fdc1b

Browse files
eabaAaronontheweb
andauthored
[70-74] GraphStageLogicSpec (#6618)
* [70-74] `GraphStageLogicSpec` * more improves --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 154d842 commit 40fdc1b

File tree

1 file changed

+63
-70
lines changed

1 file changed

+63
-70
lines changed

src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs

+63-70
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System;
99
using System.Linq;
1010
using System.Threading;
11+
using System.Threading.Tasks;
1112
using Akka.Actor;
1213
using Akka.Configuration;
1314
using Akka.Event;
@@ -273,37 +274,34 @@ public GraphStageLogicSpec(ITestOutputHelper output) : base(output, Config)
273274
}
274275

275276
[Fact]
276-
public void A_GraphStageLogic_must_read_N_and_emit_N_before_completing()
277+
public async Task A_GraphStageLogic_must_read_N_and_emit_N_before_completing()
277278
{
278-
this.AssertAllStagesStopped(() =>
279-
{
280-
Source.From(Enumerable.Range(1, 10))
281-
.Via(new ReadNEmitN(2))
282-
.RunWith(this.SinkProbe<int>(), Materializer)
283-
.Request(10)
284-
.ExpectNext( 1, 2)
285-
.ExpectComplete();
279+
await this.AssertAllStagesStoppedAsync(async() => {
280+
await Source.From(Enumerable.Range(1, 10))
281+
.Via(new ReadNEmitN(2))
282+
.RunWith(this.SinkProbe<int>(), Materializer)
283+
.Request(10)
284+
.ExpectNext(1, 2)
285+
.ExpectCompleteAsync();
286286
}, Materializer);
287287
}
288288

289289
[Fact]
290-
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent()
290+
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent()
291291
{
292-
this.AssertAllStagesStopped(() =>
293-
{
294-
Source.From(Enumerable.Range(1, 5))
295-
.Via(new ReadNEmitN(6))
296-
.RunWith(this.SinkProbe<int>(), Materializer)
297-
.Request(10)
298-
.ExpectComplete();
292+
await this.AssertAllStagesStoppedAsync(async() => {
293+
await Source.From(Enumerable.Range(1, 5))
294+
.Via(new ReadNEmitN(6))
295+
.RunWith(this.SinkProbe<int>(), Materializer)
296+
.Request(10)
297+
.ExpectCompleteAsync();
299298
}, Materializer);
300299
}
301300

302301
[Fact]
303-
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent()
302+
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent()
304303
{
305-
this.AssertAllStagesStopped(() =>
306-
{
304+
await this.AssertAllStagesStoppedAsync(() => {
307305
var error = new ArgumentException("Don't argue like that!");
308306
Source.From(Enumerable.Range(1, 5))
309307
.Select(x =>
@@ -316,49 +314,47 @@ public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_befo
316314
.RunWith(this.SinkProbe<int>(), Materializer)
317315
.Request(10)
318316
.ExpectError().Should().Be(error);
317+
return Task.CompletedTask;
319318
}, Materializer);
320319
}
321320

322321
[Fact]
323-
public void A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen()
322+
public async Task A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen()
324323
{
325-
this.AssertAllStagesStopped(() =>
326-
{
327-
Source.From(Enumerable.Range(1, 5))
328-
.Via(new ReadNEmitRestOnComplete(6))
329-
.RunWith(this.SinkProbe<int>(), Materializer)
330-
.Request(10)
331-
.ExpectNext( 1, 2, 3, 4, 5)
332-
.ExpectComplete();
324+
await this.AssertAllStagesStoppedAsync(async() => {
325+
await Source.From(Enumerable.Range(1, 5))
326+
.Via(new ReadNEmitRestOnComplete(6))
327+
.RunWith(this.SinkProbe<int>(), Materializer)
328+
.Request(10)
329+
.ExpectNext(1, 2, 3, 4, 5)
330+
.ExpectCompleteAsync();
333331
}, Materializer);
334332
}
335333

336334
[Fact]
337-
public void A_GraphStageLogic_must_emit_all_things_before_completing()
335+
public async Task A_GraphStageLogic_must_emit_all_things_before_completing()
338336
{
339-
this.AssertAllStagesStopped(() =>
340-
{
341-
Source.Empty<int>()
342-
.Via(new Emit1234().Named("testStage"))
343-
.RunWith(this.SinkProbe<int>(), Materializer)
344-
.Request(5)
345-
.ExpectNext(1)
346-
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first
347-
.ExpectNextUnordered(2, 3)
348-
.ExpectNext(4)
349-
.ExpectComplete();
337+
await this.AssertAllStagesStoppedAsync(async() => {
338+
await Source.Empty<int>()
339+
.Via(new Emit1234().Named("testStage"))
340+
.RunWith(this.SinkProbe<int>(), Materializer)
341+
.Request(5)
342+
.ExpectNext(1)
343+
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first
344+
.ExpectNextUnordered(2, 3)
345+
.ExpectNext(4)
346+
.ExpectCompleteAsync();
350347
}, Materializer);
351348
}
352349

353350
[Fact]
354-
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages()
351+
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages()
355352
{
356-
this.AssertAllStagesStopped(() =>
357-
{
353+
await this.AssertAllStagesStoppedAsync(async() => {
358354
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new Emit5678());
359355
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow);
360356

361-
Source.Empty<int>()
357+
await Source.Empty<int>()
362358
.Via(g)
363359
.RunWith(this.SinkProbe<int>(), Materializer)
364360
.Request(9)
@@ -370,19 +366,18 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fu
370366
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first
371367
.ExpectNextUnordered(6, 7)
372368
.ExpectNext(8)
373-
.ExpectComplete();
369+
.ExpectCompleteAsync();
374370
}, Materializer);
375371
}
376372

377373
[Fact]
378-
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages()
374+
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages()
379375
{
380-
this.AssertAllStagesStopped(() =>
381-
{
376+
await this.AssertAllStagesStoppedAsync(async() => {
382377
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new PassThrough()).Via(new Emit5678());
383378
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow);
384379

385-
Source.Empty<int>()
380+
await Source.Empty<int>()
386381
.Via(g)
387382
.RunWith(this.SinkProbe<int>(), Materializer)
388383
.Request(9)
@@ -394,20 +389,20 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_
394389
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first
395390
.ExpectNextUnordered(6, 7)
396391
.ExpectNext(8)
397-
.ExpectComplete();
392+
.ExpectCompleteAsync();
398393
}, Materializer);
399394
}
400395

401396
[Fact]
402-
public void A_GraphStageLogic_must_emit_properly_after_empty_iterable()
397+
public async Task A_GraphStageLogic_must_emit_properly_after_empty_iterable()
403398
{
404-
this.AssertAllStagesStopped(() =>
405-
{
406-
Source.FromGraph(new EmitEmptyIterable())
407-
.RunWith(Sink.Seq<int>(), Materializer)
408-
.Result.Should()
409-
.HaveCount(1)
410-
.And.OnlyContain(x => x == 42);
399+
await this.AssertAllStagesStoppedAsync(() => {
400+
Source.FromGraph(new EmitEmptyIterable())
401+
.RunWith(Sink.Seq<int>(), Materializer)
402+
.Result.Should()
403+
.HaveCount(1)
404+
.And.OnlyContain(x => x == 42);
405+
return Task.CompletedTask;
411406
}, Materializer);
412407
}
413408

@@ -425,17 +420,15 @@ public void A_GraphStageLogic_must_support_logging_in_custom_graphstage()
425420
}
426421

427422
[Fact]
428-
public void A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order()
423+
public async Task A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order()
429424
{
430-
this.AssertAllStagesStopped(() =>
431-
{
425+
await this.AssertAllStagesStoppedAsync(async() => {
432426
var g = new LifecycleStage(TestActor);
433427

434-
Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer);
435-
ExpectMsg("preStart");
436-
ExpectMsg("pulled");
437-
ExpectMsg("postStop");
438-
428+
await Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer);
429+
await ExpectMsgAsync("preStart");
430+
await ExpectMsgAsync("pulled");
431+
await ExpectMsgAsync("postStop");
439432
}, Materializer);
440433
}
441434

@@ -492,14 +485,14 @@ public void A_GraphStageLogic_must_not_double_terminate_a_single_stage()
492485
{
493486
WithBaseBuilderSetup(
494487
new GraphStage<FlowShape<int, int>>[] {new DoubleTerminateStage(TestActor), new PassThrough()},
495-
interpreter =>
488+
async interpreter =>
496489
{
497490
interpreter.Complete(interpreter.Connections[0]);
498491
interpreter.Cancel(interpreter.Connections[1], SubscriptionWithCancelException.NoMoreElementsNeeded.Instance);
499492
interpreter.Execute(2);
500493

501-
ExpectMsg("postStop2");
502-
ExpectNoMsg(0);
494+
await ExpectMsgAsync("postStop2");
495+
await ExpectNoMsgAsync(0);
503496

504497
interpreter.IsCompleted.Should().BeFalse();
505498
interpreter.IsSuspended.Should().BeFalse();

0 commit comments

Comments
 (0)