diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index a8666c94264..d8fbf4e5e41 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -231,6 +231,13 @@ Combine the elements of multiple streams into a stream of sequences using a comb **completes** when any upstream completes +### Setup + +Defer the creation of a `Source` until materialization and access `ActorMaterializer` and `Attributes`. + +Typically used when access to materializer is needed to run a different stream during the construction of a source/flow. +Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`. + ## Sink Stages These built-in sinks are available from ``Akka.Stream.DSL.Sink``: @@ -615,6 +622,13 @@ Just like `Scan` but receiving a function that results in a `Task` to the next v **completes** when upstream completes and the last `Task` is resolved +### Setup + +Defer the creation of a `Flow` until materialization and access `ActorMaterializer` and `Attributes`. + +Typically used when access to materializer is needed to run a different stream during the construction of a source/flow. +Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`. + ### Aggregate Start with current value ``zero`` and then apply the current and next value to the given function, when upstream diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 7c3f3a233db..0ef559953a3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow Identity() { } public static Akka.Streams.Dsl.Flow Identity() { } public static Akka.Streams.Dsl.Flow>> LazyInitAsync(System.Func>> flowFactory) { } + public static Akka.Streams.Dsl.Flow> Setup(System.Func> factory) { } } public class static FlowOperations { @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Never() { } public static Akka.Streams.Dsl.Source> Queue(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { } public static Akka.Streams.Dsl.Source Repeat(T element) { } + public static Akka.Streams.Dsl.Source> Setup(System.Func> factory) { } public static Akka.Streams.SourceShape Shape(string name) { } public static Akka.Streams.Dsl.Source Single(T element) { } public static Akka.Streams.Dsl.Source Tick(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { } @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation public override Akka.Streams.Stage.ILogicAndMaterializedValue>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + [Akka.Annotations.InternalApiAttribute()] + public sealed class SetupFlowStage : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + { + public SetupFlowStage(System.Func> factory) { } + public Akka.Streams.Inlet In { get; } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.FlowShape Shape { get; } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] + public sealed class SetupSourceStage : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + { + public SetupSourceStage(System.Func> factory) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.SourceShape Shape { get; } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + } public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation { public SignalThrewException(string message, System.Exception cause) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 4529f13188d..107bc594f50 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow Identity() { } public static Akka.Streams.Dsl.Flow Identity() { } public static Akka.Streams.Dsl.Flow>> LazyInitAsync(System.Func>> flowFactory) { } + public static Akka.Streams.Dsl.Flow> Setup(System.Func> factory) { } } public class static FlowOperations { @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Never() { } public static Akka.Streams.Dsl.Source> Queue(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { } public static Akka.Streams.Dsl.Source Repeat(T element) { } + public static Akka.Streams.Dsl.Source> Setup(System.Func> factory) { } public static Akka.Streams.SourceShape Shape(string name) { } public static Akka.Streams.Dsl.Source Single(T element) { } public static Akka.Streams.Dsl.Source Tick(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { } @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation public override Akka.Streams.Stage.ILogicAndMaterializedValue>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + [Akka.Annotations.InternalApiAttribute()] + public sealed class SetupFlowStage : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + { + public SetupFlowStage(System.Func> factory) { } + public Akka.Streams.Inlet In { get; } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.FlowShape Shape { get; } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] + public sealed class SetupSourceStage : Akka.Streams.Stage.GraphStageWithMaterializedValue, System.Threading.Tasks.Task> + { + public SetupSourceStage(System.Func> factory) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.SourceShape Shape { get; } + public override Akka.Streams.Stage.ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { } + } public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation { public SignalThrewException(string message, System.Exception cause) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/SetupSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SetupSpec.cs new file mode 100644 index 00000000000..318f1baa3dc --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/SetupSpec.cs @@ -0,0 +1,172 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using Akka.Streams.Dsl; +using Akka.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl +{ + public class SetupSpec : AkkaSpec + { + private ActorMaterializer Materializer { get; } + + public SetupSpec(ITestOutputHelper helper) + : base(helper) => Materializer = ActorMaterializer.Create(Sys); + + [Fact] + public void SourceSetup_should_expose_materializer() + { + var source = Source.Setup((mat, _) => Source.Single(mat.IsShutdown)); + source.RunWith(Sink.First(), Materializer).Result.Should().BeFalse(); + } + + [Fact] + public void SourceSetup_should_expose_attributes() + { + var source = Source.Setup((_, attr) => Source.Single(attr.AttributeList)); + source.RunWith(Sink.First>(), Materializer).Result.Should().NotBeEmpty(); + } + + [Fact] + public void SourceSetup_should_propagate_materialized_value() + { + var source = Source.Setup((_, _) => Source.Maybe()); + + var (completion, element) = source.ToMaterialized(Sink.First(), Keep.Both).Run(Materializer); + completion.Result.TrySetResult(NotUsed.Instance); + element.Result.ShouldBe(NotUsed.Instance); + } + + [Fact] + public void SourceSetup_should_propagate_attributes() + { + var source = Source.Setup((_, attr) => Source.Single(attr.GetNameLifted)).Named("my-name"); + source.RunWith(Sink.First>(), Materializer).Result.Invoke().ShouldBe("setup-my-name"); + } + + [Fact] + public void SourceSetup_should_propagate_attributes_when_nested() + { + var source = Source.Setup((_, _) => Source.Setup((_, attr) => Source.Single(attr.GetNameLifted))).Named("my-name"); + source.RunWith(Sink.First>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup"); + } + + [Fact] + public void SourceSetup_should_handle_factory_failure() + { + var error = new ApplicationException("boom"); + var source = Source.Setup((_, _) => throw error); + + var (materialized, completion) = source.ToMaterialized(Sink.First(), Keep.Both).Run(Materializer); + + Assert.Throws(() => materialized.Result).InnerException?.Should().BeOfType(); + Assert.Throws(() => completion.Result).InnerException?.Should().BeOfType(); + } + + [Fact] + public void SourceSetup_should_handle_materialization_failure() + { + var error = new ApplicationException("boom"); + var source = Source.Setup((_, _) => Source.Empty().MapMaterializedValue(_ => throw error)); + + var (materialized, completion) = source.ToMaterialized(Sink.First(), Keep.Both).Run(Materializer); + + Assert.Throws(() => materialized.Result).InnerException?.Should().BeOfType(); + Assert.Throws(() => completion.Result).InnerException?.Should().BeOfType(); + } + + [Fact] + public void FlowSetup_should_expose_materializer() + { + var flow = Flow.Setup((mat, _) => Flow.FromSinkAndSource( + Sink.Ignore().MapMaterializedValue(_ => NotUsed.Instance), + Source.Single(mat.IsShutdown))); + + Source.Empty().Via(flow).RunWith(Sink.First(), Materializer).Result.Should().BeFalse(); + } + + [Fact] + public void FlowSetup_should_expose_attributes() + { + var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource( + Sink.Ignore().MapMaterializedValue(_ => NotUsed.Instance), + Source.Single(attr.AttributeList))); + + Source.Empty().Via(flow).RunWith(Sink.First>(), Materializer).Result.Should().NotBeEmpty(); + } + + [Fact] + public void FlowSetup_should_propagate_materialized_value() + { + var flow = Flow.Setup((_, _) => Flow.FromSinkAndSource( + Sink.Ignore().MapMaterializedValue(_ => NotUsed.Instance), + Source.Maybe(), Keep.Right)); + + var (completion, element) = Source.Empty() + .ViaMaterialized(flow, Keep.Right) + .ToMaterialized(Sink.First(), Keep.Both).Run(Materializer); + + completion.Result.TrySetResult(NotUsed.Instance); + element.Result.ShouldBe(NotUsed.Instance); + } + + [Fact] + public void FlowSetup_should_propagate_attributes() + { + var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource( + Sink.Ignore().MapMaterializedValue(_ => NotUsed.Instance), + Source.Single(attr.GetNameLifted))).Named("my-name"); + + Source.Empty().Via(flow).RunWith(Sink.First>(), Materializer).Result.Invoke().ShouldBe("setup-my-name"); + } + + [Fact] + public void FlowSetup_should_propagate_attributes_when_nested() + { + var flow = Flow.Setup((_, _) => Flow.Setup((_, attr) => Flow.FromSinkAndSource( + Sink.Ignore().MapMaterializedValue(_ => NotUsed.Instance), + Source.Single(attr.GetNameLifted)))).Named("my-name"); + + Source.Empty().Via(flow).RunWith(Sink.First>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup"); + } + + [Fact] + public void FlowSetup_should_handle_factory_failure() + { + var error = new ApplicationException("boom"); + var flow = Flow.Setup((_, _) => throw error); + + var (materialized, completion) = Source.Empty() + .ViaMaterialized(flow, Keep.Right) + .ToMaterialized(Sink.First(), Keep.Both) + .Run(Materializer); + + Assert.Throws(() => materialized.Result).InnerException?.Should().BeOfType(); + Assert.Throws(() => completion.Result).InnerException?.Should().BeOfType(); + } + + [Fact] + public void FlowSetup_should_handle_materialization_failure() + { + var error = new ApplicationException("boom"); + var flow = Flow.Setup((_, _) => Flow.Create().MapMaterializedValue(_ => throw error)); + + var (materialized, completion) = Source.Empty() + .ViaMaterialized(flow, Keep.Right) + .ToMaterialized(Sink.First(), Keep.Both) + .Run(Materializer); + + Assert.Throws(() => materialized.Result).InnerException?.Should().BeOfType(); + Assert.Throws(() => completion.Result).InnerException?.Should().BeOfType(); + } + } +} diff --git a/src/core/Akka.Streams/Dsl/Flow.cs b/src/core/Akka.Streams/Dsl/Flow.cs index 256e42ac9c7..4a00c3f6f05 100644 --- a/src/core/Akka.Streams/Dsl/Flow.cs +++ b/src/core/Akka.Streams/Dsl/Flow.cs @@ -478,6 +478,19 @@ public static Flow FromFunction(Func f public static Flow FromGraph(IGraph, TMat> graph) => graph as Flow ?? new Flow(graph.Module); + /// + /// Defers the creation of a until materialization. The + /// function exposes which is going to be used during materialization and + /// of the returned by this method. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static Flow> Setup(Func> factory) + => FromGraph(new SetupFlowStage(factory)); + /// /// Creates a from a and a where the flow's input /// will be sent to the sink and the flow's output will come from the source. diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index 4041c2c01a2..30fff6a70cc 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -675,6 +675,18 @@ public static Source Single(T element) public static Source FromGraph(IGraph, TMat> source) => source as Source ?? new Source(source.Module); + /// + /// Defers the creation of a until materialization. The + /// function exposes which is going to be used during materialization and + /// of the returned by this method. + /// + /// TBD + /// TBD + /// TBD + /// TBD + public static Source> Setup(Func> factory) + => FromGraph(new SetupSourceStage(factory)); + /// /// Start a new from the given . The stream will consist of /// one element when the is completed with a successful value, which diff --git a/src/core/Akka.Streams/Implementation/SetupStage.cs b/src/core/Akka.Streams/Implementation/SetupStage.cs new file mode 100644 index 00000000000..db9bf852fda --- /dev/null +++ b/src/core/Akka.Streams/Implementation/SetupStage.cs @@ -0,0 +1,176 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Annotations; +using Akka.Streams.Dsl; +using Akka.Streams.Stage; + +namespace Akka.Streams.Implementation +{ + [InternalApi] + public sealed class SetupFlowStage : GraphStageWithMaterializedValue, Task> + { + #region Logic + + private sealed class Logic : GraphStageLogic + { + private readonly SetupFlowStage _stage; + private readonly TaskCompletionSource _matPromise; + private readonly Attributes _inheritedAttributes; + private readonly SubSinkInlet _subInlet; + private readonly SubSourceOutlet _subOutlet; + + public Logic(SetupFlowStage stage, TaskCompletionSource matPromise, Attributes inheritedAttributes) + : base(stage.Shape) + { + _stage = stage; + _matPromise = matPromise; + _inheritedAttributes = inheritedAttributes; + + _subInlet = new SubSinkInlet(this, "SetupFlowStage"); + _subOutlet = new SubSourceOutlet(this, "SetupFlowStage"); + + _subInlet.SetHandler(new LambdaInHandler( + onPush: () => Push(_stage.Out, _subInlet.Grab()), + onUpstreamFinish: () => Complete(_stage.Out), + onUpstreamFailure: ex => Fail(_stage.Out, ex))); + _subOutlet.SetHandler(new LambdaOutHandler( + onPull: () => Pull(_stage.In), + onDownstreamFinish: ex => Cancel(_stage.In, ex))); + + SetHandler(_stage.In, new LambdaInHandler( + onPush: () => Grab(_stage.In), + onUpstreamFinish: _subOutlet.Complete, + onUpstreamFailure: _subOutlet.Fail)); + SetHandler(_stage.Out, new LambdaOutHandler( + onPull: _subInlet.Pull, + onDownstreamFinish: _subInlet.Cancel)); + } + + public override void PreStart() + { + base.PreStart(); + + try + { + var flow = _stage._factory(ActorMaterializerHelper.Downcast(Materializer), _inheritedAttributes); + var mat = SubFusingMaterializer.Materialize( + Source.FromGraph(_subOutlet.Source) + .ViaMaterialized(flow, Keep.Right) + .To(Sink.FromGraph(_subInlet.Sink)), _inheritedAttributes); + _matPromise.SetResult(mat); + } + catch (Exception ex) + { + _matPromise.SetException(ex); + throw; + } + } + } + + #endregion + + private readonly Func> _factory; + + public SetupFlowStage(Func> factory) + { + _factory = factory; + Shape = new FlowShape(In, Out); + } + + public Inlet In { get; } = new Inlet("SetupFlowStage.in"); + + public Outlet Out { get; } = new Outlet("SetupFlowStage.out"); + + public override FlowShape Shape { get; } + + protected override Attributes InitialAttributes => Attributes.CreateName("setup"); + + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var matPromise = new TaskCompletionSource(); + var logic = new Logic(this, matPromise, inheritedAttributes); + return new LogicAndMaterializedValue>(logic, matPromise.Task); + } + } + + [InternalApi] + public sealed class SetupSourceStage : GraphStageWithMaterializedValue, Task> + { + #region Logic + + private sealed class Logic : GraphStageLogic + { + private readonly SetupSourceStage _stage; + private readonly TaskCompletionSource _matPromise; + private readonly Attributes _inheritedAttributes; + private readonly SubSinkInlet _subInlet; + + public Logic(SetupSourceStage stage, TaskCompletionSource matPromise, Attributes inheritedAttributes) + : base(stage.Shape) + { + _stage = stage; + _matPromise = matPromise; + _inheritedAttributes = inheritedAttributes; + + _subInlet = new SubSinkInlet(this, "SetupSourceStage"); + _subInlet.SetHandler(new LambdaInHandler( + onPush: () => Push(_stage.Out, _subInlet.Grab()), + onUpstreamFinish: () => Complete(_stage.Out), + onUpstreamFailure: ex => Fail(_stage.Out, ex))); + + SetHandler(_stage.Out, new LambdaOutHandler(onPull: _subInlet.Pull, onDownstreamFinish: _subInlet.Cancel)); + } + + public override void PreStart() + { + base.PreStart(); + + try + { + var source = _stage._factory(ActorMaterializerHelper.Downcast(Materializer), _inheritedAttributes); + var mat = SubFusingMaterializer.Materialize(source.To(Sink.FromGraph(_subInlet.Sink)), _inheritedAttributes); + _matPromise.SetResult(mat); + } + catch (Exception ex) + { + _matPromise.SetException(ex); + throw; + } + } + } + + #endregion + + private readonly Func> _factory; + + /// + /// Creates a new + /// + /// The factory that generates the source when needed + public SetupSourceStage(Func> factory) + { + _factory = factory; + Shape = new SourceShape(Out); + } + + public Outlet Out { get; } = new Outlet("SetupSourceStage.out"); + + public override SourceShape Shape { get; } + + protected override Attributes InitialAttributes => Attributes.CreateName("setup"); + + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var matPromise = new TaskCompletionSource(); + var logic = new Logic(this, matPromise, inheritedAttributes); + return new LogicAndMaterializedValue>(logic, matPromise.Task); + } + } +} diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index ee8a1e33f58..a55df889f00 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -814,7 +814,8 @@ internal GraphInterpreter Interpreter get { if (_interpreter == null) - throw new IllegalStateException("Not yet initialized: only SetHandler is allowed in GraphStageLogic constructor"); + throw new IllegalStateException("Not yet initialized: only SetHandler is allowed in GraphStageLogic constructor. " + + "To access materializer use Source/Flow/Sink.Setup factory"); return _interpreter; } set => _interpreter = value;