Skip to content

Commit 3156272

Browse files
Akka:Streams Resolve IAsyncEnumerator.DisposeAsync bug (#6290)
* Upgraded Akka.Streams and Akka.Streams.Tests to C# 9 * added reproduction for #6280 * close #6280 * added compiler directive back to fix compilation issues on Linux * added comment * bump CI
1 parent 1974404 commit 3156272

File tree

4 files changed

+63
-38
lines changed

4 files changed

+63
-38
lines changed

src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<PropertyGroup>
66
<AssemblyName>Akka.Streams.Tests</AssemblyName>
77
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
8+
<LangVersion>9</LangVersion>
89
</PropertyGroup>
910

1011
<ItemGroup>

src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,19 @@
1717
using Xunit;
1818
using Xunit.Abstractions;
1919
using System.Collections.Generic;
20-
using Akka.Actor;
21-
using Akka.Streams.Actors;
2220
using Akka.Streams.TestKit.Tests;
23-
using Akka.Streams.Tests.Actor;
24-
using Reactive.Streams;
2521
using System.Runtime.CompilerServices;
2622
using Akka.Util;
2723
using FluentAssertions.Extensions;
2824

2925
namespace Akka.Streams.Tests.Dsl
3026
{
31-
#if NETCOREAPP
27+
#if !NETFRAMEWORK // disabling this causes .NET Framework 4.7.2 builds to fail on Linux
3228
public class AsyncEnumerableSpec : AkkaSpec
3329
{
3430
private ActorMaterializer Materializer { get; }
3531
private ITestOutputHelper _helper;
32+
3633
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
3734
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
3835
helper)
@@ -50,7 +47,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
5047

5148
var cts = new CancellationTokenSource();
5249
var token = cts.Token;
53-
50+
5451
var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
5552
var output = input.ToArray();
5653
bool caught = false;
@@ -65,7 +62,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
6562
{
6663
caught = true;
6764
}
68-
65+
6966
caught.ShouldBeTrue();
7067
}
7168

@@ -80,6 +77,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
8077
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
8178
output = output.Skip(1).ToArray();
8279
}
80+
8381
output.Length.ShouldBe(0, "Did not receive all elements!");
8482
}
8583

@@ -94,14 +92,16 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
9492
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
9593
output = output.Skip(1).ToArray();
9694
}
95+
9796
output.Length.ShouldBe(0, "Did not receive all elements!");
98-
97+
9998
output = input.ToArray();
10099
await foreach (var a in asyncEnumerable)
101100
{
102101
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
103102
output = output.Skip(1).ToArray();
104103
}
104+
105105
output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
106106
}
107107

@@ -112,7 +112,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
112112
var materializer = ActorMaterializer.Create(Sys);
113113
var probe = this.CreatePublisherProbe<int>();
114114
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
115-
115+
116116
var a = Task.Run(async () =>
117117
{
118118
await foreach (var notused in task)
@@ -140,7 +140,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
140140

141141
thrown.ShouldBeTrue();
142142
}
143-
143+
144144
[Fact]
145145
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
146146
{
@@ -155,7 +155,7 @@ async Task ShouldThrow()
155155
{
156156
}
157157
}
158-
158+
159159
await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
160160
}
161161

@@ -187,7 +187,7 @@ public void AsyncEnumerableSource_Must_Process_All_Elements()
187187
subscription.Request(101);
188188

189189
subscriber.ExpectNextN(Enumerable.Range(0, 100));
190-
190+
191191
subscriber.ExpectComplete();
192192
}
193193

@@ -206,7 +206,7 @@ public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
206206
subscriber.ExpectNextN(Enumerable.Range(0, 50));
207207

208208
var exception = subscriber.ExpectError();
209-
209+
210210
// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
211211
exception.Should().BeOfType<TestException>();
212212
exception.Message.Should().Be("BOOM!");
@@ -231,13 +231,54 @@ public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream
231231
await WithinAsync(3.Seconds(), async () => latch.Value);
232232
}
233233

234-
private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
234+
/// <summary>
235+
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6280
236+
/// </summary>
237+
[Fact]
238+
public async Task AsyncEnumerableSource_BugFix6280()
239+
{
240+
async IAsyncEnumerable<int> GenerateInts()
241+
{
242+
foreach (var i in Enumerable.Range(0, 100))
243+
{
244+
if (i > 50)
245+
await Task.Delay(1000);
246+
yield return i;
247+
}
248+
}
249+
250+
var source = Source.From(GenerateInts);
251+
var subscriber = this.CreateManualSubscriberProbe<int>();
252+
253+
await EventFilter.Warning().ExpectAsync(0, async () =>
254+
{
255+
var mat = source
256+
.WatchTermination(Keep.Right)
257+
.ToMaterialized(Sink.FromSubscriber(subscriber), Keep.Left);
258+
259+
#pragma warning disable CS4014
260+
var task = mat.Run(Materializer);
261+
#pragma warning restore CS4014
262+
263+
var subscription = subscriber.ExpectSubscription();
264+
subscription.Request(50);
265+
subscriber.ExpectNextN(Enumerable.Range(0, 50));
266+
subscription.Request(10); // the iterator is going to start delaying 1000ms per item here
267+
subscription.Cancel();
268+
269+
270+
// The cancellation token inside the IAsyncEnumerable should be cancelled
271+
await task;
272+
});
273+
}
274+
275+
private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
235276
[EnumeratorCancellation] CancellationToken token = default)
236277
{
237278
foreach (var i in Enumerable.Range(start, count))
238279
{
239280
await Task.Delay(10, token);
240-
if(token.IsCancellationRequested)
281+
if (token.IsCancellationRequested)
241282
yield break;
242283
yield return i;
243284
}
@@ -248,33 +289,28 @@ private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int cou
248289
{
249290
foreach (var i in Enumerable.Range(start, count))
250291
{
251-
if(token.IsCancellationRequested)
292+
if (token.IsCancellationRequested)
252293
yield break;
253294

254295
if (i == throwAt)
255296
throw new TestException("BOOM!");
256-
297+
257298
yield return i;
258299
}
259300
}
260301

261302
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
262303
[EnumeratorCancellation] CancellationToken token = default)
263304
{
264-
token.Register(() =>
265-
{
266-
latch.GetAndSet(true);
267-
});
305+
token.Register(() => { latch.GetAndSet(true); });
268306
foreach (var i in Enumerable.Range(start, count))
269307
{
270-
if(token.IsCancellationRequested)
308+
if (token.IsCancellationRequested)
271309
yield break;
272310

273311
yield return i;
274312
}
275313
}
276-
277314
}
278-
#else
279315
#endif
280316
}

src/core/Akka.Streams/Akka.Streams.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
88
<PackageTags>$(AkkaPackageTags);reactive;stream</PackageTags>
99
<GenerateDocumentationFile>true</GenerateDocumentationFile>
10+
<LangVersion>9</LangVersion>
1011
</PropertyGroup>
1112
<ItemGroup>
1213
<EmbeddedResource Include="reference.conf" />

src/core/Akka.Streams/Implementation/Fusing/Ops.cs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3816,20 +3816,7 @@ public override void OnDownstreamFinish()
38163816
{
38173817
_completionCts.Cancel();
38183818
_completionCts.Dispose();
3819-
3820-
try
3821-
{
3822-
_enumerator.DisposeAsync().GetAwaiter().GetResult();
3823-
}
3824-
catch (Exception ex)
3825-
{
3826-
Log.Warning(ex, "Failed to dispose IAsyncEnumerator asynchronously");
3827-
}
3828-
finally
3829-
{
3830-
CompleteStage();
3831-
base.OnDownstreamFinish();
3832-
}
3819+
CompleteStage();
38333820
base.OnDownstreamFinish();
38343821
}
38353822

0 commit comments

Comments
 (0)