Skip to content

[70-74] GraphStageLogicSpec #6618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 63 additions & 70 deletions src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
Expand Down Expand Up @@ -273,37 +274,34 @@ public GraphStageLogicSpec(ITestOutputHelper output) : base(output, Config)
}

[Fact]
public void A_GraphStageLogic_must_read_N_and_emit_N_before_completing()
public async Task A_GraphStageLogic_must_read_N_and_emit_N_before_completing()
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 10))
.Via(new ReadNEmitN(2))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNext( 1, 2)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.From(Enumerable.Range(1, 10))
.Via(new ReadNEmitN(2))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNext(1, 2)
.ExpectCompleteAsync();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have a bunch of PRs open where you'll need to do this @eaba

}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent()
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_completes_before_N_is_sent()
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 5))
.Via(new ReadNEmitN(6))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.From(Enumerable.Range(1, 5))
.Via(new ReadNEmitN(6))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectCompleteAsync();
Copy link
Contributor Author

@eaba eaba Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but you have a lot of code in other PRs today that looks like:

...                        
  .ExpectComplete();
  return Task.CompletedTask;

You should change that to

.ExpectCompleteAsync();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I will do it - bit by bit!

}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent()
public async Task A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_before_N_is_sent()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var error = new ArgumentException("Don't argue like that!");
Source.From(Enumerable.Range(1, 5))
.Select(x =>
Expand All @@ -316,49 +314,47 @@ public void A_GraphStageLogic_must_read_N_should_not_emit_if_upstream_fails_befo
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectError().Should().Be(error);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen()
public async Task A_GraphStageLogic_must_read_N_should_provide_elements_read_if_OnComplete_happens_before_N_elements_have_been_seen()
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 5))
.Via(new ReadNEmitRestOnComplete(6))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNext( 1, 2, 3, 4, 5)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.From(Enumerable.Range(1, 5))
.Via(new ReadNEmitRestOnComplete(6))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNext(1, 2, 3, 4, 5)
.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_emit_all_things_before_completing()
public async Task A_GraphStageLogic_must_emit_all_things_before_completing()
{
this.AssertAllStagesStopped(() =>
{
Source.Empty<int>()
.Via(new Emit1234().Named("testStage"))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(5)
.ExpectNext(1)
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first
.ExpectNextUnordered(2, 3)
.ExpectNext(4)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.Empty<int>()
.Via(new Emit1234().Named("testStage"))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(5)
.ExpectNext(1)
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first
.ExpectNextUnordered(2, 3)
.ExpectNext(4)
.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages()
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fused_stages()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new Emit5678());
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow);

Source.Empty<int>()
await Source.Empty<int>()
.Via(g)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(9)
Expand All @@ -370,19 +366,18 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_two_fu
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first
.ExpectNextUnordered(6, 7)
.ExpectNext(8)
.ExpectComplete();
.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages()
public async Task A_GraphStageLogic_must_emit_all_things_before_completing_with_three_fused_stages()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var flow = Flow.Create<int>().Via(new Emit1234()).Via(new PassThrough()).Via(new Emit5678());
var g = Streams.Implementation.Fusing.Fusing.Aggressive(flow);

Source.Empty<int>()
await Source.Empty<int>()
.Via(g)
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(9)
Expand All @@ -394,20 +389,20 @@ public void A_GraphStageLogic_must_emit_all_things_before_completing_with_three_
//emitting with callback gives nondeterminism whether 6 or 7 will be pushed first
.ExpectNextUnordered(6, 7)
.ExpectNext(8)
.ExpectComplete();
.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_GraphStageLogic_must_emit_properly_after_empty_iterable()
public async Task A_GraphStageLogic_must_emit_properly_after_empty_iterable()
{
this.AssertAllStagesStopped(() =>
{
Source.FromGraph(new EmitEmptyIterable())
.RunWith(Sink.Seq<int>(), Materializer)
.Result.Should()
.HaveCount(1)
.And.OnlyContain(x => x == 42);
await this.AssertAllStagesStoppedAsync(() => {
Source.FromGraph(new EmitEmptyIterable())
.RunWith(Sink.Seq<int>(), Materializer)
.Result.Should()
.HaveCount(1)
.And.OnlyContain(x => x == 42);
return Task.CompletedTask;
}, Materializer);
}

Expand All @@ -425,17 +420,15 @@ public void A_GraphStageLogic_must_support_logging_in_custom_graphstage()
}

[Fact]
public void A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order()
public async Task A_GraphStageLogic_must_invoke_livecycle_hooks_in_the_right_order()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var g = new LifecycleStage(TestActor);

Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer);
ExpectMsg("preStart");
ExpectMsg("pulled");
ExpectMsg("postStop");

await Source.Single(1).Via(g).RunWith(Sink.Ignore<int>(), Materializer);
await ExpectMsgAsync("preStart");
await ExpectMsgAsync("pulled");
await ExpectMsgAsync("postStop");
}, Materializer);
}

Expand Down Expand Up @@ -492,14 +485,14 @@ public void A_GraphStageLogic_must_not_double_terminate_a_single_stage()
{
WithBaseBuilderSetup(
new GraphStage<FlowShape<int, int>>[] {new DoubleTerminateStage(TestActor), new PassThrough()},
interpreter =>
async interpreter =>
{
interpreter.Complete(interpreter.Connections[0]);
interpreter.Cancel(interpreter.Connections[1], SubscriptionWithCancelException.NoMoreElementsNeeded.Instance);
interpreter.Execute(2);

ExpectMsg("postStop2");
ExpectNoMsg(0);
await ExpectMsgAsync("postStop2");
await ExpectNoMsgAsync(0);

interpreter.IsCompleted.Should().BeFalse();
interpreter.IsSuspended.Should().BeFalse();
Expand Down