Skip to content

[47-74] GraphUnzipSpec #6594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 55 additions & 58 deletions src/core/Akka.Streams.Tests/Dsl/GraphUnzipSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -28,10 +29,9 @@ public GraphUnzipSpec(ITestOutputHelper helper) : base (helper)
}

[Fact]
public void A_Unzip_must_unzip_to_two_subscribers()
public async Task A_Unzip_must_unzip_to_two_subscribers()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();

Expand All @@ -45,10 +45,10 @@ public void A_Unzip_must_unzip_to_two_subscribers()
new KeyValuePair<int, string>(2, "b"),
new KeyValuePair<int, string>(3, "c")
});

b.From(source).To(unzip.In);
b.From(unzip.Out0)
.Via(Flow.Create<int>().Buffer(16, OverflowStrategy.Backpressure).Select(x => x*2))
.Via(Flow.Create<int>().Buffer(16, OverflowStrategy.Backpressure).Select(x => x * 2))
.To(Sink.FromSubscriber(c1));
b.From(unzip.Out1)
.Via(Flow.Create<string>().Buffer(16, OverflowStrategy.Backpressure))
Expand All @@ -57,29 +57,28 @@ public void A_Unzip_must_unzip_to_two_subscribers()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(1);
sub2.Request(2);
c1.ExpectNext(1*2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be converted to async @eaba - looks like this entire test fixture does.

c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
c2.ExpectNext( "a", "b");
c2.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await c1.ExpectNextAsync(1 * 2);
await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
c2.ExpectNext("a", "b");
await c2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
sub1.Request(3);
c1.ExpectNext( 2*2, 3*2);
c1.ExpectComplete();
c1.ExpectNext(2 * 2, 3 * 2);
await c1.ExpectCompleteAsync();
sub2.Request(3);
c2.ExpectNext("c");
c2.ExpectComplete();
await c2.ExpectNextAsync("c");
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Unzip_must_produce_to_right_downstream_even_though_left_downstream_cancels()
public async Task A_Unzip_must_produce_to_right_downstream_even_though_left_downstream_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();

Expand All @@ -101,21 +100,20 @@ public void A_Unzip_must_produce_to_right_downstream_even_though_left_downstream
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Cancel();
sub2.Request(3);
c2.ExpectNext( "a", "b", "c");
c2.ExpectComplete();
c2.ExpectNext("a", "b", "c");
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Unzip_must_produce_to_left_downstream_even_though_right_downstream_cancels()
public async Task A_Unzip_must_produce_to_left_downstream_even_though_right_downstream_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();

Expand All @@ -137,19 +135,20 @@ public void A_Unzip_must_produce_to_left_downstream_even_though_right_downstream
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub2.Cancel();
sub1.Request(3);
c1.ExpectNext( 1, 2, 3);
c1.ExpectComplete();
c1.ExpectNext(1, 2, 3);
await c1.ExpectCompleteAsync();

}, Materializer);
}


[Fact]
public void A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_element_has_been_pushed()
public async Task A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_element_has_been_pushed()
{
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();
Expand All @@ -170,17 +169,17 @@ public void A_Unzip_must_not_push_twice_when_pull_is_followed_by_cancel_before_e
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Request(3);
sub1.Request(3);
sub2.Cancel();
c1.ExpectNext( 1,2,3);
c1.ExpectComplete();
await c1.ExpectCompleteAsync();
}

[Fact]
public void A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_before_other_sink_has_requested()
public async Task A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_before_other_sink_has_requested()
{
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();
Expand All @@ -201,20 +200,19 @@ public void A_Unzip_must_not_loose_elements_when_pull_is_followed_by_cancel_befo
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Request(3);
sub2.Cancel();
sub1.Request(3);
c1.ExpectNext( 1, 2, 3);
c1.ExpectComplete();
c1.ExpectNext(1, 2, 3);
await c1.ExpectCompleteAsync();
}

[Fact]
public void A_Unzip_must_cancel_upstream_when_downstream_cancel()
public async Task A_Unzip_must_cancel_upstream_when_downstream_cancel()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async () => {
var p1 = this.CreateManualPublisherProbe<KeyValuePair<int, string>>();
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<string>();
Expand All @@ -231,30 +229,29 @@ public void A_Unzip_must_cancel_upstream_when_downstream_cancel()
return ClosedShape.Instance;
})).Run(Materializer);

var p1Sub = p1.ExpectSubscription();
var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var p1Sub = await p1.ExpectSubscriptionAsync();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(3);
sub2.Request(3);
p1.ExpectRequest(p1Sub, 16);
await p1.ExpectRequestAsync(p1Sub, 16);
p1Sub.SendNext(new KeyValuePair<int, string>(1, "a"));
c1.ExpectNext(1);
c2.ExpectNext("a");
await c1.ExpectNextAsync(1);
await c2.ExpectNextAsync("a");
p1Sub.SendNext(new KeyValuePair<int, string>(2, "b"));
c1.ExpectNext(2);
c2.ExpectNext("b");
await c1.ExpectNextAsync(2);
await c2.ExpectNextAsync("b");
sub1.Cancel();
sub2.Cancel();
p1Sub.ExpectCancellation();
await p1Sub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Unzip_must_work_with_Zip()
public async Task A_Unzip_must_work_with_Zip()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<(int, string)>();

RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -277,12 +274,12 @@ public void A_Unzip_must_work_with_Zip()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
sub1.Request(5);
c1.ExpectNext((1, "a"));
c1.ExpectNext((2, "b"));
c1.ExpectNext((3, "c"));
c1.ExpectComplete();
await c1.ExpectNextAsync((1, "a"));
await c1.ExpectNextAsync((2, "b"));
await c1.ExpectNextAsync((3, "c"));
await c1.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down