Skip to content

[37-74] FlowWatchTerminationSpec #6584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 10, 2023
68 changes: 34 additions & 34 deletions src/core/Akka.Streams.Tests/Dsl/FlowWatchTerminationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -28,83 +29,81 @@ public FlowWatchTerminationSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_WatchTermination_must_complete_the_future_when_stream_is_completed()
public async Task A_WatchTermination_must_complete_the_future_when_stream_is_completed()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(async() => {
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;

p.Request(4).ExpectNext( 1, 2, 3, 4);
p.Request(4).ExpectNext(1, 2, 3, 4);
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
p.ExpectComplete();
await p.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream()
public async Task A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;

p.Request(3).ExpectNext( 1, 2, 3);
p.Request(3).ExpectNext(1, 2, 3);
p.Cancel();
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_fail_the_future_when_stream_is_failed()
public async Task A_WatchTermination_must_fail_the_future_when_stream_is_failed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var ex = new Exception("Stream failed.");
var t = this.SourceProbe<int>().WatchTermination(Keep.Both).To(Sink.Ignore<int>()).Run(Materializer);
var p = t.Item1;
var future = t.Item2;
p.SendNext(1);
p.SendError(ex);
future.Invoking(f => f.Wait()).Should().Throw<Exception>().WithMessage("Stream failed.");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_complete_the_future_for_an_empty_stream()
public async Task A_WatchTermination_must_complete_the_future_for_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.Empty<int>()
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var t =
Source.Empty<int>()
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;
p.Request(1);
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip = "We need a way to combine multiple sources with different materializer types")]
public void A_WatchTermination_must_complete_the_future_for_graph()
public async Task A_WatchTermination_must_complete_the_future_for_graph()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
//var first = this.SourceProbe<int>().WatchTermination(Keep.Both);
//var second = Source.From(Enumerable.Range(2, 4)).MapMaterializedValue(new Func<NotUsed, (TestPublisher.Probe<int>, Task)>(_ => null));

//var t = Source.FromGraph(
// GraphDsl.Create<SourceShape<int>, (TestPublisher.Probe<int>, Task)>(b =>
// {
Expand All @@ -128,6 +127,7 @@ public void A_WatchTermination_must_complete_the_future_for_graph()

//sourceProbe.SendComplete();
//sinkProbe.ExpectNextN(new[] {2, 3, 4, 5}).ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

Expand Down