Skip to content

Commit fb507b1

Browse files
[BACKPORT] move Move Channel Stages from Alpakka to main project. (#6317)
* Move Channel Stages from Alpakka to main project. (#6268) * Move Channel Stages from Alpakka to main project. * added API approvals Co-authored-by: Aaron Stannard <[email protected]> * added API approvals Co-authored-by: Drew <[email protected]>
1 parent 791bd10 commit fb507b1

File tree

9 files changed

+815
-0
lines changed

9 files changed

+815
-0
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,16 @@ namespace Akka.Streams.Dsl
12781278
public Akka.Streams.Outlet<T> Out(int id) { }
12791279
public override string ToString() { }
12801280
}
1281+
public class static ChannelSink
1282+
{
1283+
public static Akka.Streams.Dsl.Sink<T, System.Threading.Channels.ChannelReader<T>> AsReader<T>(int bufferSize, bool singleReader = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { }
1284+
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> FromWriter<T>(System.Threading.Channels.ChannelWriter<T> writer, bool isOwner) { }
1285+
}
1286+
public class static ChannelSource
1287+
{
1288+
public static Akka.Streams.Dsl.Source<T, System.Threading.Channels.ChannelWriter<T>> Create<T>(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { }
1289+
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromReader<T>(System.Threading.Channels.ChannelReader<T> reader) { }
1290+
}
12811291
public class static Concat
12821292
{
12831293
public static Akka.Streams.IGraph<Akka.Streams.UniformFanInShape<T, T>, Akka.NotUsed> Create<T>(int inputPorts = 2) { }
@@ -1914,6 +1924,7 @@ namespace Akka.Streams.Dsl
19141924
public static Akka.Streams.Dsl.Sink<T, System.IObservable<T>> AsObservable<T>() { }
19151925
public static Akka.Streams.Dsl.Sink<TIn, Reactive.Streams.IPublisher<TIn>> AsPublisher<TIn>(bool fanout) { }
19161926
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> Cancelled<TIn>() { }
1927+
public static Akka.Streams.Dsl.Sink<T, System.Threading.Channels.ChannelReader<T>> ChannelReader<T>(int bufferSize, bool singleReader, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { }
19171928
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> Combine<TIn, TOut, TMat>(System.Func<int, Akka.Streams.IGraph<Akka.Streams.UniformFanOutShape<TIn, TOut>, TMat>> strategy, Akka.Streams.Dsl.Sink<TOut, Akka.NotUsed> first, Akka.Streams.Dsl.Sink<TOut, Akka.NotUsed> second, params Akka.Streams.Dsl.Sink<, >[] rest) { }
19181929
public static Akka.Streams.Dsl.Sink<TIn, object> Create<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
19191930
[Akka.Annotations.InternalApiAttribute()]
@@ -1925,6 +1936,7 @@ namespace Akka.Streams.Dsl
19251936
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
19261937
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
19271938
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
1939+
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> FromWriter<T>(System.Threading.Channels.ChannelWriter<T> writer, bool isOwner) { }
19281940
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task> Ignore<TIn>() { }
19291941
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> Last<TIn>() { }
19301942
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> LastOrDefault<TIn>() { }
@@ -1970,6 +1982,8 @@ namespace Akka.Streams.Dsl
19701982
public static Akka.Streams.Dsl.Source<T, Akka.Actor.IActorRef> ActorPublisher<T>(Akka.Actor.Props props) { }
19711983
public static Akka.Streams.Dsl.Source<T, Akka.Actor.IActorRef> ActorRef<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
19721984
public static Akka.Streams.Dsl.Source<T, Reactive.Streams.ISubscriber<T>> AsSubscriber<T>() { }
1985+
public static Akka.Streams.Dsl.Source<T, System.Threading.Channels.ChannelWriter<T>> Channel<T>(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { }
1986+
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> ChannelReader<T>(System.Threading.Channels.ChannelReader<T> channelReader) { }
19731987
public static Akka.Streams.Dsl.Source<TOut2, Akka.NotUsed> Combine<T, TOut2>(Akka.Streams.Dsl.Source<T, Akka.NotUsed> first, Akka.Streams.Dsl.Source<T, Akka.NotUsed> second, System.Func<int, Akka.Streams.IGraph<Akka.Streams.UniformFanInShape<T, TOut2>, Akka.NotUsed>> strategy, params Akka.Streams.Dsl.Source<, >[] rest) { }
19741988
public static Akka.Streams.Dsl.Source<TOut2, TMatOut> CombineMaterialized<T, TOut2, TMat1, TMat2, TMatOut>(Akka.Streams.Dsl.Source<T, TMat1> first, Akka.Streams.Dsl.Source<T, TMat2> second, System.Func<int, Akka.Streams.IGraph<Akka.Streams.UniformFanInShape<T, TOut2>, Akka.NotUsed>> strategy, System.Func<TMat1, TMat2, TMatOut> combineMaterializers) { }
19751989
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Cycle<T>(System.Func<System.Collections.Generic.IEnumerator<T>> enumeratorFactory) { }
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="ChannelSinkSpec.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading;
10+
using System.Threading.Channels;
11+
using System.Threading.Tasks;
12+
using Akka.Streams.Dsl;
13+
using Akka.Streams.TestKit;
14+
using FluentAssertions;
15+
using FluentAssertions.Extensions;
16+
using Xunit;
17+
using Xunit.Abstractions;
18+
19+
namespace Akka.Streams.Tests.Implementation
20+
{
21+
public class ChannelSinkSpec : Akka.TestKit.Xunit2.TestKit
22+
{
23+
private readonly ActorMaterializer _materializer;
24+
25+
public ChannelSinkSpec(ITestOutputHelper output) : base(output: output)
26+
{
27+
_materializer = Sys.Materializer();
28+
}
29+
30+
#region from writer
31+
32+
[Fact]
33+
public async Task ChannelSink_writer_when_isOwner_should_complete_channel_with_success_when_upstream_completes()
34+
{
35+
var probe = this.CreateManualPublisherProbe<int>();
36+
var channel = Channel.CreateBounded<int>(10);
37+
38+
Source.FromPublisher(probe)
39+
.To(ChannelSink.FromWriter(channel.Writer, true))
40+
.Run(_materializer);
41+
42+
var subscription = probe.ExpectSubscription();
43+
subscription.SendComplete();
44+
45+
channel.Reader.Completion.Wait(1.Seconds()).Should().BeTrue();
46+
}
47+
48+
[Fact]
49+
public async Task ChannelSink_writer_isOwner_should_complete_channel_with_failure_when_upstream_fails()
50+
{
51+
var exception = new Exception("BOOM!");
52+
53+
try
54+
{
55+
var probe = this.CreateManualPublisherProbe<int>();
56+
var channel = Channel.CreateBounded<int>(10);
57+
58+
Source.FromPublisher(probe)
59+
.To(ChannelSink.FromWriter(channel.Writer, true))
60+
.Run(_materializer);
61+
62+
var subscription = probe.ExpectSubscription();
63+
subscription.SendError(exception);
64+
65+
await channel.Reader.Completion;
66+
}
67+
catch (Exception e)
68+
{
69+
e.Should().Be(exception);
70+
}
71+
}
72+
73+
[Fact]
74+
public async Task ChannelSink_writer_when_NOT_owner_should_leave_channel_active()
75+
{
76+
var probe = this.CreateManualPublisherProbe<int>();
77+
var channel = Channel.CreateBounded<int>(10);
78+
79+
Source.FromPublisher(probe)
80+
.To(ChannelSink.FromWriter(channel.Writer, false))
81+
.Run(_materializer);
82+
83+
var subscription = probe.ExpectSubscription();
84+
subscription.SendComplete();
85+
86+
channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse();
87+
88+
var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
89+
await channel.Writer.WriteAsync(11, cancel.Token);
90+
var value = await channel.Reader.ReadAsync(cancel.Token);
91+
value.Should().Be(11);
92+
}
93+
94+
[Fact]
95+
public async Task ChannelSink_writer_NOT_owner_should_leave_channel_active()
96+
{
97+
var exception = new Exception("BOOM!");
98+
99+
var probe = this.CreateManualPublisherProbe<int>();
100+
var channel = Channel.CreateBounded<int>(10);
101+
102+
Source.FromPublisher(probe)
103+
.To(ChannelSink.FromWriter(channel.Writer, false))
104+
.Run(_materializer);
105+
106+
var subscription = probe.ExpectSubscription();
107+
subscription.SendError(exception);
108+
109+
channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse();
110+
111+
var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
112+
await channel.Writer.WriteAsync(11, cancel.Token);
113+
var value = await channel.Reader.ReadAsync(cancel.Token);
114+
value.Should().Be(11);
115+
}
116+
117+
[Fact]
118+
public async Task ChannelSink_writer_should_propagate_elements_to_channel()
119+
{
120+
var probe = this.CreateManualPublisherProbe<int>();
121+
var channel = Channel.CreateBounded<int>(10);
122+
123+
Source.FromPublisher(probe)
124+
.To(ChannelSink.FromWriter(channel.Writer, true))
125+
.Run(_materializer);
126+
127+
var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
128+
var subscription = probe.ExpectSubscription();
129+
var n = subscription.ExpectRequest();
130+
131+
Sys.Log.Info("Requested for {0} elements", n);
132+
133+
var i = 1;
134+
135+
for (; i <= n; i++)
136+
subscription.SendNext(i);
137+
138+
for (int j = 0; j < n; j++)
139+
{
140+
var value = await channel.Reader.ReadAsync(cancel.Token);
141+
value.Should().Be(j + 1);
142+
}
143+
144+
var m = subscription.ExpectRequest() + n;
145+
Sys.Log.Info("Requested for {0} elements", m - n);
146+
147+
for (; i <= m; i++)
148+
{
149+
subscription.SendNext(i);
150+
var value = await channel.Reader.ReadAsync(cancel.Token);
151+
value.Should().Be(i);
152+
}
153+
}
154+
155+
#endregion
156+
157+
#region as reader
158+
159+
[Fact]
160+
public async Task ChannelSink_reader_should_complete_channel_with_success_when_upstream_completes()
161+
{
162+
var probe = this.CreateManualPublisherProbe<int>();
163+
164+
var reader = Source.FromPublisher(probe)
165+
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
166+
.Run(_materializer);
167+
168+
var subscription = probe.ExpectSubscription();
169+
subscription.SendComplete();
170+
171+
reader.Completion.Wait(1.Seconds()).Should().BeTrue();
172+
}
173+
174+
[Fact]
175+
public async Task ChannelSink_reader_should_complete_channel_with_failure_when_upstream_fails()
176+
{
177+
var exception = new Exception("BOOM!");
178+
179+
try
180+
{
181+
var probe = this.CreateManualPublisherProbe<int>();
182+
183+
var reader = Source.FromPublisher(probe)
184+
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
185+
.Run(_materializer);
186+
187+
var subscription = probe.ExpectSubscription();
188+
subscription.SendError(exception);
189+
190+
await reader.Completion;
191+
}
192+
catch (Exception e)
193+
{
194+
e.Should().Be(exception);
195+
}
196+
}
197+
198+
[Fact]
199+
public async Task ChannelSink_reader_should_propagate_elements_to_channel()
200+
{
201+
var probe = this.CreateManualPublisherProbe<int>();
202+
203+
var reader = Source.FromPublisher(probe)
204+
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
205+
.Run(_materializer);
206+
207+
var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
208+
var subscription = probe.ExpectSubscription();
209+
var n = subscription.ExpectRequest();
210+
211+
Sys.Log.Info("Requested for {0} elements", n);
212+
213+
var i = 1;
214+
215+
for (; i <= n; i++)
216+
subscription.SendNext(i);
217+
218+
for (int j = 0; j < n; j++)
219+
{
220+
Sys.Log.Info("Request: {0}",j);
221+
var value = await reader.ReadAsync(cancel.Token);
222+
Sys.Log.Info("Received: {0}",value);
223+
value.Should().Be(j + 1);
224+
}
225+
226+
var m = subscription.ExpectRequest() + n;
227+
Sys.Log.Info("Requested for {0} elements", m - n);
228+
229+
for (; i <= m; i++)
230+
{
231+
subscription.SendNext(i);
232+
var value = await reader.ReadAsync(cancel.Token);
233+
value.Should().Be(i);
234+
}
235+
}
236+
237+
#endregion
238+
}
239+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="ChannelSourceSpec.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading;
10+
using System.Threading.Channels;
11+
using System.Threading.Tasks;
12+
using Akka.Streams.Dsl;
13+
using Akka.Streams.TestKit;
14+
using FluentAssertions;
15+
using Xunit;
16+
using Xunit.Abstractions;
17+
18+
namespace Akka.Streams.Tests.Implementation
19+
{
20+
public class ChannelSourceSpec : Akka.TestKit.Xunit2.TestKit
21+
{
22+
private readonly ActorMaterializer _materializer;
23+
24+
public ChannelSourceSpec(ITestOutputHelper output) : base(output: output)
25+
{
26+
_materializer = Sys.Materializer();
27+
}
28+
29+
[Fact]
30+
public void ChannelSource_must_complete_after_channel_completes()
31+
{
32+
var channel = Channel.CreateUnbounded<int>();
33+
var probe = this.CreateManualSubscriberProbe<int>();
34+
35+
ChannelSource.FromReader<int>(channel)
36+
.To(Sink.FromSubscriber(probe))
37+
.Run(_materializer);
38+
39+
var subscription = probe.ExpectSubscription();
40+
subscription.Request(2);
41+
42+
channel.Writer.Complete();
43+
44+
probe.ExpectComplete();
45+
}
46+
47+
48+
[Fact]
49+
public void ChannelSource_must_complete_after_channel_fails()
50+
{
51+
var channel = Channel.CreateUnbounded<int>();
52+
var probe = this.CreateManualSubscriberProbe<int>();
53+
var failure = new Exception("BOOM!");
54+
55+
ChannelSource.FromReader<int>(channel)
56+
.To(Sink.FromSubscriber(probe))
57+
.Run(_materializer);
58+
59+
var subscription = probe.ExpectSubscription();
60+
subscription.Request(2);
61+
62+
channel.Writer.Complete(failure);
63+
64+
probe.ExpectError().InnerException.Should().Be(failure);
65+
}
66+
67+
[Fact]
68+
public async Task ChannelSource_must_read_incoming_events()
69+
{
70+
var tcs = new CancellationTokenSource(TimeSpan.FromSeconds(5));
71+
var channel = Channel.CreateBounded<int>(3);
72+
await channel.Writer.WriteAsync(1, tcs.Token);
73+
await channel.Writer.WriteAsync(2, tcs.Token);
74+
await channel.Writer.WriteAsync(3, tcs.Token);
75+
76+
var probe = this.CreateManualSubscriberProbe<int>();
77+
78+
ChannelSource.FromReader<int>(channel)
79+
.To(Sink.FromSubscriber(probe))
80+
.Run(_materializer);
81+
82+
var subscription = probe.ExpectSubscription();
83+
subscription.Request(5);
84+
85+
probe.ExpectNext(1);
86+
probe.ExpectNext(2);
87+
88+
await channel.Writer.WriteAsync(4, tcs.Token);
89+
await channel.Writer.WriteAsync(5, tcs.Token);
90+
91+
probe.ExpectNext(3);
92+
probe.ExpectNext(4);
93+
probe.ExpectNext(5);
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)