Skip to content

[9-74]FlowConcatSpec #6553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 77 additions & 78 deletions src/core/Akka.Streams.Tests/Dsl/FlowConcatSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected override TestSubscriber.Probe<int> Setup(IPublisher<int> p1, IPublishe
}

[Fact]
public void A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
public async Task A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
{
var f1 = Flow.Create<int>().Select(x => x + "-s");
var s1 = Source.From(new[] {1, 2, 3});
Expand All @@ -48,14 +48,17 @@ public void A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
var res = f1.Concat(s2).RunWith(s1, subSink, Materializer).Item2;

res.Subscribe(subs);
var sub = subs.ExpectSubscription();
var sub = await subs.ExpectSubscriptionAsync();
sub.Request(9);
Enumerable.Range(1, 6).ForEach(e=>subs.ExpectNext(e + "-s"));
subs.ExpectComplete();

foreach (var e in Enumerable.Range(1, 6))
await subs.ExpectNextAsync(e + "-s");

await subs.ExpectCompleteAsync();
}

[Fact]
public void A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
public async Task A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
{
var s1 = Source.From(new[] { 1, 2, 3 }).Select(x => x + "-s");
var s2 = Source.From(new[] { 4, 5, 6 });
Expand All @@ -67,48 +70,56 @@ public void A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
var res = f2.Prepend(s1).RunWith(s2, subSink, Materializer).Item2;

res.Subscribe(subs);
var sub = subs.ExpectSubscription();
var sub = await subs.ExpectSubscriptionAsync();
sub.Request(9);
Enumerable.Range(1, 6).ForEach(e => subs.ExpectNext(e + "-s"));
subs.ExpectComplete();

foreach (var e in Enumerable.Range(1, 6))
await subs.ExpectNextAsync(e + "-s");

await subs.ExpectCompleteAsync();
}

[Fact]
public void A_Concat_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
public async Task A_Concat_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
subscription1.Request(5);
Enumerable.Range(1, 4).ForEach(x => subscriber1.ExpectNext(x));
subscriber1.ExpectComplete();

foreach (var x in Enumerable.Range(1, 4))
await subscriber1.ExpectNextAsync(x);

await subscriber1.ExpectCompleteAsync();

var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
var subscription2 = subscriber2.ExpectSubscription();
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
subscription2.Request(5);
Enumerable.Range(1, 4).ForEach(x => subscriber2.ExpectNext(x));
subscriber2.ExpectComplete();

foreach (var x in Enumerable.Range(1, 4))
await subscriber2.ExpectNextAsync(x);

await subscriber2.ExpectCompleteAsync();

}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
public async Task A_Concat_for_Flow_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var subscriber = Setup(FailedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
subscriber.ExpectSubscriptionAndError().Should().BeOfType<TestException>();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_failed_publisher()
public async Task A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_failed_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), FailedPublisher<int>());
subscriber.ExpectSubscription().Request(5);
(await subscriber.ExpectSubscriptionAsync()).Request(5);

var errorSignalled = Enumerable.Range(1, 4)
.Aggregate(false, (b, e) => b || subscriber.ExpectNextOrError() is TestException);
Expand All @@ -118,22 +129,21 @@ public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_fa
}

[Fact]
public void A_Concat_for_Flow_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
public async Task A_Concat_for_Flow_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var subscriber = Setup(SoonToFailPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
subscriber.ExpectSubscriptionAndError().Should().BeOfType<TestException>();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed_publisher()
public async Task A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var subscriber = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToFailPublisher<int>());
subscriber.ExpectSubscription().Request(5);
(await subscriber.ExpectSubscriptionAsync()).Request(5);

var errorSignalled = Enumerable.Range(1, 4)
.Aggregate(false, (b, e) => b || subscriber.ExpectNextOrError() is TestException);
Expand All @@ -143,54 +153,56 @@ public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed
}

[Fact]
public void A_Concat_for_Flow_must_correctly_handle_async_errors_in_secondary_upstream()
public async Task A_Concat_for_Flow_must_correctly_handle_async_errors_in_secondary_upstream()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var promise = new TaskCompletionSource<int>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 3))
.Concat(Source.FromTask(promise.Task))
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(4);
Enumerable.Range(1, 3).ForEach(x => subscriber.ExpectNext(x));

foreach (var x in Enumerable.Range(1, 3))
await subscriber.ExpectNextAsync(x);

promise.SetException(TestException());
subscriber.ExpectError().Should().BeOfType<TestException>();

}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_work_with_Source_DSL()
public async Task A_Concat_for_Flow_must_work_with_Source_DSL()
{
this.AssertAllStagesStopped(() =>
{
var testSource =
Source.From(Enumerable.Range(1, 5))
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
await this.AssertAllStagesStoppedAsync(() => {
var testSource =
Source.From(Enumerable.Range(1, 5))
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
var task = testSource.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(Enumerable.Range(1,10));
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));

var runnable = testSource.ToMaterialized(Sink.Ignore<IEnumerable<int>>(), Keep.Left);
var t = runnable.Run(Materializer);
t.Item1.Should().BeOfType<NotUsed>();
t.Item2.Should().BeOfType<NotUsed>();

runnable.MapMaterializedValue(_ => "boo").Run(Materializer).Should().Be("boo");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_work_with_Flow_DSL()
public async Task A_Concat_for_Flow_must_work_with_Flow_DSL()
{
this.AssertAllStagesStopped(() =>
{
var testFlow = Flow.Create<int>()
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
await this.AssertAllStagesStoppedAsync(() => {
var testFlow = Flow.Create<int>()
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
var task = Source.From(Enumerable.Range(1, 5))
.ViaMaterialized(testFlow, Keep.Both)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
Expand All @@ -204,65 +216,52 @@ public void A_Concat_for_Flow_must_work_with_Flow_DSL()
runnable.Invoking(r => r.Run(Materializer)).Should().NotThrow();

runnable.MapMaterializedValue(_ => "boo").Run(Materializer).Should().Be("boo");
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip = "ConcatMaterialized type conflict")]
public void A_Concat_for_Flow_must_work_with_Flow_DSL2()
public async Task A_Concat_for_Flow_must_work_with_Flow_DSL2()
{
this.AssertAllStagesStopped(() =>
{
var testFlow = Flow.Create<int>()
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
await this.AssertAllStagesStoppedAsync(() => {
var testFlow = Flow.Create<int>()
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
.Grouped(1000);
var task = Source.From(Enumerable.Range(1, 5))
.ViaMaterialized(testFlow, Keep.Both)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));

//var sink = testFlow.ConcatMaterialized(Source.From(Enumerable.Range(1, 5)), Keep.Both)
// .To(Sink.Ignore<IEnumerable<int>>())
// .MapMaterializedValue(
// x =>
// {
// x.Item1.Item1.Should().BeOfType<NotUsed>();
// x.Item1.Item2.Should().BeOfType<NotUsed>();
// x.Item2.Should().BeOfType<NotUsed>();
// return "boo";
// });

//Source.From(Enumerable.Range(10, 6)).RunWith(sink, Materializer).Should().Be("boo");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Concat_for_Flow_must_subscribe_at_one_to_initial_source_and_to_one_that_it_is_concat_to()
public async Task A_Concat_for_Flow_must_subscribe_at_one_to_initial_source_and_to_one_that_it_is_concat_to()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var publisher1 = this.CreatePublisherProbe<int>();
var publisher2 = this.CreatePublisherProbe<int>();
var probeSink =
Source.FromPublisher(publisher1)
.Concat(Source.FromPublisher(publisher2))
.RunWith(this.SinkProbe<int>(), Materializer);

var sub1 = publisher1.ExpectSubscription();
var sub2 = publisher2.ExpectSubscription();
var subSink = probeSink.ExpectSubscription();
var sub1 = await publisher1.ExpectSubscriptionAsync();
var sub2 = await publisher2.ExpectSubscriptionAsync();
var subSink = await probeSink.ExpectSubscriptionAsync();

sub1.SendNext(1);
subSink.Request(1);
probeSink.ExpectNext(1);
await probeSink.ExpectNextAsync(1);
sub1.SendComplete();

sub2.SendNext(2);
subSink.Request(1);
probeSink.ExpectNext(2);
await probeSink.ExpectNextAsync(2);
sub2.SendComplete();

probeSink.ExpectComplete();
await probeSink.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down