Skip to content

Commit 0e948db

Browse files
authored
[PORT] akka/akka#27266 - Propagate stream cancellation causes (#5949)
* [PORT] Akka#27266 - Propagate stream cancellation causes * Add MaybeSourceSpec * Fix tests * Update API Verify list
1 parent 1893800 commit 0e948db

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1004
-457
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,10 @@ namespace Akka.Streams
723723
{
724724
Akka.Streams.Dsl.Source<TOut, Akka.NotUsed> Source { get; }
725725
}
726+
public interface ISubscriptionWithCancelException : Reactive.Streams.ISubscription
727+
{
728+
void Cancel(System.Exception cause);
729+
}
726730
public interface ITransformerLike<in TIn, out TOut>
727731
{
728732
bool IsComplete { get; }
@@ -919,6 +923,7 @@ namespace Akka.Streams
919923
public static readonly Akka.Streams.StreamDetachedException Instance;
920924
public StreamDetachedException() { }
921925
public StreamDetachedException(string message) { }
926+
public StreamDetachedException(string message, System.Exception innerException) { }
922927
}
923928
public class StreamLimitReachedException : System.Exception
924929
{
@@ -997,6 +1002,23 @@ namespace Akka.Streams
9971002
public StreamTcpException(string message, System.Exception innerException) { }
9981003
protected StreamTcpException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
9991004
}
1005+
public class static SubscriptionWithCancelException
1006+
{
1007+
public sealed class NoMoreElementsNeeded : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation
1008+
{
1009+
public static readonly Akka.Streams.SubscriptionWithCancelException.NoMoreElementsNeeded Instance;
1010+
}
1011+
[Akka.Annotations.DoNotInheritAttribute()]
1012+
public abstract class NonFailureCancellation : System.Exception
1013+
{
1014+
protected NonFailureCancellation() { }
1015+
public virtual string StackTrace { get; }
1016+
}
1017+
public sealed class StageWasCompleted : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation
1018+
{
1019+
public static readonly Akka.Streams.SubscriptionWithCancelException.StageWasCompleted Instance;
1020+
}
1021+
}
10001022
public enum SubstreamCancelStrategy
10011023
{
10021024
Propagate = 0,
@@ -3461,7 +3483,7 @@ namespace Akka.Streams.Implementation
34613483
public static void RequireNonNullException(System.Exception exception) { }
34623484
public static void RequireNonNullSubscriber<T>(Reactive.Streams.ISubscriber<T> subscriber) { }
34633485
public static void RequireNonNullSubscription(Reactive.Streams.ISubscription subscription) { }
3464-
public static void TryCancel(Reactive.Streams.ISubscription subscription) { }
3486+
public static void TryCancel(Reactive.Streams.ISubscription subscription, System.Exception cause) { }
34653487
public static void TryOnComplete<T>(Reactive.Streams.ISubscriber<T> subscriber) { }
34663488
public static void TryOnError<T>(Reactive.Streams.ISubscriber<T> subscriber, System.Exception cause) { }
34673489
public static void TryOnNext<T>(Reactive.Streams.ISubscriber<T> subscriber, T element) { }
@@ -3835,7 +3857,7 @@ namespace Akka.Streams.Implementation.Fusing
38353857
{
38363858
public BatchingActorInputBoundary(int size, int id) { }
38373859
public override Akka.Streams.Outlet Out { get; }
3838-
public void Cancel() { }
3860+
public void Cancel(System.Exception cause) { }
38393861
public void OnComplete() { }
38403862
public void OnError(System.Exception reason) { }
38413863
public void OnInternalError(System.Exception reason) { }
@@ -3856,17 +3878,19 @@ namespace Akka.Streams.Implementation.Fusing
38563878
public void OnNext(T element) { }
38573879
public void OnSubscribe(Reactive.Streams.ISubscription subscription) { }
38583880
}
3859-
public sealed class BoundarySubscription : Reactive.Streams.ISubscription
3881+
public sealed class BoundarySubscription : Akka.Streams.ISubscriptionWithCancelException, Reactive.Streams.ISubscription
38603882
{
38613883
public BoundarySubscription(Akka.Actor.IActorRef parent, Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { }
38623884
public void Cancel() { }
3885+
public void Cancel(System.Exception cause) { }
38633886
public void Request(long elements) { }
38643887
public override string ToString() { }
38653888
}
38663889
public struct Cancel : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent
38673890
{
38683891
public readonly int Id;
3869-
public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { }
3892+
public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id, System.Exception cause) { }
3893+
public System.Exception Cause { get; }
38703894
public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; }
38713895
}
38723896
public struct ExposedPublisher : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent
@@ -4063,6 +4087,11 @@ namespace Akka.Streams.Implementation.Fusing
40634087
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { }
40644088
public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { }
40654089
public override string ToString() { }
4090+
public sealed class Cancelled
4091+
{
4092+
public readonly System.Exception Cause;
4093+
public Cancelled(System.Exception cause) { }
4094+
}
40664095
[Akka.Annotations.InternalApiAttribute()]
40674096
public sealed class Connection
40684097
{
@@ -4627,7 +4656,7 @@ namespace Akka.Streams.Stage
46274656
protected AbstractStage() { }
46284657
protected virtual bool IsDetached { get; }
46294658
public virtual Akka.Streams.Supervision.Directive Decide(System.Exception cause) { }
4630-
public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context);
4659+
public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause);
46314660
public abstract Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context);
46324661
public abstract Akka.Streams.Stage.IDirective OnPush(TIn element, Akka.Streams.Stage.IContext context);
46334662
public abstract Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext context);
@@ -4644,8 +4673,8 @@ namespace Akka.Streams.Stage
46444673
{
46454674
protected TContext Context;
46464675
protected AbstractStage() { }
4647-
public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context) { }
4648-
public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context) { }
4676+
public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause) { }
4677+
public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context, System.Exception cause) { }
46494678
public abstract TPullDirective OnPull(TContext context);
46504679
public override Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context) { }
46514680
public abstract TPushDirective OnPush(TIn element, TContext context);
@@ -4666,7 +4695,7 @@ namespace Akka.Streams.Stage
46664695
public class ConditionalTerminateOutput : Akka.Streams.Stage.OutHandler
46674696
{
46684697
public ConditionalTerminateOutput(System.Func<bool> predicate) { }
4669-
public override void OnDownstreamFinish() { }
4698+
public override void OnDownstreamFinish(System.Exception cause) { }
46704699
public override void OnPull() { }
46714700
}
46724701
[System.ObsoleteAttribute("Please use GraphStage instead. [1.1.0]")]
@@ -4713,7 +4742,9 @@ namespace Akka.Streams.Stage
47134742
protected void AbortReading<T>(Akka.Streams.Inlet<T> inlet) { }
47144743
protected virtual void AfterPostStop() { }
47154744
protected virtual void BeforePreStart() { }
4745+
protected void Cancel<T>(Akka.Streams.Inlet<T> inlet, System.Exception cause) { }
47164746
protected void Cancel<T>(Akka.Streams.Inlet<T> inlet) { }
4747+
public void CancelStage(System.Exception cause) { }
47174748
protected void Complete<T>(Akka.Streams.Outlet<T> outlet) { }
47184749
public void CompleteStage() { }
47194750
public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func<bool> predicate) { }
@@ -4735,6 +4766,8 @@ namespace Akka.Streams.Stage
47354766
protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { }
47364767
protected T Grab<T>(Akka.Streams.Inlet<T> inlet) { }
47374768
protected bool HasBeenPulled<T>(Akka.Streams.Inlet<T> inlet) { }
4769+
[Akka.Annotations.InternalApiAttribute()]
4770+
public void InternalOnDownstreamFinish(System.Exception cause) { }
47384771
protected bool IsAvailable<T>(Akka.Streams.Inlet<T> inlet) { }
47394772
protected bool IsAvailable<T>(Akka.Streams.Outlet<T> outlet) { }
47404773
protected bool IsClosed<T>(Akka.Streams.Inlet<T> inlet) { }
@@ -4750,7 +4783,7 @@ namespace Akka.Streams.Stage
47504783
protected void SetHandler<T>(Akka.Streams.Inlet<T> inlet, Akka.Streams.Stage.IInHandler handler) { }
47514784
protected void SetHandler<T>(Akka.Streams.Inlet<T> inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action<System.Exception> onUpstreamFailure = null) { }
47524785
protected void SetHandler<T>(Akka.Streams.Outlet<T> outlet, Akka.Streams.Stage.IOutHandler handler) { }
4753-
protected void SetHandler<T>(Akka.Streams.Outlet<T> outlet, System.Action onPull, System.Action onDownstreamFinish = null) { }
4786+
protected void SetHandler<T>(Akka.Streams.Outlet<T> outlet, System.Action onPull, System.Action<System.Exception> onDownstreamFinish = null) { }
47544787
[System.ObsoleteAttribute("Use method `SetHandlers` instead. Will be removed in v1.5")]
47554788
protected void SetHandler<TIn, TOut>(Akka.Streams.Inlet<TIn> inlet, Akka.Streams.Outlet<TOut> outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { }
47564789
protected void SetHandlers<TIn, TOut>(Akka.Streams.Inlet<TIn> inlet, Akka.Streams.Outlet<TOut> outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { }
@@ -4765,8 +4798,8 @@ namespace Akka.Streams.Stage
47654798
}
47664799
protected sealed class LambdaOutHandler : Akka.Streams.Stage.OutHandler
47674800
{
4768-
public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { }
4769-
public override void OnDownstreamFinish() { }
4801+
public LambdaOutHandler(System.Action onPull, System.Action<System.Exception> onDownstreamFinish = null) { }
4802+
public override void OnDownstreamFinish(System.Exception cause) { }
47704803
public override void OnPull() { }
47714804
}
47724805
[Akka.Annotations.InternalApiAttribute()]
@@ -4778,6 +4811,7 @@ namespace Akka.Streams.Stage
47784811
public bool IsClosed { get; }
47794812
public Akka.Streams.IGraph<Akka.Streams.SinkShape<T>, Akka.NotUsed> Sink { get; }
47804813
public void Cancel() { }
4814+
public void Cancel(System.Exception cause) { }
47814815
public T Grab() { }
47824816
public void Pull() { }
47834817
public void SetHandler(Akka.Streams.Stage.IInHandler handler) { }
@@ -4838,6 +4872,7 @@ namespace Akka.Streams.Stage
48384872
Akka.Streams.Stage.ITerminationDirective AbsorbTermination();
48394873
Akka.Streams.Stage.FreeDirective Fail(System.Exception cause);
48404874
Akka.Streams.Stage.FreeDirective Finish();
4875+
Akka.Streams.Stage.FreeDirective Finish(System.Exception cause);
48414876
Akka.Streams.Stage.IUpstreamDirective Pull();
48424877
Akka.Streams.Stage.IDownstreamDirective Push(object element);
48434878
Akka.Streams.Stage.IDownstreamDirective PushAndFinish(object element);
@@ -4888,7 +4923,7 @@ namespace Akka.Streams.Stage
48884923
}
48894924
public interface IOutHandler
48904925
{
4891-
void OnDownstreamFinish();
4926+
void OnDownstreamFinish(System.Exception cause);
48924927
void OnPull();
48934928
}
48944929
public interface IStageLogging
@@ -4909,14 +4944,14 @@ namespace Akka.Streams.Stage
49094944
public sealed class IgnoreTerminateOutput : Akka.Streams.Stage.OutHandler
49104945
{
49114946
public static readonly Akka.Streams.Stage.IgnoreTerminateOutput Instance;
4912-
public override void OnDownstreamFinish() { }
4947+
public override void OnDownstreamFinish(System.Exception cause) { }
49134948
public override void OnPull() { }
49144949
}
49154950
public abstract class InAndOutGraphStageLogic : Akka.Streams.Stage.GraphStageLogic, Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler
49164951
{
49174952
protected InAndOutGraphStageLogic(int inCount, int outCount) { }
49184953
protected InAndOutGraphStageLogic(Akka.Streams.Shape shape) { }
4919-
public virtual void OnDownstreamFinish() { }
4954+
public virtual void OnDownstreamFinish(System.Exception cause) { }
49204955
public abstract void OnPull();
49214956
public abstract void OnPush();
49224957
public virtual void OnUpstreamFailure(System.Exception e) { }
@@ -4925,7 +4960,7 @@ namespace Akka.Streams.Stage
49254960
public abstract class InAndOutHandler : Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler
49264961
{
49274962
protected InAndOutHandler() { }
4928-
public virtual void OnDownstreamFinish() { }
4963+
public virtual void OnDownstreamFinish(System.Exception cause) { }
49294964
public abstract void OnPull();
49304965
public abstract void OnPush();
49314966
public virtual void OnUpstreamFailure(System.Exception e) { }
@@ -4956,13 +4991,13 @@ namespace Akka.Streams.Stage
49564991
{
49574992
protected OutGraphStageLogic(int inCount, int outCount) { }
49584993
protected OutGraphStageLogic(Akka.Streams.Shape shape) { }
4959-
public virtual void OnDownstreamFinish() { }
4994+
public virtual void OnDownstreamFinish(System.Exception cause) { }
49604995
public abstract void OnPull();
49614996
}
49624997
public abstract class OutHandler : Akka.Streams.Stage.IOutHandler
49634998
{
49644999
protected OutHandler() { }
4965-
public virtual void OnDownstreamFinish() { }
5000+
public virtual void OnDownstreamFinish(System.Exception cause) { }
49665001
public abstract void OnPull();
49675002
}
49685003
public class PushPullGraphStageWithMaterializedValue<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, TMat>

0 commit comments

Comments
 (0)