Skip to content

Commit 23245ae

Browse files
eabaAaronontheweb
andauthored
[18-74]FlowJoinSpec (#6562)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent 947e1ed commit 23245ae

File tree

1 file changed

+42
-42
lines changed

1 file changed

+42
-42
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowJoinSpec.cs

+42-42
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,15 @@ public FlowJoinSpec(ITestOutputHelper helper) : base(helper)
3030
}
3131

3232
[Fact]
33-
public void A_Flow_using_Join_must_allow_for_cycles()
33+
public async Task A_Flow_using_Join_must_allow_for_cycles()
3434
{
35-
this.AssertAllStagesStopped(() =>
36-
{
35+
await this.AssertAllStagesStoppedAsync(() => {
3736
const int end = 47;
38-
var t = Enumerable.Range(0, end + 1).GroupBy(i => i%2 == 0).ToList();
37+
var t = Enumerable.Range(0, end + 1).GroupBy(i => i % 2 == 0).ToList();
3938
var even = t.First(x => x.Key).ToList();
4039
var odd = t.First(x => !x.Key).ToList();
4140
var source = Source.From(Enumerable.Range(0, end + 1));
42-
var result = even.Concat(odd).Concat(odd.Select(x => x*10));
41+
var result = even.Concat(odd).Concat(odd.Select(x => x * 10));
4342
var probe = this.CreateManualSubscriberProbe<IEnumerable<int>>();
4443

4544
var flow1 = Flow.FromGraph(GraphDsl.Create(b =>
@@ -56,27 +55,27 @@ public void A_Flow_using_Join_must_allow_for_cycles()
5655

5756
var flow2 =
5857
Flow.Create<int>()
59-
.Where(x => x%2 == 1)
60-
.Select(x => x*10)
61-
.Buffer((end + 1)/2, OverflowStrategy.Backpressure)
62-
.Take((end + 1)/2);
58+
.Where(x => x % 2 == 1)
59+
.Select(x => x * 10)
60+
.Buffer((end + 1) / 2, OverflowStrategy.Backpressure)
61+
.Take((end + 1) / 2);
6362

6463
flow1.Join(flow2).Run(Materializer);
6564

6665
var sub = probe.ExpectSubscription();
6766
sub.Request(1);
6867
probe.ExpectNext().Should().BeEquivalentTo(result);
6968
sub.Cancel();
69+
return Task.CompletedTask;
7070
}, Materializer);
7171
}
7272

7373
[Fact]
74-
public void A_Flow_using_Join_must_allow_for_merge_cycle()
74+
public async Task A_Flow_using_Join_must_allow_for_merge_cycle()
7575
{
76-
this.AssertAllStagesStopped(() =>
77-
{
78-
var source =
79-
Source.Single("lonely traveler").MapMaterializedValue(_ => Task.FromResult(""));
76+
await this.AssertAllStagesStoppedAsync(() => {
77+
var source =
78+
Source.Single("lonely traveler").MapMaterializedValue(_ => Task.FromResult(""));
8079

8180
var flow1 = Flow.FromGraph(GraphDsl.Create(Sink.First<string>(), (b, sink) =>
8281
{
@@ -92,16 +91,16 @@ public void A_Flow_using_Join_must_allow_for_merge_cycle()
9291
var t = flow1.Join(Flow.Create<string>()).Run(Materializer);
9392
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
9493
t.Result.Should().Be("lonely traveler");
94+
return Task.CompletedTask;
9595
}, Materializer);
9696
}
9797

9898
[Fact]
99-
public void A_Flow_using_Join_must_allow_for_merge_preferred_cycle()
99+
public async Task A_Flow_using_Join_must_allow_for_merge_preferred_cycle()
100100
{
101-
this.AssertAllStagesStopped(() =>
102-
{
103-
var source =
104-
Source.Single("lonely traveler").MapMaterializedValue(_ => Task.FromResult(""));
101+
await this.AssertAllStagesStoppedAsync(() => {
102+
var source =
103+
Source.Single("lonely traveler").MapMaterializedValue(_ => Task.FromResult(""));
105104

106105
var flow1 = Flow.FromGraph(GraphDsl.Create(Sink.First<string>(), (b, sink) =>
107106
{
@@ -117,18 +116,17 @@ public void A_Flow_using_Join_must_allow_for_merge_preferred_cycle()
117116
var t = flow1.Join(Flow.Create<string>()).Run(Materializer);
118117
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
119118
t.Result.Should().Be("lonely traveler");
119+
return Task.CompletedTask;
120120
}, Materializer);
121121
}
122122

123123
[Fact]
124-
public void A_Flow_using_Join_must_allow_for_zip_cycle()
124+
public async Task A_Flow_using_Join_must_allow_for_zip_cycle()
125125
{
126-
this.AssertAllStagesStopped(() =>
127-
{
128-
var source = Source.From(new[] {"traveler1", "traveler2"})
129-
.MapMaterializedValue<TestSubscriber.Probe<(string, string)>>(_ => null);
130-
131-
var flow = Flow.FromGraph(GraphDsl.Create(this.SinkProbe<(string,string)>(), (b, sink) =>
126+
await this.AssertAllStagesStoppedAsync(() => {
127+
var source = Source.From(new[] { "traveler1", "traveler2" })
128+
.MapMaterializedValue<TestSubscriber.Probe<(string, string)>>(_ => null);
129+
var flow = Flow.FromGraph(GraphDsl.Create(this.SinkProbe<(string, string)>(), (b, sink) =>
132130
{
133131
var zip = b.Add(new Zip<string, string>());
134132
var broadcast = b.Add(new Broadcast<(string, string)>(2));
@@ -153,23 +151,24 @@ public void A_Flow_using_Join_must_allow_for_zip_cycle()
153151
var probe = flow.Join(feedback).Run(Materializer);
154152
probe.RequestNext(("traveler1", "ignition"));
155153
probe.RequestNext(("traveler2", "traveler1"));
154+
return Task.CompletedTask;
156155
}, Materializer);
157156
}
158157

159158
[Fact]
160-
public void A_Flow_using_Join_must_allow_for_concat_cycle()
159+
public async Task A_Flow_using_Join_must_allow_for_concat_cycle()
161160
{
162-
this.AssertAllStagesStopped(() =>
163-
{
164-
var flow = Flow.FromGraph(GraphDsl.Create(TestSource.SourceProbe<string>(this), Sink.First<string>(), Keep.Both, (b, source, sink) =>
165-
{
166-
var concat = b.Add(Concat.Create<string>());
167-
var broadcast = b.Add(new Broadcast<string>(2, true));
168-
169-
b.From(source).To(concat.In(0));
170-
b.From(concat.Out).To(broadcast.In);
171-
b.From(broadcast.Out(0)).To(sink);
172-
return new FlowShape<string, string>(concat.In(1), broadcast.Out(1));
161+
await this.AssertAllStagesStoppedAsync(() => {
162+
var flow =
163+
Flow.FromGraph(GraphDsl.Create(TestSource.SourceProbe<string>(this),
164+
Sink.First<string>(), Keep.Both, (b, source, sink) =>
165+
{
166+
var concat = b.Add(Concat.Create<string>());
167+
var broadcast = b.Add(new Broadcast<string>(2, true));
168+
b.From(source).To(concat.In(0));
169+
b.From(concat.Out).To(broadcast.In);
170+
b.From(broadcast.Out(0)).To(sink);
171+
return new FlowShape<string, string>(concat.In(1), broadcast.Out(1));
173172
}));
174173

175174
var tuple = flow.Join(Flow.Create<string>()).Run(Materializer);
@@ -179,14 +178,14 @@ public void A_Flow_using_Join_must_allow_for_concat_cycle()
179178
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
180179
t.Result.Should().Be("lonely traveler");
181180
probe.SendComplete();
181+
return Task.CompletedTask;
182182
}, Materializer);
183183
}
184184

185185
[Fact]
186-
public void A_Flow_using_Join_must_allow_for_interleave_cycle()
186+
public async Task A_Flow_using_Join_must_allow_for_interleave_cycle()
187187
{
188-
this.AssertAllStagesStopped(() =>
189-
{
188+
await this.AssertAllStagesStoppedAsync(() => {
190189
var source = Source.Single("lonely traveler").MapMaterializedValue(_ => Task.FromResult(""));
191190
var flow = Flow.FromGraph(GraphDsl.Create(Sink.First<string>(), (b, sink) =>
192191
{
@@ -198,10 +197,11 @@ public void A_Flow_using_Join_must_allow_for_interleave_cycle()
198197
b.From(broadcast.Out(0)).To(sink);
199198
return new FlowShape<string, string>(interleave.In(1), broadcast.Out(1));
200199
}));
201-
200+
202201
var t = flow.Join(Flow.Create<string>()).Run(Materializer);
203202
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
204203
t.Result.Should().Be("lonely traveler");
204+
return Task.CompletedTask;
205205
}, Materializer);
206206
}
207207
}

0 commit comments

Comments
 (0)