Skip to content

[42-74] GraphBroadcastSpec #6589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 72 additions & 80 deletions src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ public GraphBroadcastSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_Broadcast_must_broadcast_to_other_subscriber()
public async Task A_Broadcast_must_broadcast_to_other_subscriber()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create (b =>
RunnableGraph.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(2));
var source = Source.From(Enumerable.Range(1, 3));
Expand All @@ -51,50 +50,49 @@ public void A_Broadcast_must_broadcast_to_other_subscriber()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(1);
sub2.Request(2);

c1.ExpectNext(1).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
c2.ExpectNext( 1, 2).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await c1.ExpectNext(1).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
await c2.ExpectNext(1, 2).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
sub1.Request(3);
c1.ExpectNext( 2, 3).ExpectComplete();
await c1.ExpectNext(2, 3).ExpectCompleteAsync();
sub2.Request(3);
c2.ExpectNext(3).ExpectComplete();
await c2.ExpectNext(3).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_work_with_one_way_broadcast()
public async Task A_Broadcast_must_work_with_one_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
var t = Source.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(1));
await this.AssertAllStagesStoppedAsync(() => {
var t = Source.FromGraph(GraphDsl.Create(b =>
{
var broadcast = b.Add(new Broadcast<int>(1));
var source = b.Add(Source.From(Enumerable.Range(1, 3)));

b.From(source).To(broadcast.In);

return new SourceShape<int>(broadcast.Out(0));
})).RunAggregate(new List<int>(), (list, i) =>
{
list.Add(i);
return list;
return new SourceShape<int>(broadcast.Out(0));
})).RunAggregate(new List<int>(), (list, i) =>
{
list.Add(i);
return list;
}, Materializer);

t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
t.Result.Should().BeEquivalentTo(new[] {1, 2, 3});
t.Result.Should().BeEquivalentTo(new[] { 1, 2, 3 });
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Broadcast_must_work_with_n_way_broadcast()
public async Task A_Broadcast_must_work_with_n_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var headSink = Sink.First<IEnumerable<int>>();

var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
Expand All @@ -116,14 +114,14 @@ public void A_Broadcast_must_work_with_n_way_broadcast()
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
foreach (var list in task.Result)
list.Should().BeEquivalentTo(new[] { 1, 2, 3 });
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip="We don't have enough overloads for GraphDsl.Create")]
public void A_Broadcast_must_with_22_way_broadcast()
public async Task A_Broadcast_must_with_22_way_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
//var headSink = Sink.First<IEnumerable<int>>();

//var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
Expand Down Expand Up @@ -162,15 +160,14 @@ public void A_Broadcast_must_with_22_way_broadcast()
//task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
//foreach (var list in task.Result)
// list.Should().BeEquivalentTo(new[] { 1, 2, 3 });

return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
public async Task A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -187,20 +184,19 @@ public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
sub1.Cancel();
var sub2 = c2.ExpectSubscription();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Request(3);
c2.ExpectNext( 1, 2, 3);
c2.ExpectComplete();
c2.ExpectNext(1, 2, 3);
await c2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
public async Task A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
RunnableGraph.FromGraph(GraphDsl.Create(b =>
Expand All @@ -217,20 +213,19 @@ public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
return ClosedShape.Instance;
})).Run(Materializer);

var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();
sub2.Cancel();
sub1.Request(3);
c1.ExpectNext( 1, 2, 3);
c1.ExpectComplete();
c1.ExpectNext(1, 2, 3);
await c1.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
public async Task A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p1 = this.CreateManualPublisherProbe<int>();
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();
Expand All @@ -248,30 +243,29 @@ public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
return ClosedShape.Instance;
})).Run(Materializer);

var bSub = p1.ExpectSubscription();
var sub1 = c1.ExpectSubscription();
var sub2 = c2.ExpectSubscription();
var bSub = await p1.ExpectSubscriptionAsync();
var sub1 = await c1.ExpectSubscriptionAsync();
var sub2 = await c2.ExpectSubscriptionAsync();

sub1.Request(3);
sub2.Request(3);
p1.ExpectRequest(bSub, 16);
await p1.ExpectRequestAsync(bSub, 16);
bSub.SendNext(1);
c1.ExpectNext(1);
c2.ExpectNext(1);
await c1.ExpectNextAsync(1);
await c2.ExpectNextAsync(1);
bSub.SendNext(2);
c1.ExpectNext(2);
c2.ExpectNext(2);
await c1.ExpectNextAsync(2);
await c2.ExpectNextAsync(2);
sub1.Cancel();
sub2.Cancel();
bSub.ExpectCancellation();
await bSub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_pass_along_early_cancellation()
public async Task A_Broadcast_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c1 = this.CreateManualSubscriberProbe<int>();
var c2 = this.CreateManualSubscriberProbe<int>();

Expand All @@ -287,22 +281,21 @@ public void A_Broadcast_must_pass_along_early_cancellation()

var up = this.CreateManualPublisherProbe<int>();

var downSub1 = c1.ExpectSubscription();
var downSub2 = c2.ExpectSubscription();
var downSub1 = await c1.ExpectSubscriptionAsync();
var downSub2 = await c2.ExpectSubscriptionAsync();
downSub1.Cancel();
downSub2.Cancel();

up.Subscribe(s);
var upSub = up.ExpectSubscription();
upSub.ExpectCancellation();
var upSub = await up.ExpectSubscriptionAsync();
await upSub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_AltoTo_must_broadcast()
public async Task A_Broadcast_must_AltoTo_must_broadcast()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p = this.SinkProbe<int>();
var p2 = this.SinkProbe<int>();

Expand All @@ -315,20 +308,19 @@ public void A_Broadcast_must_AltoTo_must_broadcast()
var ps1 = t.Item1;
var ps2 = t.Item2;

ps1.Request(6);
ps2.Request(6);
ps1.ExpectNext( 1, 2, 3, 4, 5, 6);
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
ps1.ExpectComplete();
ps2.ExpectComplete();
await ps1.RequestAsync(6);
await ps2.RequestAsync(6);
ps1.ExpectNext(1, 2, 3, 4, 5, 6);
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
await ps1.ExpectCompleteAsync();
await ps2.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
public async Task A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var p = this.SinkProbe<int>();
var p2 = this.SinkProbe<int>();

Expand All @@ -340,11 +332,11 @@ public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()

var ps1 = t.Item1;
var ps2 = t.Item2;
ps2.Request(6);

await ps2.RequestAsync(6);
ps1.Cancel();
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
ps2.ExpectComplete();
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
await ps2.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down