Skip to content

[54-74] LazySinkSpec #6601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 4, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 55 additions & 42 deletions src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,53 +44,67 @@ private static Func<TMat> Fallback<TMat>()
private static readonly Exception Ex = new TestException("");

[Fact]
public void A_LazySink_must_work_in_the_happy_case()
public async Task A_LazySink_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.From(Enumerable.Range(0, 11)).RunWith(lazySink, Materializer);
var probe = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
probe.Value.Request(100);
Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i));
foreach (var i in Enumerable.Range(0, 11))
{
await probe.Value.ExpectNextAsync(i);
}
//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW

// Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i));
}, Materializer);
}

[Fact]
public void A_LazySink_must_work_with_slow_sink_init()
public async Task A_LazySink_must_work_with_slow_sink_init()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var p = new TaskCompletionSource<Sink<int, TestSubscriber.Probe<int>>>();
var sourceProbe = this.CreateManualPublisherProbe<int>();
var taskProbe = Source.FromPublisher(sourceProbe)
.RunWith(Sink.LazyInitAsync(() => p.Task), Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await sourceSub.ExpectRequestAsync(1);
await sourceProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
taskProbe.Wait(TimeSpan.FromMilliseconds(200)).ShouldBeFalse();

p.SetResult(this.SinkProbe<int>());
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(100);
probe.ExpectNext(0);
Enumerable.Range(1,10).ForEach(i =>
await probe.ExpectNextAsync(0);

foreach (var i in Enumerable.Range(1, 10))
{
sourceSub.SendNext(i);
await probe.ExpectNextAsync(i);
}
//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove any comments that aren't functional

/*
Enumerable.Range(1,10).ForEach(i =>
{
sourceSub.SendNext(i);
probe.ExpectNext(i);
});
*/
sourceSub.SendComplete();
}, Materializer);
}

[Fact]
public void A_LazySink_must_complete_when_there_was_no_elements_in_stream()
public async Task A_LazySink_must_complete_when_there_was_no_elements_in_stream()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(Sink.Aggregate(0, (int i, int i2) => i + i2)));
var taskProbe = Source.Empty<int>().RunWith(lazySink, Materializer);
Expand All @@ -100,56 +114,55 @@ public void A_LazySink_must_complete_when_there_was_no_elements_in_stream()
}

[Fact]
public void A_LazySink_must_complete_normally_when_upstream_is_completed()
public async Task A_LazySink_must_complete_normally_when_upstream_is_completed()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.Single(1).RunWith(lazySink, Materializer);
var taskResult = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
taskResult.Value.Request(1).ExpectNext(1).ExpectComplete();
await taskResult.Value.Request(1).ExpectNext(1).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
public async Task A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var sourceProbe = this.CreateManualPublisherProbe<int>();
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(Sink.LazyInitAsync<int, NotUsed>(() => throw Ex), Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectCancellation();
await sourceSub.ExpectCancellationAsync();
taskProbe.Invoking(t => t.Wait()).Should().Throw<TestException>();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_upstream_failed()
public async Task A_LazySink_must_fail_gracefully_when_upstream_failed()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(1).ExpectNext(0);
await probe.Request(1).ExpectNextAsync(0);
sourceSub.SendError(Ex);
probe.ExpectError().Should().Be(Ex);
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_gracefully_when_factory_task_failed()
public async Task A_LazySink_must_fail_gracefully_when_factory_task_failed()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazyInitAsync(() => Task.FromException<Sink<int, TestSubscriber.Probe<int>>>(Ex));
Expand All @@ -159,38 +172,38 @@ public void A_LazySink_must_fail_gracefully_when_factory_task_failed()
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider))
.Run(Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).Should().Throw<TestException>();

}, Materializer);
}

[Fact]
public void A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled()
public async Task A_LazySink_must_cancel_upstream_when_internal_sink_is_cancelled()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
await sourceSub.ExpectRequestAsync(1);
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(1).ExpectNext(0);
await probe.Request(1).ExpectNextAsync(0);
probe.Cancel();
sourceSub.ExpectCancellation();
await sourceSub.ExpectCancellationAsync();
}, Materializer);
}

[Fact]
public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
public async Task A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fails()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var matFail = new TestException("fail!");

var task = Source.Single("whatever")
Expand All @@ -205,7 +218,7 @@ public void A_LazySink_must_fail_correctly_when_materialization_of_inner_sink_fa
task.IsFaulted.ShouldBe(true);
task.Exception.ShouldNotBe(null);
task.Exception.Flatten().InnerException.Should().BeEquivalentTo(matFail);

return Task.CompletedTask;
}, Materializer);
}

Expand Down