Skip to content

Commit 947e1ed

Browse files
authored
ForEachAsync optimization, added missing SinkForeachAsyncSpec (#6538)
1 parent 1a03706 commit 947e1ed

File tree

6 files changed

+281
-5
lines changed

6 files changed

+281
-5
lines changed

docs/articles/streams/builtinstages.md

+14-4
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,11 @@ if more element are emitted the sink will cancel the stream
295295

296296
**cancels** If too many values are collected
297297

298-
### Foreach
298+
### ForEach
299299

300300
Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.
301301

302-
The sink materializes into a ``Task`` which completes when the
302+
The sink materializes into a ``Task<Done>`` which completes when the
303303
stream completes, or fails if the stream fails.
304304

305305
Note that it is not safe to mutate state from the procedure.
@@ -308,9 +308,19 @@ Note that it is not safe to mutate state from the procedure.
308308

309309
**backpressures** when the previous procedure invocation has not yet completed
310310

311-
### ForeachParallel
311+
### ForEachASync
312312

313-
Like ``Foreach`` but allows up to ``parallellism`` procedure calls to happen in parallel.
313+
Invoke a given procedure asynchronously for each element received. Note that if shared state is mutated from the procedure that must be done in a thread-safe way.
314+
315+
The sink materializes into a ``Task<Done>`` which completes when the stream completes, or fails if the stream fails.
316+
317+
**cancels** when a ``Task`` fails
318+
319+
**backpressures** when the number of ``Task``s reaches the configured parallelism
320+
321+
### ForEachParallel
322+
323+
Like ``ForEach`` but allows up to ``parallellism`` procedure calls to happen in parallel.
314324

315325
**cancels** never
316326

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt

+2
Original file line numberDiff line numberDiff line change
@@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
19601960
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
19611961
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
19621962
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
1963+
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
1964+
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.1")]
19631965
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
19641966
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
19651967
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

+2
Original file line numberDiff line numberDiff line change
@@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
19601960
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
19611961
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
19621962
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
1963+
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
1964+
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.2")]
19631965
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
19641966
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
19651967
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

+2
Original file line numberDiff line numberDiff line change
@@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
19601960
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
19611961
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
19621962
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
1963+
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
1964+
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.2")]
19631965
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
19641966
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
19651967
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="SinkForeachAsyncSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Linq;
11+
using System.Threading.Tasks;
12+
using Akka.Actor;
13+
using Akka.Streams.Dsl;
14+
using Akka.Streams.Supervision;
15+
using Akka.Streams.TestKit;
16+
using Akka.TestKit;
17+
using Akka.TestKit.Extensions;
18+
using Akka.Util.Internal;
19+
using FluentAssertions;
20+
using Nito.AsyncEx;
21+
using Xunit;
22+
using Xunit.Abstractions;
23+
24+
namespace Akka.Streams.Tests.Dsl
25+
{
26+
public class SinkForeachAsyncSpec : AkkaSpec
27+
{
28+
private ActorMaterializer Materializer { get; }
29+
30+
public SinkForeachAsyncSpec(ITestOutputHelper helper) : base(helper)
31+
{
32+
var settings = ActorMaterializerSettings.Create(Sys);
33+
Materializer = ActorMaterializer.Create(Sys, settings);
34+
}
35+
36+
[Fact]
37+
public async Task A_ForeachAsync_must_handle_empty_source()
38+
{
39+
var p = Source.From(new List<int>()).RunWith(Sink.ForEachAsync<int>(3, _ => Task.CompletedTask), Materializer);
40+
(await p.ShouldCompleteWithin(RemainingOrDefault)).Should().Be(Done.Instance);
41+
}
42+
43+
[Fact]
44+
public async Task A_ForeachAsync_must_be_able_to_run_elements_in_parallel()
45+
{
46+
var probe = CreateTestProbe();
47+
var latch = Enumerable.Range(1, 4)
48+
.Select(i => (i, new TestLatch(1)))
49+
.ToDictionary(t => t.i, t => t.Item2);
50+
51+
var sink = Sink.ForEachAsync<int>(4, n =>
52+
{
53+
latch[n].Ready(RemainingOrDefault);
54+
probe.Ref.Tell(n);
55+
return Task.CompletedTask;
56+
});
57+
58+
var p = Source.From(Enumerable.Range(1, 4)).RunWith(sink, Materializer);
59+
60+
latch[1].CountDown();
61+
probe.ExpectMsg(1);
62+
latch[2].CountDown();
63+
probe.ExpectMsg(2);
64+
latch[3].CountDown();
65+
probe.ExpectMsg(3);
66+
latch[4].CountDown();
67+
probe.ExpectMsg(4);
68+
69+
(await p.ShouldCompleteWithin(TimeSpan.FromSeconds(4))).Should().Be(Done.Instance);
70+
}
71+
72+
[Fact]
73+
public async Task A_ForeachAsync_must_back_pressure_upstream_elements_when_downstream_is_slow()
74+
{
75+
var probe = CreateTestProbe();
76+
var latch = Enumerable.Range(1, 4)
77+
.Select(i => (i, new TestLatch(1)))
78+
.ToDictionary(t => t.i, t => t.Item2);
79+
80+
var sink = Sink.ForEachAsync<Func<int>>(1, async n =>
81+
{
82+
latch[n()].Ready(RemainingOrDefault);
83+
probe.Ref.Tell(n());
84+
await Task.Delay(2000);
85+
});
86+
87+
var oneCalled = false;
88+
var twoCalled = false;
89+
var threeCalled = false;
90+
var fourCalled = false;
91+
92+
int One()
93+
{
94+
oneCalled = true;
95+
return 1;
96+
}
97+
98+
int Two()
99+
{
100+
twoCalled = true;
101+
return 2;
102+
}
103+
104+
int Three()
105+
{
106+
threeCalled = true;
107+
return 3;
108+
}
109+
110+
int Four()
111+
{
112+
fourCalled = true;
113+
return 4;
114+
}
115+
116+
var p = Source.From(new List<Func<int>> { One, Two, Three, Four }).RunWith(sink, Materializer);
117+
118+
latch[1].CountDown();
119+
probe.ExpectMsg(1);
120+
121+
twoCalled.ShouldBeFalse();
122+
threeCalled.ShouldBeFalse();
123+
fourCalled.ShouldBeFalse();
124+
125+
probe.ExpectNoMsg(TimeSpan.FromSeconds(2));
126+
127+
latch[2].CountDown();
128+
probe.ExpectMsg(2);
129+
130+
threeCalled.ShouldBeFalse();
131+
fourCalled.ShouldBeFalse();
132+
133+
probe.ExpectNoMsg(TimeSpan.FromSeconds(2));
134+
135+
latch[3].CountDown();
136+
probe.ExpectMsg(3);
137+
138+
fourCalled.ShouldBeFalse();
139+
140+
probe.ExpectNoMsg(TimeSpan.FromSeconds(2));
141+
142+
latch[4].CountDown();
143+
probe.ExpectMsg(4);
144+
145+
(await p.ShouldCompleteWithin(TimeSpan.FromSeconds(4))).Should().Be(Done.Instance);
146+
147+
oneCalled.ShouldBeTrue();
148+
twoCalled.ShouldBeTrue();
149+
threeCalled.ShouldBeTrue();
150+
fourCalled.ShouldBeTrue();
151+
}
152+
153+
[Fact]
154+
public async Task A_ForeachAsync_must_produce_elements_in_the_order_they_are_ready()
155+
{
156+
var probe = CreateTestProbe();
157+
var latch = Enumerable.Range(1, 4)
158+
.Select(i => (i, new AsyncCountdownEvent(1)))
159+
.ToDictionary(t => t.i, t => t.Item2);
160+
var p = Source.From(Enumerable.Range(1, 4)).RunWith(Sink.ForEachAsync<int>(4, async n =>
161+
{
162+
await latch[n].WaitAsync().ShouldCompleteWithin(TimeSpan.FromSeconds(5));
163+
probe.Ref.Tell(n);
164+
}), Materializer);
165+
166+
latch[2].Signal();
167+
probe.ExpectMsg(2);
168+
latch[4].Signal();
169+
probe.ExpectMsg(4);
170+
latch[3].Signal();
171+
probe.ExpectMsg(3);
172+
173+
p.IsCompleted.ShouldBeFalse();
174+
175+
latch[1].Signal();
176+
probe.ExpectMsg(1);
177+
178+
(await p.ShouldCompleteWithin(TimeSpan.FromSeconds(4))).Should().Be(Done.Instance);
179+
}
180+
181+
[Fact]
182+
public async Task A_ForeachAsync_must_not_run_more_functions_in_parallel_then_specified()
183+
{
184+
var probe = CreateTestProbe();
185+
var latch = Enumerable.Range(1, 5)
186+
.Select(i => (i, new AsyncCountdownEvent(1)))
187+
.ToDictionary(t => t.i, t => t.Item2);
188+
var p = Source.From(Enumerable.Range(1, 5)).RunWith(Sink.ForEachAsync<int>(4, async n =>
189+
{
190+
probe.Ref.Tell(n);
191+
await latch[n].WaitAsync().ShouldCompleteWithin(TimeSpan.FromSeconds(5));
192+
}), Materializer);
193+
194+
probe.ExpectMsgAllOf(new[] { 1, 2, 3, 4 });
195+
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
196+
197+
p.IsCompleted.Should().BeFalse();
198+
199+
Enumerable.Range(1, 4).ForEach(i => latch[i].Signal());
200+
201+
latch[5].Signal();
202+
probe.ExpectMsg(5);
203+
204+
(await p.ShouldCompleteWithin(TimeSpan.FromSeconds(5))).Should().Be(Done.Instance);
205+
}
206+
207+
[Fact]
208+
public async Task A_ForeachAsync_must_resume_after_function_failure()
209+
{
210+
var probe = CreateTestProbe();
211+
var latch = new AsyncCountdownEvent(1);
212+
213+
var p = Source.From(Enumerable.Range(1, 5)).RunWith(Sink.ForEachAsync<int>(4, async n =>
214+
{
215+
if (n == 3)
216+
throw new TestException("err1");
217+
218+
probe.Ref.Tell(n);
219+
await latch.WaitAsync().ShouldCompleteWithin(TimeSpan.FromSeconds(10));
220+
}).WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)), Materializer);
221+
222+
latch.Signal();
223+
probe.ExpectMsgAllOf(new[] { 1, 2, 4, 5 });
224+
225+
(await p.ShouldCompleteWithin(TimeSpan.FromSeconds(5))).Should().Be(Done.Instance);
226+
}
227+
228+
[Fact]
229+
public void A_ForeachAsync_must_finish_after_function_failure()
230+
{
231+
var probe = CreateTestProbe();
232+
var element4Latch = new AsyncCountdownEvent(1);
233+
var errorLatch = new AsyncCountdownEvent(2);
234+
235+
var p = Source.From(Enumerable.Range(1, int.MaxValue)).RunWith(Sink.ForEachAsync<int>(3, async n =>
236+
{
237+
if (n == 3)
238+
{
239+
// Error will happen only after elements 1, 2 has been processed
240+
await errorLatch.WaitAsync().ShouldCompleteWithin(TimeSpan.FromSeconds(5));
241+
throw new TestException("err2");
242+
}
243+
244+
probe.Ref.Tell(n);
245+
errorLatch.Signal();
246+
await element4Latch.WaitAsync().ShouldCompleteWithin(TimeSpan.FromSeconds(5)); // Block element 4, 5, 6, ... from entering
247+
}).WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider)), Materializer);
248+
249+
// Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time
250+
// of failure.
251+
probe.ExpectMsgAllOf(new[] { 1, 2 });
252+
element4Latch.Signal(); // Release elements 4, 5, 6, ...
253+
254+
var ex = p.Invoking(t => t.Wait(TimeSpan.FromSeconds(1))).Should().Throw<AggregateException>().Which;
255+
ex.Flatten().InnerException.Should().BeOfType<TestException>();
256+
ex.Flatten().InnerException.Message.Should().Be("err2");
257+
}
258+
}
259+
}

src/core/Akka.Streams/Dsl/Sink.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ public static Sink<TIn, Task<Done>> ForEach<TIn>(Action<TIn> action) => Flow.Cre
325325
/// <param name="action">Async function delegate to be executed on all elements</param>
326326
/// <returns>TBD</returns>
327327
public static Sink<TIn, Task<Done>> ForEachAsync<TIn>(int parallelism, Func<TIn, Task> action) => Flow.Create<TIn>()
328-
.SelectAsync(parallelism, async input =>
328+
.SelectAsyncUnordered(parallelism, async input =>
329329
{
330330
await action(input);
331331
return NotUsed.Instance;
@@ -375,6 +375,7 @@ public static Sink<TIn, NotUsed> Combine<TIn, TOut, TMat>(Func<int, IGraph<Unifo
375375
/// <param name="parallelism">TBD</param>
376376
/// <param name="action">TBD</param>
377377
/// <returns>TBD</returns>
378+
[Obsolete("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a Task or using Task.Run. Obsolete since 1.5.2")]
378379
public static Sink<TIn, Task<Done>> ForEachParallel<TIn>(int parallelism, Action<TIn> action) => Flow.Create<TIn>()
379380
.SelectAsyncUnordered(parallelism, input => Task.Run(() =>
380381
{

0 commit comments

Comments
 (0)