Skip to content

Commit bab0a93

Browse files
Do not use expression based props for long lived streams (akkadotnet#6807)
* Do not use expression based props for long lived streams * Update ActorMaterializerImpl.cs --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 1828a01 commit bab0a93

9 files changed

+19
-9
lines changed

src/core/Akka.Streams/Implementation/ActorMaterializerImpl.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ public Children(IImmutableSet<IActorRef> refs)
607607
/// <param name="haveShutdown">TBD</param>
608608
/// <returns>TBD</returns>
609609
public static Props Props(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
610-
=> Actor.Props.Create(() => new StreamSupervisor(settings, haveShutdown)).WithDeploy(Deploy.Local);
610+
=> Actor.Props.Create<StreamSupervisor>(settings, haveShutdown).WithDeploy(Deploy.Local);
611611

612612
/// <summary>
613613
/// TBD
@@ -631,6 +631,7 @@ public static Props Props(ActorMaterializerSettings settings, AtomicBoolean have
631631
/// </summary>
632632
/// <param name="settings">TBD</param>
633633
/// <param name="haveShutdown">TBD</param>
634+
/// If this changes you must also change StreamSupervisor.Props as well!
634635
public StreamSupervisor(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
635636
{
636637
Settings = settings;

src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
3434
throw new NotSupportedException("Backpressure overflow strategy not supported");
3535

3636
var maxFixedBufferSize = settings.MaxFixedBufferSize;
37-
return Actor.Props.Create(() => new ActorRefSourceActor<T>(bufferSize, overflowStrategy, maxFixedBufferSize));
37+
return Actor.Props.Create<ActorRefSourceActor<T>>(bufferSize, overflowStrategy, maxFixedBufferSize);
3838
}
3939

4040
/// <summary>
@@ -58,6 +58,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
5858
/// <param name="bufferSize">TBD</param>
5959
/// <param name="overflowStrategy">TBD</param>
6060
/// <param name="maxFixedBufferSize">TBD</param>
61+
/// If this changes you must also change <see cref="ActorRefSourceActor{T}.Props"/> as well!
6162
public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, int maxFixedBufferSize)
6263
{
6364
BufferSize = bufferSize;

src/core/Akka.Streams/Implementation/FanOut.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ internal static class Unzip
720720
/// <param name="settings">TBD</param>
721721
/// <returns>TBD</returns>
722722
public static Props Props<T>(ActorMaterializerSettings settings)
723-
=> Actor.Props.Create(() => new Unzip<T>(settings, 2)).WithDeploy(Deploy.Local);
723+
=> Actor.Props.Create<Unzip<T>>(settings, 2).WithDeploy(Deploy.Local);
724724
}
725725

726726
/// <summary>
@@ -740,6 +740,7 @@ internal sealed class Unzip<T> : FanOut<T>
740740
/// This exception is thrown when the elements in <see cref="Akka.Streams.Implementation.FanOut{T}.PrimaryInputs"/>
741741
/// are of an unknown type.
742742
/// </exception>>
743+
/// If this gets changed you must change <see cref="Akka.Streams.Implementation.FanOut.Unzip{T}"/> as well!
743744
public Unzip(ActorMaterializerSettings settings, int outputCount = 2) : base(settings, outputCount)
744745
{
745746
OutputBunch.MarkAllOutputs();

src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ internal sealed class FanoutProcessorImpl<T, TStreamBuffer> : ActorProcessorImpl
228228
/// <param name="onTerminated">TBD</param>
229229
/// <returns>TBD</returns>
230230
public static Props Props(ActorMaterializerSettings settings, Action onTerminated = null)
231-
=> Actor.Props.Create(() => new FanoutProcessorImpl<T, TStreamBuffer>(settings, onTerminated)).WithDeploy(Deploy.Local);
231+
=> Actor.Props.Create<FanoutProcessorImpl<T, TStreamBuffer>>(settings, onTerminated).WithDeploy(Deploy.Local);
232232

233233
/// <summary>
234234
/// TBD
@@ -240,6 +240,7 @@ public static Props Props(ActorMaterializerSettings settings, Action onTerminate
240240
/// </summary>
241241
/// <param name="settings">TBD</param>
242242
/// <param name="onTerminated">TBD</param>
243+
/// If this gets changed you must change <see cref="FanoutProcessorImpl{T,TStreamBuffer}.Props"/> as well!
243244
public FanoutProcessorImpl(ActorMaterializerSettings settings, Action onTerminated) : base(settings)
244245
{
245246
PrimaryOutputs = new FanoutOutputs<T, TStreamBuffer>(settings.MaxInputBufferSize,

src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,8 @@ private void Complete()
13291329
/// </summary>
13301330
/// <param name="shell">TBD</param>
13311331
/// <returns>TBD</returns>
1332-
public static Props Props(GraphInterpreterShell shell) => Actor.Props.Create(() => new ActorGraphInterpreter(shell)).WithDeploy(Deploy.Local);
1332+
public static Props Props(GraphInterpreterShell shell) => Actor.Props
1333+
.Create<ActorGraphInterpreter>(shell).WithDeploy(Deploy.Local);
13331334

13341335
private ISet<GraphInterpreterShell> _activeInterpreters = new HashSet<GraphInterpreterShell>();
13351336
private readonly Queue<GraphInterpreterShell> _newShells = new();
@@ -1346,6 +1347,7 @@ private void Complete()
13461347
/// TBD
13471348
/// </summary>
13481349
/// <param name="shell">TBD</param>
1350+
/// If this ctor gets changed you -must- change <see cref="ActorGraphInterpreter.Props"/> as well!
13491351
public ActorGraphInterpreter(GraphInterpreterShell shell)
13501352
{
13511353
_initial = shell;

src/core/Akka.Streams/Implementation/IO/FilePublisher.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static Props Props(FileInfo f, TaskCompletionSource<IOResult> completionP
5757
if (maxBuffer < initialBuffer)
5858
throw new ArgumentException($"maxBuffer must be >= initialBuffer (was {maxBuffer})", nameof(maxBuffer));
5959

60-
return Actor.Props.Create(() => new FilePublisher(f, completionPromise, chunkSize, startPosition, maxBuffer))
60+
return Actor.Props.Create<FilePublisher>( f, completionPromise, chunkSize, startPosition, maxBuffer)
6161
.WithDeploy(Deploy.Local);
6262
}
6363

@@ -86,6 +86,7 @@ private struct Continue : IDeadLetterSuppression
8686
/// <param name="chunkSize">TBD</param>
8787
/// <param name="startPosition">TBD</param>
8888
/// <param name="maxBuffer">TBD</param>
89+
/// If this changes you must also change <see cref="FilePublisher.Props"/> as well!
8990
public FilePublisher(FileInfo f, TaskCompletionSource<IOResult> completionPromise, int chunkSize, long startPosition, int maxBuffer)
9091
{
9192
_f = f;

src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static Props Props(
4848
if (startPosition < 0)
4949
throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition));
5050

51-
return Actor.Props.Create(() => new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand))
51+
return Actor.Props.Create<FileSubscriber>(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand)
5252
.WithDeploy(Deploy.Local);
5353
}
5454

@@ -72,6 +72,7 @@ public static Props Props(
7272
/// <param name="fileMode">TBD</param>
7373
/// <param name="autoFlush"></param>
7474
/// <param name="flushSignaler"></param>
75+
/// If this changes you must change <see cref="FileSubscriber.Props"/> as well!
7576
public FileSubscriber(
7677
FileInfo f,
7778
TaskCompletionSource<IOResult> completionPromise,

src/core/Akka.Streams/Implementation/IO/InputStreamPublisher.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static Props Props(Stream inputstream, TaskCompletionSource<IOResult> com
3737
if (chunkSize <= 0)
3838
throw new ArgumentException($"chunkSize must be > 0 was {chunkSize}", nameof(chunkSize));
3939

40-
return Actor.Props.Create(()=> new InputStreamPublisher(inputstream, completionSource, chunkSize)).WithDeploy(Deploy.Local);
40+
return Actor.Props.Create<InputStreamPublisher>(inputstream, completionSource, chunkSize).WithDeploy(Deploy.Local);
4141
}
4242

4343
private struct Continue : IDeadLetterSuppression
@@ -58,6 +58,7 @@ private struct Continue : IDeadLetterSuppression
5858
/// <param name="inputstream">TBD</param>
5959
/// <param name="completionSource">TBD</param>
6060
/// <param name="chunkSize">TBD</param>
61+
/// If this gets changed you must change <see cref="InputStreamPublisher.Props"/> as well!
6162
public InputStreamPublisher(Stream inputstream, TaskCompletionSource<IOResult> completionSource, int chunkSize)
6263
{
6364
_inputstream = inputstream;

src/core/Akka.Streams/Implementation/IO/OutputStreamSubscriber.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
3636
throw new ArgumentException("Buffer size must be > 0");
3737

3838
return
39-
Actor.Props.Create(() => new OutputStreamSubscriber(os, completionPromise, bufferSize, autoFlush))
39+
Actor.Props.Create<OutputStreamSubscriber>(os, completionPromise, bufferSize, autoFlush)
4040
.WithDeploy(Deploy.Local);
4141
}
4242

@@ -53,6 +53,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
5353
/// <param name="completionPromise">TBD</param>
5454
/// <param name="bufferSize">TBD</param>
5555
/// <param name="autoFlush">TBD</param>
56+
/// If this gets changed you must change <see cref="OutputStreamSubscriber.Props"/> as well!
5657
public OutputStreamSubscriber(Stream outputStream, TaskCompletionSource<IOResult> completionPromise, int bufferSize, bool autoFlush)
5758
{
5859
_outputStream = outputStream;

0 commit comments

Comments
 (0)