Skip to content

[55-74] LazySourceSpec #6602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 38 additions & 41 deletions src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using FluentAssertions;
using Xunit;
using FluentAssertions.Extensions;
using Xunit.Sdk;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -30,11 +31,11 @@ public LazySourceSpec()
}

private ActorMaterializer Materializer { get; }

[Fact]
public void A_lazy_source_must_work_like_a_normal_source_happy_path()
public async Task A_lazy_source_must_work_like_a_normal_source_happy_path()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var result = Source.Lazily(() => Source.From(new[] { 1, 2, 3 })).RunWith(Sink.Seq<int>(), Materializer);
var complete = await result.ShouldCompleteWithin(3.Seconds());
Expand All @@ -43,10 +44,9 @@ public void A_lazy_source_must_work_like_a_normal_source_happy_path()
}

[Fact]
public void A_lazy_source_must_work_never_construct_the_source_when_there_was_no_demand()
public async Task A_lazy_source_must_work_never_construct_the_source_when_there_was_no_demand()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateSubscriberProbe<int>();
var constructed = new AtomicBoolean();
Source.Lazily(() =>
Expand All @@ -57,49 +57,48 @@ public void A_lazy_source_must_work_never_construct_the_source_when_there_was_no

probe.Cancel();
constructed.Value.Should().BeFalse();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_lazy_source_must_fail_the_materialized_value_when_downstream_cancels_without_ever_consuming_any_element()
public async Task A_lazy_source_must_fail_the_materialized_value_when_downstream_cancels_without_ever_consuming_any_element()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.Lazily(() => Source.From(new[] { 1, 2, 3 }))
.ToMaterialized(Sink.Cancelled<int>(), Keep.Left)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var result = Source.Lazily(() => Source.From(new[] { 1, 2, 3 }))
.ToMaterialized(Sink.Cancelled<int>(), Keep.Left)
.Run(Materializer);

Intercept(() =>
AssertThrows<Exception>(() =>
{
var boom = result.Result;
});
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_lazy_source_must_stop_consuming_when_downstream_has_cancelled()
public async Task A_lazy_source_must_stop_consuming_when_downstream_has_cancelled()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var outProbe = this.CreateSubscriberProbe<int>();
var inProbe = this.CreatePublisherProbe<int>();

Source.Lazily(() => Source.FromPublisher(inProbe)).RunWith(Sink.FromSubscriber(outProbe), Materializer);

outProbe.Request(1);
inProbe.ExpectRequest();
inProbe.SendNext(27);
outProbe.ExpectNext(27);
outProbe.Cancel();
inProbe.ExpectCancellation();
await inProbe.ExpectRequestAsync();
await inProbe.SendNextAsync(27);
await outProbe.ExpectNextAsync(27);
await outProbe.CancelAsync();
await inProbe.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_lazy_source_must_materialize_when_the_source_has_been_created()
public async Task A_lazy_source_must_materialize_when_the_source_has_been_created()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = this.CreateSubscriberProbe<int>();

var task = Source.Lazily(() => Source.From(new[] { 1, 2, 3 }).MapMaterializedValue(_ => Done.Instance))
Expand All @@ -108,18 +107,17 @@ public void A_lazy_source_must_materialize_when_the_source_has_been_created()

task.IsCompleted.Should().BeFalse();
probe.Request(1);
probe.ExpectNext(1);
await probe.ExpectNextAsync(1);
task.Result.Should().Be(Done.Instance);

probe.Cancel();
}, Materializer);
}

[Fact]
public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inner_source_has_been_materialized()
public async Task A_lazy_source_must_propagate_downstream_cancellation_cause_when_inner_source_has_been_materialized()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = CreateTestProbe();
var (doneF, killSwitch) = Source.Lazily(() =>
{
Expand All @@ -138,7 +136,7 @@ public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inne
.Run(Materializer);

var boom = new TestException("boom");
probe.ExpectMsg<Done>();
await probe.ExpectMsgAsync<Done>();
killSwitch.Abort(boom);
doneF.ContinueWith(t =>
{
Expand All @@ -150,19 +148,18 @@ public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inne
}

[Fact]
public void A_lazy_source_must_fail_stage_when_upstream_fails()
public async Task A_lazy_source_must_fail_stage_when_upstream_fails()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var outProbe = this.CreateSubscriberProbe<int>();
var inProbe = this.CreatePublisherProbe<int>();

Source.Lazily(() => Source.FromPublisher(inProbe)).RunWith(Sink.FromSubscriber(outProbe), Materializer);

outProbe.Request(1);
inProbe.ExpectRequest();
inProbe.SendNext(27);
outProbe.ExpectNext(27);
await inProbe.ExpectRequestAsync();
await inProbe.SendNextAsync(27);
await outProbe.ExpectNextAsync(27);

var testException = new TestException("OMG Who set that on fire !?!");
inProbe.SendError(testException);
Expand All @@ -171,9 +168,9 @@ public void A_lazy_source_must_fail_stage_when_upstream_fails()
}

[Fact]
public void A_lazy_source_must_propagate_attributes_to_inner_stream()
public async Task A_lazy_source_must_propagate_attributes_to_inner_stream()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var attributesSource = Source.FromGraph(new AttibutesSourceStage())
.AddAttributes(Attributes.CreateName("inner"));
Expand All @@ -193,9 +190,9 @@ public void A_lazy_source_must_propagate_attributes_to_inner_stream()
}

[Fact]
public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails()
public async Task A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(() =>
{
var matFail = new TestException("fail!");

Expand All @@ -207,12 +204,12 @@ public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_sour
{
task.Wait(TimeSpan.FromSeconds(1));
}
catch (AggregateException) {}
catch (AggregateException) { }

task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.InnerException.Should().BeEquivalentTo(matFail);

return Task.CompletedTask;
}, Materializer);
}

Expand Down