Skip to content

[45-74] GraphPartitionSpec #6592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 58 additions & 64 deletions src/core/Akka.Streams.Tests/Dsl/GraphPartitionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ public GraphPartitionSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_Partition_must_partition_to_three_subscribers()
public async Task A_Partition_must_partition_to_three_subscribers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = Sink.Seq<int>();
var t = RunnableGraph.FromGraph(GraphDsl.Create(s, s, s, ValueTuple.Create, (b, sink1, sink2, sink3) =>
{
Expand All @@ -54,24 +53,24 @@ public void A_Partition_must_partition_to_three_subscribers()

var task = Task.WhenAll(t.Item1, t.Item2, t.Item3);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result[0].Should().BeEquivalentTo(new[] {4, 5});
task.Result[1].Should().BeEquivalentTo(new[] {1, 2});
task.Result[2].Should().BeEquivalentTo(new[] {3});
task.Result[0].Should().BeEquivalentTo(new[] { 4, 5 });
task.Result[1].Should().BeEquivalentTo(new[] { 1, 2 });
task.Result[2].Should().BeEquivalentTo(new[] { 3 });
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Partition_must_complete_stage_after_upstream_completes()
public async Task A_Partition_must_complete_stage_after_upstream_completes()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateSubscriberProbe<string>();
var c2 = this.CreateSubscriberProbe<string>();

RunnableGraph.FromGraph(GraphDsl.Create(b =>
{
var partition = b.Add(new Partition<string>(2, s => s.Length > 4 ? 0 : 1));
var source = Source.From(new[] {"this", "is", "just", "another", "test"});
var source = Source.From(new[] { "this", "is", "just", "another", "test" });

b.From(source).To(partition.In);
b.From(partition.Out(0)).To(Sink.FromSubscriber(c1));
Expand All @@ -80,27 +79,26 @@ public void A_Partition_must_complete_stage_after_upstream_completes()
return ClosedShape.Instance;
})).Run(Materializer);

c1.Request(1);
c2.Request(4);
c1.ExpectNext("another");
c2.ExpectNext( "this", "is", "just", "test");
c1.ExpectComplete();
c2.ExpectComplete();
await c1.RequestAsync(1);
await c2.RequestAsync(4);
await c1.ExpectNextAsync("another");
c2.ExpectNext("this", "is", "just", "test");
await c1.ExpectCompleteAsync();
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Partition_must_remember_first_pull_even_thought_first_element_target_another_out()
public async Task A_Partition_must_remember_first_pull_even_thought_first_element_target_another_out()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateSubscriberProbe<int>();
var c2 = this.CreateSubscriberProbe<int>();

RunnableGraph.FromGraph(GraphDsl.Create(b =>
{
var partition = b.Add(new Partition<int>(2, i => i < 6 ? 0 : 1));
var source = Source.From(new [] {6,3});
var source = Source.From(new[] { 6, 3 });

b.From(source).To(partition.In);
b.From(partition.Out(0)).To(Sink.FromSubscriber(c1));
Expand All @@ -109,21 +107,20 @@ public void A_Partition_must_remember_first_pull_even_thought_first_element_targ
return ClosedShape.Instance;
})).Run(Materializer);

c1.Request(1);
c1.ExpectNoMsg(TimeSpan.FromSeconds(1));
c2.Request(1);
c2.ExpectNext(6);
c1.ExpectNext(3);
c1.ExpectComplete();
c2.ExpectComplete();
await c1.RequestAsync(1);
await c1.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await c2.RequestAsync(1);
await c2.ExpectNextAsync(6);
await c1.ExpectNextAsync(3);
await c1.ExpectCompleteAsync();
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Partition_must_cancel_upstream_when_downstreams_cancel()
public async Task A_Partition_must_cancel_upstream_when_downstreams_cancel()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p1 = this.CreatePublisherProbe<int>();
var c1 = this.CreateSubscriberProbe<int>();
var c2 = this.CreateSubscriberProbe<int>();
Expand All @@ -144,30 +141,29 @@ public void A_Partition_must_cancel_upstream_when_downstreams_cancel()
return ClosedShape.Instance;
})).Run(Materializer);

var p1Sub = p1.ExpectSubscription();
var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var p1Sub = await p1.ExpectSubscriptionAsync();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();
sub1.Request(3);
sub2.Request(3);
p1Sub.SendNext(1);
p1Sub.SendNext(8);
c1.ExpectNext(1);
c2.ExpectNext(8);
await c1.ExpectNextAsync(1);
await c2.ExpectNextAsync(8);
p1Sub.SendNext(2);
c1.ExpectNext(2);
await c1.ExpectNextAsync(2);
sub1.Cancel();
sub2.Cancel();
p1Sub.ExpectCancellation();
await p1Sub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Partition_must_work_with_merge()
public async Task A_Partition_must_work_with_merge()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = Sink.Seq<int>();
var input = new[] {5, 2, 9, 1, 1, 1, 10};
var input = new[] { 5, 2, 9, 1, 1, 1, 10 };

var task = RunnableGraph.FromGraph(GraphDsl.Create(s, (b, sink) =>
{
Expand All @@ -185,14 +181,14 @@ public void A_Partition_must_work_with_merge()

task.Wait(RemainingOrDefault).Should().BeTrue();
task.Result.Should().BeEquivalentTo(input);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Partition_must_stage_completion_is_waiting_for_pending_output()
public async Task A_Partition_must_stage_completion_is_waiting_for_pending_output()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateSubscriberProbe<int>();
var c2 = this.CreateSubscriberProbe<int>();

Expand All @@ -208,20 +204,19 @@ public void A_Partition_must_stage_completion_is_waiting_for_pending_output()
return ClosedShape.Instance;
})).Run(Materializer);

c1.Request(1);
c1.ExpectNoMsg(TimeSpan.FromSeconds(1));
c2.Request(1);
c2.ExpectNext(6);
c1.ExpectComplete();
c2.ExpectComplete();
await c1.RequestAsync(1);
await c1.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await c2.RequestAsync(1);
await c2.ExpectNextAsync(6);
await c1.ExpectCompleteAsync();
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Partition_must_fail_stage_if_partitioner_outcome_is_out_of_bound()
public async Task A_Partition_must_fail_stage_if_partitioner_outcome_is_out_of_bound()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateSubscriberProbe<int>();

RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -237,7 +232,7 @@ public void A_Partition_must_fail_stage_if_partitioner_outcome_is_out_of_bound()
})).Run(Materializer);


c1.Request(1);
await c1.RequestAsync(1);
var error = c1.ExpectError();
error.Should().BeOfType<PartitionOutOfBoundsException>();
error.Message.Should()
Expand All @@ -247,10 +242,9 @@ public void A_Partition_must_fail_stage_if_partitioner_outcome_is_out_of_bound()
}

[Fact]
public void A_Partition_divertTo_must_send_matching_elements_to_the_sink()
public async Task A_Partition_divertTo_must_send_matching_elements_to_the_sink()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var odd = this.CreateSubscriberProbe<int>();
var even = this.CreateSubscriberProbe<int>();

Expand All @@ -259,13 +253,13 @@ public void A_Partition_divertTo_must_send_matching_elements_to_the_sink()
.To(Sink.FromSubscriber(even))
.Run(Materializer);

even.Request(1);
even.ExpectNoMsg(TimeSpan.FromSeconds(1));
odd.Request(1);
odd.ExpectNext(1);
even.ExpectNext(2);
odd.ExpectComplete();
even.ExpectComplete();
await even.RequestAsync(1);
await even.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
await odd.RequestAsync(1);
await odd.ExpectNextAsync(1);
await even.ExpectNextAsync(2);
await odd.ExpectCompleteAsync();
await even.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down