Skip to content

[Obsolete][CS0618] AwaitResult > Use ShouldCompleteWithin instead #6498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
374fcb0
[Obsolete][CS0618] `AwaitResult` > `Use ShouldCompleteWithin instead`
eaba Mar 7, 2023
2128453
Merge branch 'dev' into use_should_complete_within
eaba Mar 7, 2023
9387a0f
fix
eaba Mar 7, 2023
71537a6
Merge branch 'use_should_complete_within' of https://github.com/eaba/…
eaba Mar 7, 2023
73e8f3d
Merge branch 'dev' into use_should_complete_within
Aaronontheweb Mar 8, 2023
563a1ff
more fixed
eaba Mar 8, 2023
f6a7444
[test] Within `2.Seconds()`
eaba Mar 8, 2023
b6aa08a
[test][FileSinkSpec] change to `Async`s
eaba Mar 8, 2023
5b4bf78
Merge branch 'dev' into use_should_complete_within
Aaronontheweb Mar 9, 2023
a58e552
Fixes
eaba Mar 9, 2023
f1093c4
Merge branch 'use_should_complete_within' of https://github.com/eaba/…
eaba Mar 9, 2023
8e18ee7
[revert] `ValveSpec` seq.Invoking*
eaba Mar 10, 2023
e9f5e05
[change] `await this.AssertAllStagesStoppedAsync(async() => { }, mate…
eaba Mar 10, 2023
a0bedab
Merge branch 'dev' into use_should_complete_within
eaba Mar 11, 2023
fd17ffa
Merge branch 'dev' into use_should_complete_within
Arkatufus Mar 14, 2023
8077102
improves `FlowAggregateAsyncSpec`, `FlowSelectAsyncSpec`
eaba Mar 15, 2023
fabaeef
Merge branch 'dev' into use_should_complete_within
eaba Mar 15, 2023
de89da8
fixed
eaba Mar 15, 2023
02a0e9e
Merge branch 'use_should_complete_within' of https://github.com/eaba/…
eaba Mar 15, 2023
22155e8
Merge branch 'dev' into use_should_complete_within
eaba Mar 16, 2023
3b40a3a
Merge branch 'dev' into use_should_complete_within
Aaronontheweb Mar 17, 2023
6a1d837
Merge branch 'dev' into use_should_complete_within
Arkatufus Mar 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/AttributesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Streams.Tests.Dsl
Expand All @@ -30,26 +32,27 @@ public AttributesSpec()
Attributes.CreateName("a").And(Attributes.CreateName("b")).And(Attributes.CreateInputBuffer(1, 2));

[Fact]
public void Attributes_must_be_overridable_on_a_module_basis()
public async Task Attributes_must_be_overridable_on_a_module_basis()
{
var runnable =
Source.Empty<NotUsed>()
.ToMaterialized(AttributesSink.Create().WithAttributes(Attributes.CreateName("new-name")),
Keep.Right);
var task = runnable.Run(Materializer);

task.AwaitResult().GetAttribute<Attributes.Name>().Value.Should().Contain("new-name");
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.GetAttribute<Attributes.Name>().Value.Should().Contain("new-name");
}

[Fact]
public void Attributes_must_keep_the_outermost_attribute_as_the_least_specific()
public async Task Attributes_must_keep_the_outermost_attribute_as_the_least_specific()
{
var task = Source.Empty<NotUsed>()
.ToMaterialized(AttributesSink.Create(), Keep.Right)
.WithAttributes(Attributes.CreateName("new-name"))
.Run(Materializer);

task.AwaitResult().GetAttribute<Attributes.Name>().Value.Should().Contain("attributesSink");
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.GetAttribute<Attributes.Name>().Value.Should().Contain("attributesSink");
}

[Fact]
Expand Down
37 changes: 22 additions & 15 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,45 @@ private static Sink<int, Task<int>> AggregateSink
[Fact]
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does AwaitResult do and is it different than ShouldCompleteWithin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"warning CS0618: 'Utils.AwaitResult(Task, TimeSpan?)' is obsolete: 'Use ShouldCompleteWithin instead'"

var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
{
var flowTimeout = TimeSpan.FromMilliseconds(FlowDelayInMs*Input.Count()) + TimeSpan.FromSeconds(3);
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
task.AwaitResult(flowTimeout).Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(flowTimeout);
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

Expand Down Expand Up @@ -279,9 +283,9 @@ await this.AssertAllStagesStoppedAsync(async () =>
[Fact]
public void A_AggregateAsync_must_finish_after_task_failure()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
Source.From(Enumerable.Range(1, 3)).AggregateAsync(1, (_, n) => Task.Run(() =>
var complete = Source.From(Enumerable.Range(1, 3)).AggregateAsync(1, (_, n) => Task.Run(() =>
{
if (n == 3)
throw new Exception("err3b");
Expand All @@ -290,7 +294,8 @@ public void A_AggregateAsync_must_finish_after_task_failure()
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer)
.AwaitResult().Should().BeEquivalentTo(2);
.ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(2);
}, Materializer);
}

Expand Down Expand Up @@ -417,22 +422,24 @@ public void A_AggregateAsync_must_handle_cancel_properly()
[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregateAsync(0, (acc, element) => Task.FromResult(acc + element), Materializer);
task.AwaitResult(RemainingOrDefault).ShouldBe(0);
var complete = await task.ShouldCompleteWithin(RemainingOrDefault);
complete.ShouldBe(0);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.Single(100)
.RunAggregateAsync(5, (acc, element) => Task.FromResult(acc + element), Materializer);
task.AwaitResult(RemainingOrDefault).ShouldBe(105);
var complete = await task.ShouldCompleteWithin(RemainingOrDefault);
complete.ShouldBe(105);
}, Materializer);
}
}
Expand Down
42 changes: 26 additions & 16 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Akka.TestKit.Extensions;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -38,50 +40,55 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
[Fact]
public void A_Aggregate_must_work_when_using_Source_RunAggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunAggregate(0, (sum, i) => sum + i, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Sink_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Flow_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

Expand Down Expand Up @@ -129,7 +136,7 @@ public void
[Fact]
public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -142,14 +149,15 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
var task = InputSource.RunWith(
aggregate.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)),
Materializer);
task.AwaitResult().Should().Be(Expected - 50);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected - 50);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -162,18 +170,20 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
var task = InputSource.RunWith(
aggregate.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)),
Materializer);
task.AwaitResult().Should().Be(Enumerable.Range(51, 50).Sum());
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Enumerable.Range(51, 50).Sum());
}, Materializer);
}

[Fact]
public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregate(0, (acc, element) => acc + element, Materializer);
task.AwaitResult().ShouldBe(0);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(0);
}, Materializer);
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/core/Akka.Streams.Tests/Dsl/FlowOrElseSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
using System;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit.Extensions;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using System.Threading.Tasks;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -28,25 +31,27 @@ public FlowOrElseSpec(ITestOutputHelper helper) : base(helper)
private ActorMaterializer Materializer { get; }

[Fact]
public void An_OrElse_flow_should_pass_elements_from_the_first_input()
public async Task An_OrElse_flow_should_pass_elements_from_the_first_input()
{
var source1 = Source.From(new[] {1, 2, 3});
var source2 = Source.From(new[] {4, 5, 6});

var sink = Sink.Seq<int>();

source1.OrElse(source2).RunWith(sink, Materializer).AwaitResult().Should().BeEquivalentTo(new[] {1, 2, 3});
var complete = await source1.OrElse(source2).RunWith(sink, Materializer).ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(new[] { 1, 2, 3 });
}

[Fact]
public void An_OrElse_flow_should_pass_elements_from_the_second_input_if_the_first_completes_with_no_elements_emitted()
public async Task An_OrElse_flow_should_pass_elements_from_the_second_input_if_the_first_completes_with_no_elements_emitted()
{
var source1 = Source.Empty<int>();
var source2 = Source.From(new[] { 4, 5, 6 });

var sink = Sink.Seq<int>();

source1.OrElse(source2).RunWith(sink, Materializer).AwaitResult().Should().BeEquivalentTo(new[] { 4, 5, 6 });
var complete = await source1.OrElse(source2).RunWith(sink, Materializer).ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(new[] { 4, 5, 6 });
}

[Fact]
Expand Down
7 changes: 5 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowScanAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using Akka.TestKit.Extensions;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using Decider = Akka.Streams.Supervision.Decider;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand Down Expand Up @@ -58,14 +60,15 @@ public void A_ScanAsync_must_work_with_a_single_source()
}

[Fact]
public void A_ScanAsync_must_work_with_a_large_source()
public async Task A_ScanAsync_must_work_with_a_large_source()
{
var elements = Enumerable.Range(1, 100000).Select(i => (long)i).ToList();
var expectedSum = elements.Sum();
var eventualActual = Source.From(elements)
.ScanAsync(0L, (l, l1) => Task.FromResult(l + l1))
.RunWith(Sink.Last<long>(), Materializer);
eventualActual.AwaitResult().ShouldBe(expectedSum);
var complete = await eventualActual.ShouldCompleteWithin(3.Seconds());
complete.ShouldBe(expectedSum);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
Expand Down
Loading