-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[70-74] GraphStageLogicSpec
#6618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e439225
53483ba
de4a86a
15f6df9
83aa084
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
using System; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
using Akka.Event; | ||
|
@@ -273,37 +274,34 @@ public GraphStageLogicSpec(ITestOutputHelper output) : base(output, Config) | |
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_read_N_and_emit_N_before_completing() | ||
public async Task A_GraphStageLogic_must_read_N_and_emit_N_before_completing() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
Source.From(Enumerable.Range(1, 10)) | ||
.Via(new ReadNEmitN(2)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectNext( 1, 2) | ||
.ExpectComplete(); | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
await Source.From(Enumerable.Range(1, 10)) | ||
.Via(new ReadNEmitN(2)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectNext(1, 2) | ||
.ExpectCompleteAsync(); | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent() | ||
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
Source.From(Enumerable.Range(1, 5)) | ||
.Via(new ReadNEmitN(6)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectComplete(); | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
await Source.From(Enumerable.Range(1, 5)) | ||
.Via(new ReadNEmitN(6)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectCompleteAsync(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good? @Aaronontheweb There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good, but you have a lot of code in other PRs today that looks like: ...
.ExpectComplete();
return Task.CompletedTask; You should change that to .ExpectCompleteAsync(); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I will do it - bit by bit! |
||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent() | ||
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
await this.AssertAllStagesStoppedAsync(() => { | ||
var error = new ArgumentException("Don't argue like that!"); | ||
Source.From(Enumerable.Range(1, 5)) | ||
.Select(x => | ||
|
@@ -316,49 +314,47 @@ public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_befo | |
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectError().Should().Be(error); | ||
return Task.CompletedTask; | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen() | ||
public async Task A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
Source.From(Enumerable.Range(1, 5)) | ||
.Via(new ReadNEmitRestOnComplete(6)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectNext( 1, 2, 3, 4, 5) | ||
.ExpectComplete(); | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
await Source.From(Enumerable.Range(1, 5)) | ||
.Via(new ReadNEmitRestOnComplete(6)) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(10) | ||
.ExpectNext(1, 2, 3, 4, 5) | ||
.ExpectCompleteAsync(); | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_emit_all_things_before_completing() | ||
public async Task A_GraphStageLogic_must_emit_all_things_before_completing() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
Source.Empty<int>() | ||
.Via(new Emit1234().Named("testStage")) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(5) | ||
.ExpectNext(1) | ||
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first | ||
.ExpectNextUnordered(2, 3) | ||
.ExpectNext(4) | ||
.ExpectComplete(); | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
await Source.Empty<int>() | ||
.Via(new Emit1234().Named("testStage")) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(5) | ||
.ExpectNext(1) | ||
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first | ||
.ExpectNextUnordered(2, 3) | ||
.ExpectNext(4) | ||
.ExpectCompleteAsync(); | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages() | ||
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new Emit5678()); | ||
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow); | ||
|
||
Source.Empty<int>() | ||
await Source.Empty<int>() | ||
.Via(g) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(9) | ||
|
@@ -370,19 +366,18 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fu | |
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first | ||
.ExpectNextUnordered(6, 7) | ||
.ExpectNext(8) | ||
.ExpectComplete(); | ||
.ExpectCompleteAsync(); | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages() | ||
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new PassThrough()).Via(new Emit5678()); | ||
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow); | ||
|
||
Source.Empty<int>() | ||
await Source.Empty<int>() | ||
.Via(g) | ||
.RunWith(this.SinkProbe<int>(), Materializer) | ||
.Request(9) | ||
|
@@ -394,20 +389,20 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_ | |
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first | ||
.ExpectNextUnordered(6, 7) | ||
.ExpectNext(8) | ||
.ExpectComplete(); | ||
.ExpectCompleteAsync(); | ||
}, Materializer); | ||
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_emit_properly_after_empty_iterable() | ||
public async Task A_GraphStageLogic_must_emit_properly_after_empty_iterable() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
Source.FromGraph(new EmitEmptyIterable()) | ||
.RunWith(Sink.Seq<int>(), Materializer) | ||
.Result.Should() | ||
.HaveCount(1) | ||
.And.OnlyContain(x => x == 42); | ||
await this.AssertAllStagesStoppedAsync(() => { | ||
Source.FromGraph(new EmitEmptyIterable()) | ||
.RunWith(Sink.Seq<int>(), Materializer) | ||
.Result.Should() | ||
.HaveCount(1) | ||
.And.OnlyContain(x => x == 42); | ||
return Task.CompletedTask; | ||
}, Materializer); | ||
} | ||
|
||
|
@@ -425,17 +420,15 @@ public void A_GraphStageLogic_must_support_logging_in_custom_graphstage() | |
} | ||
|
||
[Fact] | ||
public void A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order() | ||
public async Task A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order() | ||
{ | ||
this.AssertAllStagesStopped(() => | ||
{ | ||
await this.AssertAllStagesStoppedAsync(async() => { | ||
var g = new LifecycleStage(TestActor); | ||
|
||
Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer); | ||
ExpectMsg("preStart"); | ||
ExpectMsg("pulled"); | ||
ExpectMsg("postStop"); | ||
|
||
await Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer); | ||
await ExpectMsgAsync("preStart"); | ||
await ExpectMsgAsync("pulled"); | ||
await ExpectMsgAsync("postStop"); | ||
}, Materializer); | ||
} | ||
|
||
|
@@ -492,14 +485,14 @@ public void A_GraphStageLogic_must_not_double_terminate_a_single_stage() | |
{ | ||
WithBaseBuilderSetup( | ||
new GraphStage<FlowShape<int, int>>[] {new DoubleTerminateStage(TestActor), new PassThrough()}, | ||
interpreter => | ||
async interpreter => | ||
{ | ||
interpreter.Complete(interpreter.Connections[0]); | ||
interpreter.Cancel(interpreter.Connections[1], SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); | ||
interpreter.Execute(2); | ||
|
||
ExpectMsg("postStop2"); | ||
ExpectNoMsg(0); | ||
await ExpectMsgAsync("postStop2"); | ||
await ExpectNoMsgAsync(0); | ||
|
||
interpreter.IsCompleted.Should().BeFalse(); | ||
interpreter.IsSuspended.Should().BeFalse(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have a bunch of PRs open where you'll need to do this @eaba