Skip to content

[54-74] LazySinkSpec #6601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 4, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions src/core/Akka.Streams.Tests/Dsl/LazySinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ await this.AssertAllStagesStoppedAsync(async() =>
var taskProbe = Source.From(Enumerable.Range(0, 11)).RunWith(lazySink, Materializer);
var probe = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
probe.Value.Request(100);
Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i));
foreach (var i in Enumerable.Range(0, 11))
{
await probe.Value.ExpectNextAsync(i);
}
//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW

// Enumerable.Range(0, 11).ForEach(i => probe.Value.ExpectNext(i));
}, Materializer);
}

Expand All @@ -66,23 +71,32 @@ await this.AssertAllStagesStoppedAsync(async() =>
var taskProbe = Source.FromPublisher(sourceProbe)
.RunWith(Sink.LazyInitAsync(() => p.Task), Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
sourceProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
await sourceSub.ExpectRequestAsync(1);
await sourceProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
taskProbe.Wait(TimeSpan.FromMilliseconds(200)).ShouldBeFalse();

p.SetResult(this.SinkProbe<int>());
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(100);
probe.ExpectNext(0);
Enumerable.Range(1,10).ForEach(i =>
await probe.ExpectNextAsync(0);

foreach (var i in Enumerable.Range(1, 10))
{
sourceSub.SendNext(i);
await probe.ExpectNextAsync(i);
}
//I CHECK IT THIS IS NOT GOOD - EVERYTHING WORKS FINE NOW
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove any comments that aren't functional

/*
Enumerable.Range(1,10).ForEach(i =>
{
sourceSub.SendNext(i);
probe.ExpectNext(i);
});
*/
sourceSub.SendComplete();
}, Materializer);
}
Expand All @@ -107,22 +121,21 @@ await this.AssertAllStagesStoppedAsync(async() =>
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.Single(1).RunWith(lazySink, Materializer);
var taskResult = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
taskResult.Value.Request(1).ExpectNext(1).ExpectComplete();
await taskResult.Value.Request(1).ExpectNext(1).ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public async Task A_LazySink_must_fail_gracefully_when_sink_factory_method_failed()
{
await this.AssertAllStagesStoppedAsync(() => {
await this.AssertAllStagesStoppedAsync(async() => {
var sourceProbe = this.CreateManualPublisherProbe<int>();
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(Sink.LazyInitAsync<int, NotUsed>(() => throw Ex), Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectCancellation();
await sourceSub.ExpectCancellationAsync();
taskProbe.Invoking(t => t.Wait()).Should().Throw<TestException>();
return Task.CompletedTask;
}, Materializer);
}

Expand All @@ -135,12 +148,12 @@ await this.AssertAllStagesStoppedAsync(async() =>
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(1).ExpectNext(0);
await probe.Request(1).ExpectNextAsync(0);
sourceSub.SendError(Ex);
probe.ExpectError().Should().Be(Ex);
}, Materializer);
Expand All @@ -149,7 +162,8 @@ await this.AssertAllStagesStoppedAsync(async() =>
[Fact]
public async Task A_LazySink_must_fail_gracefully_when_factory_task_failed()
{
await this.AssertAllStagesStoppedAsync(() => {
await this.AssertAllStagesStoppedAsync(async() =>
{
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazyInitAsync(() => Task.FromException<Sink<int, TestSubscriber.Probe<int>>>(Ex));
var taskProbe =
Expand All @@ -158,11 +172,11 @@ await this.AssertAllStagesStoppedAsync(() => {
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider))
.Run(Materializer);

var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
taskProbe.Invoking(t => t.Wait(TimeSpan.FromMilliseconds(300))).Should().Throw<TestException>();
return Task.CompletedTask;

}, Materializer);
}

Expand All @@ -174,15 +188,15 @@ await this.AssertAllStagesStoppedAsync(async() =>
var sourceProbe = this.CreateManualPublisherProbe<int>();
var lazySink = Sink.LazyInitAsync(() => Task.FromResult(this.SinkProbe<int>()));
var taskProbe = Source.FromPublisher(sourceProbe).RunWith(lazySink, Materializer);
var sourceSub = sourceProbe.ExpectSubscription();
sourceSub.ExpectRequest(1);
var sourceSub = await sourceProbe.ExpectSubscriptionAsync();
await sourceSub.ExpectRequestAsync(1);
sourceSub.SendNext(0);
sourceSub.ExpectRequest(1);
await sourceSub.ExpectRequestAsync(1);
var complete = await taskProbe.ShouldCompleteWithin(RemainingOrDefault);
var probe = complete.Value;
probe.Request(1).ExpectNext(0);
await probe.Request(1).ExpectNextAsync(0);
probe.Cancel();
sourceSub.ExpectCancellation();
await sourceSub.ExpectCancellationAsync();
}, Materializer);
}

Expand Down