Skip to content

Commit 5fa8ef4

Browse files
Added Source/Flow Setup operator (#6788)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent 74c59c5 commit 5fa8ef4

File tree

8 files changed

+431
-1
lines changed

8 files changed

+431
-1
lines changed

docs/articles/streams/builtinstages.md

+14
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,13 @@ Combine the elements of multiple streams into a stream of sequences using a comb
231231

232232
**completes** when any upstream completes
233233

234+
### Setup
235+
236+
Defer the creation of a `Source` until materialization and access `ActorMaterializer` and `Attributes`.
237+
238+
Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
239+
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
240+
234241
## Sink Stages
235242

236243
These built-in sinks are available from ``Akka.Stream.DSL.Sink``:
@@ -615,6 +622,13 @@ Just like `Scan` but receiving a function that results in a `Task` to the next v
615622

616623
**completes** when upstream completes and the last `Task` is resolved
617624

625+
### Setup
626+
627+
Defer the creation of a `Flow` until materialization and access `ActorMaterializer` and `Attributes`.
628+
629+
Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
630+
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
631+
618632
### Aggregate
619633

620634
Start with current value ``zero`` and then apply the current and next value to the given function, when upstream

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

+21
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
13531353
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
13541354
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
13551355
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
1356+
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
13561357
}
13571358
public class static FlowOperations
13581359
{
@@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
20332034
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
20342035
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
20352036
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
2037+
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
20362038
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
20372039
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
20382040
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
@@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
35823584
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
35833585
public override string ToString() { }
35843586
}
3587+
[Akka.Annotations.InternalApiAttribute()]
3588+
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
3589+
{
3590+
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
3591+
public Akka.Streams.Inlet<TIn> In { get; }
3592+
protected override Akka.Streams.Attributes InitialAttributes { get; }
3593+
public Akka.Streams.Outlet<TOut> Out { get; }
3594+
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
3595+
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3596+
}
3597+
[Akka.Annotations.InternalApiAttribute()]
3598+
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
3599+
{
3600+
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
3601+
protected override Akka.Streams.Attributes InitialAttributes { get; }
3602+
public Akka.Streams.Outlet<TOut> Out { get; }
3603+
public override Akka.Streams.SourceShape<TOut> Shape { get; }
3604+
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3605+
}
35853606
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
35863607
{
35873608
public SignalThrewException(string message, System.Exception cause) { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

+21
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
13531353
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
13541354
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
13551355
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
1356+
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
13561357
}
13571358
public class static FlowOperations
13581359
{
@@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
20332034
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
20342035
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
20352036
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
2037+
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
20362038
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
20372039
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
20382040
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
@@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
35823584
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
35833585
public override string ToString() { }
35843586
}
3587+
[Akka.Annotations.InternalApiAttribute()]
3588+
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
3589+
{
3590+
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
3591+
public Akka.Streams.Inlet<TIn> In { get; }
3592+
protected override Akka.Streams.Attributes InitialAttributes { get; }
3593+
public Akka.Streams.Outlet<TOut> Out { get; }
3594+
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
3595+
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3596+
}
3597+
[Akka.Annotations.InternalApiAttribute()]
3598+
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
3599+
{
3600+
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
3601+
protected override Akka.Streams.Attributes InitialAttributes { get; }
3602+
public Akka.Streams.Outlet<TOut> Out { get; }
3603+
public override Akka.Streams.SourceShape<TOut> Shape { get; }
3604+
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3605+
}
35853606
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
35863607
{
35873608
public SignalThrewException(string message, System.Exception cause) { }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="SeqSinkSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using Akka.Streams.Dsl;
11+
using Akka.TestKit;
12+
using FluentAssertions;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace Akka.Streams.Tests.Dsl
17+
{
18+
public class SetupSpec : AkkaSpec
19+
{
20+
private ActorMaterializer Materializer { get; }
21+
22+
public SetupSpec(ITestOutputHelper helper)
23+
: base(helper) => Materializer = ActorMaterializer.Create(Sys);
24+
25+
[Fact]
26+
public void SourceSetup_should_expose_materializer()
27+
{
28+
var source = Source.Setup((mat, _) => Source.Single(mat.IsShutdown));
29+
source.RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
30+
}
31+
32+
[Fact]
33+
public void SourceSetup_should_expose_attributes()
34+
{
35+
var source = Source.Setup((_, attr) => Source.Single(attr.AttributeList));
36+
source.RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
37+
}
38+
39+
[Fact]
40+
public void SourceSetup_should_propagate_materialized_value()
41+
{
42+
var source = Source.Setup((_, _) => Source.Maybe<NotUsed>());
43+
44+
var (completion, element) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
45+
completion.Result.TrySetResult(NotUsed.Instance);
46+
element.Result.ShouldBe(NotUsed.Instance);
47+
}
48+
49+
[Fact]
50+
public void SourceSetup_should_propagate_attributes()
51+
{
52+
var source = Source.Setup((_, attr) => Source.Single(attr.GetNameLifted)).Named("my-name");
53+
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
54+
}
55+
56+
[Fact]
57+
public void SourceSetup_should_propagate_attributes_when_nested()
58+
{
59+
var source = Source.Setup((_, _) => Source.Setup((_, attr) => Source.Single(attr.GetNameLifted))).Named("my-name");
60+
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
61+
}
62+
63+
[Fact]
64+
public void SourceSetup_should_handle_factory_failure()
65+
{
66+
var error = new ApplicationException("boom");
67+
var source = Source.Setup<NotUsed, NotUsed>((_, _) => throw error);
68+
69+
var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
70+
71+
Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
72+
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
73+
}
74+
75+
[Fact]
76+
public void SourceSetup_should_handle_materialization_failure()
77+
{
78+
var error = new ApplicationException("boom");
79+
var source = Source.Setup((_, _) => Source.Empty<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));
80+
81+
var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
82+
83+
Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
84+
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
85+
}
86+
87+
[Fact]
88+
public void FlowSetup_should_expose_materializer()
89+
{
90+
var flow = Flow.Setup((mat, _) => Flow.FromSinkAndSource(
91+
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
92+
Source.Single(mat.IsShutdown)));
93+
94+
Source.Empty<object>().Via(flow).RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
95+
}
96+
97+
[Fact]
98+
public void FlowSetup_should_expose_attributes()
99+
{
100+
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
101+
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
102+
Source.Single(attr.AttributeList)));
103+
104+
Source.Empty<object>().Via(flow).RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
105+
}
106+
107+
[Fact]
108+
public void FlowSetup_should_propagate_materialized_value()
109+
{
110+
var flow = Flow.Setup((_, _) => Flow.FromSinkAndSource(
111+
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
112+
Source.Maybe<NotUsed>(), Keep.Right));
113+
114+
var (completion, element) = Source.Empty<object>()
115+
.ViaMaterialized(flow, Keep.Right)
116+
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
117+
118+
completion.Result.TrySetResult(NotUsed.Instance);
119+
element.Result.ShouldBe(NotUsed.Instance);
120+
}
121+
122+
[Fact]
123+
public void FlowSetup_should_propagate_attributes()
124+
{
125+
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
126+
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
127+
Source.Single(attr.GetNameLifted))).Named("my-name");
128+
129+
Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
130+
}
131+
132+
[Fact]
133+
public void FlowSetup_should_propagate_attributes_when_nested()
134+
{
135+
var flow = Flow.Setup((_, _) => Flow.Setup((_, attr) => Flow.FromSinkAndSource(
136+
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
137+
Source.Single(attr.GetNameLifted)))).Named("my-name");
138+
139+
Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
140+
}
141+
142+
[Fact]
143+
public void FlowSetup_should_handle_factory_failure()
144+
{
145+
var error = new ApplicationException("boom");
146+
var flow = Flow.Setup<NotUsed, NotUsed, NotUsed>((_, _) => throw error);
147+
148+
var (materialized, completion) = Source.Empty<NotUsed>()
149+
.ViaMaterialized(flow, Keep.Right)
150+
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
151+
.Run(Materializer);
152+
153+
Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
154+
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
155+
}
156+
157+
[Fact]
158+
public void FlowSetup_should_handle_materialization_failure()
159+
{
160+
var error = new ApplicationException("boom");
161+
var flow = Flow.Setup((_, _) => Flow.Create<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));
162+
163+
var (materialized, completion) = Source.Empty<NotUsed>()
164+
.ViaMaterialized(flow, Keep.Right)
165+
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
166+
.Run(Materializer);
167+
168+
Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
169+
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
170+
}
171+
}
172+
}

src/core/Akka.Streams/Dsl/Flow.cs

+13
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,19 @@ public static Flow<TIn, TOut, NotUsed> FromFunction<TIn, TOut>(Func<TIn, TOut> f
478478
public static Flow<TIn, TOut, TMat> FromGraph<TIn, TOut, TMat>(IGraph<FlowShape<TIn, TOut>, TMat> graph)
479479
=> graph as Flow<TIn, TOut, TMat> ?? new Flow<TIn, TOut, TMat>(graph.Module);
480480

481+
/// <summary>
482+
/// Defers the creation of a <see cref="Flow"/> until materialization. The <paramref name="factory"/>
483+
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
484+
/// <see cref="Attributes"/> of the <see cref="Flow"/> returned by this method.
485+
/// </summary>
486+
/// <typeparam name="TIn">TBD</typeparam>
487+
/// <typeparam name="TOut">TBD</typeparam>
488+
/// <typeparam name="TMat">TBD</typeparam>
489+
/// <param name="factory">TBD</param>
490+
/// <returns>TBD</returns>
491+
public static Flow<TIn, TOut, Task<TMat>> Setup<TIn, TOut, TMat>(Func<ActorMaterializer, Attributes, Flow<TIn, TOut, TMat>> factory)
492+
=> FromGraph(new SetupFlowStage<TIn, TOut, TMat>(factory));
493+
481494
/// <summary>
482495
/// Creates a <see cref="Flow{TIn,TOut,TMat}"/> from a <see cref="Sink{TIn,TMat}"/> and a <see cref="Source{TOut,TMat}"/> where the flow's input
483496
/// will be sent to the sink and the flow's output will come from the source.

src/core/Akka.Streams/Dsl/Source.cs

+12
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,18 @@ public static Source<T, NotUsed> Single<T>(T element)
675675
public static Source<T, TMat> FromGraph<T, TMat>(IGraph<SourceShape<T>, TMat> source)
676676
=> source as Source<T, TMat> ?? new Source<T, TMat>(source.Module);
677677

678+
/// <summary>
679+
/// Defers the creation of a <see cref="Source"/> until materialization. The <paramref name="factory"/>
680+
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
681+
/// <see cref="Attributes"/> of the <see cref="Source"/> returned by this method.
682+
/// </summary>
683+
/// <typeparam name="T">TBD</typeparam>
684+
/// <typeparam name="TMat">TBD</typeparam>
685+
/// <param name="factory">TBD</param>
686+
/// <returns>TBD</returns>
687+
public static Source<T, Task<TMat>> Setup<T, TMat>(Func<ActorMaterializer, Attributes, Source<T, TMat>> factory)
688+
=> FromGraph(new SetupSourceStage<T, TMat>(factory));
689+
678690
/// <summary>
679691
/// Start a new <see cref="Source{TOut,TMat}"/> from the given <see cref="Task{T}"/>. The stream will consist of
680692
/// one element when the <see cref="Task{T}"/> is completed with a successful value, which

0 commit comments

Comments
 (0)