Skip to content

Commit f48394e

Browse files
eabaAaronontheweb
andauthored
[9-74]FlowConcatSpec (#6553)
* [9-74]`FlowConcatSpec` * Changes to `async` TestKit --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 5164c8d commit f48394e

File tree

1 file changed

+77
-78
lines changed

1 file changed

+77
-78
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowConcatSpec.cs

+77-78
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected override TestSubscriber.Probe<int> Setup(IPublisher<int> p1, IPublishe
3636
}
3737

3838
[Fact]
39-
public void A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
39+
public async Task A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
4040
{
4141
var f1 = Flow.Create<int>().Select(x => x + "-s");
4242
var s1 = Source.From(new[] {1, 2, 3});
@@ -48,14 +48,17 @@ public void A_Concat_for_Flow_must_be_able_to_concat_Flow_with_Source()
4848
var res = f1.Concat(s2).RunWith(s1, subSink, Materializer).Item2;
4949

5050
res.Subscribe(subs);
51-
var sub = subs.ExpectSubscription();
51+
var sub = await subs.ExpectSubscriptionAsync();
5252
sub.Request(9);
53-
Enumerable.Range(1, 6).ForEach(e=>subs.ExpectNext(e + "-s"));
54-
subs.ExpectComplete();
53+
54+
foreach (var e in Enumerable.Range(1, 6))
55+
await subs.ExpectNextAsync(e + "-s");
56+
57+
await subs.ExpectCompleteAsync();
5558
}
5659

5760
[Fact]
58-
public void A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
61+
public async Task A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
5962
{
6063
var s1 = Source.From(new[] { 1, 2, 3 }).Select(x => x + "-s");
6164
var s2 = Source.From(new[] { 4, 5, 6 });
@@ -67,48 +70,56 @@ public void A_Concat_for_Flow_must_be_able_to_prepend_a_Source_to_a_Flow()
6770
var res = f2.Prepend(s1).RunWith(s2, subSink, Materializer).Item2;
6871

6972
res.Subscribe(subs);
70-
var sub = subs.ExpectSubscription();
73+
var sub = await subs.ExpectSubscriptionAsync();
7174
sub.Request(9);
72-
Enumerable.Range(1, 6).ForEach(e => subs.ExpectNext(e + "-s"));
73-
subs.ExpectComplete();
75+
76+
foreach (var e in Enumerable.Range(1, 6))
77+
await subs.ExpectNextAsync(e + "-s");
78+
79+
await subs.ExpectCompleteAsync();
7480
}
7581

7682
[Fact]
77-
public void A_Concat_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
83+
public async Task A_Concat_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
7884
{
79-
this.AssertAllStagesStopped(() =>
80-
{
85+
await this.AssertAllStagesStoppedAsync(async() => {
8186
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
82-
var subscription1 = subscriber1.ExpectSubscription();
87+
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
8388
subscription1.Request(5);
84-
Enumerable.Range(1, 4).ForEach(x => subscriber1.ExpectNext(x));
85-
subscriber1.ExpectComplete();
89+
90+
foreach (var x in Enumerable.Range(1, 4))
91+
await subscriber1.ExpectNextAsync(x);
92+
93+
await subscriber1.ExpectCompleteAsync();
8694

8795
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
88-
var subscription2 = subscriber2.ExpectSubscription();
96+
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
8997
subscription2.Request(5);
90-
Enumerable.Range(1, 4).ForEach(x => subscriber2.ExpectNext(x));
91-
subscriber2.ExpectComplete();
98+
99+
foreach (var x in Enumerable.Range(1, 4))
100+
await subscriber2.ExpectNextAsync(x);
101+
102+
await subscriber2.ExpectCompleteAsync();
103+
92104
}, Materializer);
93105
}
94106

95107
[Fact]
96-
public void A_Concat_for_Flow_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
108+
public async Task A_Concat_for_Flow_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
97109
{
98-
this.AssertAllStagesStopped(() =>
99-
{
110+
await this.AssertAllStagesStoppedAsync(() => {
100111
var subscriber = Setup(FailedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
101112
subscriber.ExpectSubscriptionAndError().Should().BeOfType<TestException>();
113+
return Task.CompletedTask;
102114
}, Materializer);
103115
}
104116

105117
[Fact]
106-
public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_failed_publisher()
118+
public async Task A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_failed_publisher()
107119
{
108-
this.AssertAllStagesStopped(() =>
109-
{
120+
await this.AssertAllStagesStoppedAsync(async() => {
110121
var subscriber = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), FailedPublisher<int>());
111-
subscriber.ExpectSubscription().Request(5);
122+
(await subscriber.ExpectSubscriptionAsync()).Request(5);
112123

113124
var errorSignalled = Enumerable.Range(1, 4)
114125
.Aggregate(false, (b, e) => b || subscriber.ExpectNextOrError() is TestException);
@@ -118,22 +129,21 @@ public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_immediately_fa
118129
}
119130

120131
[Fact]
121-
public void A_Concat_for_Flow_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
132+
public async Task A_Concat_for_Flow_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
122133
{
123-
this.AssertAllStagesStopped(() =>
124-
{
134+
await this.AssertAllStagesStoppedAsync(() => {
125135
var subscriber = Setup(SoonToFailPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
126136
subscriber.ExpectSubscriptionAndError().Should().BeOfType<TestException>();
137+
return Task.CompletedTask;
127138
}, Materializer);
128139
}
129140

130141
[Fact]
131-
public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed_publisher()
142+
public async Task A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed_publisher()
132143
{
133-
this.AssertAllStagesStopped(() =>
134-
{
144+
await this.AssertAllStagesStoppedAsync(async() => {
135145
var subscriber = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToFailPublisher<int>());
136-
subscriber.ExpectSubscription().Request(5);
146+
(await subscriber.ExpectSubscriptionAsync()).Request(5);
137147

138148
var errorSignalled = Enumerable.Range(1, 4)
139149
.Aggregate(false, (b, e) => b || subscriber.ExpectNextOrError() is TestException);
@@ -143,54 +153,56 @@ public void A_Concat_for_Flow_must_work_with_one_nonempty_and_one_delayed_failed
143153
}
144154

145155
[Fact]
146-
public void A_Concat_for_Flow_must_correctly_handle_async_errors_in_secondary_upstream()
156+
public async Task A_Concat_for_Flow_must_correctly_handle_async_errors_in_secondary_upstream()
147157
{
148-
this.AssertAllStagesStopped(() =>
149-
{
158+
await this.AssertAllStagesStoppedAsync(async() => {
150159
var promise = new TaskCompletionSource<int>();
151160
var subscriber = this.CreateManualSubscriberProbe<int>();
152161
Source.From(Enumerable.Range(1, 3))
153162
.Concat(Source.FromTask(promise.Task))
154163
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
155164

156-
var subscription = subscriber.ExpectSubscription();
165+
var subscription = await subscriber.ExpectSubscriptionAsync();
157166
subscription.Request(4);
158-
Enumerable.Range(1, 3).ForEach(x => subscriber.ExpectNext(x));
167+
168+
foreach (var x in Enumerable.Range(1, 3))
169+
await subscriber.ExpectNextAsync(x);
170+
159171
promise.SetException(TestException());
160172
subscriber.ExpectError().Should().BeOfType<TestException>();
173+
161174
}, Materializer);
162175
}
163176

164177
[Fact]
165-
public void A_Concat_for_Flow_must_work_with_Source_DSL()
178+
public async Task A_Concat_for_Flow_must_work_with_Source_DSL()
166179
{
167-
this.AssertAllStagesStopped(() =>
168-
{
169-
var testSource =
170-
Source.From(Enumerable.Range(1, 5))
171-
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
172-
.Grouped(1000);
180+
await this.AssertAllStagesStoppedAsync(() => {
181+
var testSource =
182+
Source.From(Enumerable.Range(1, 5))
183+
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
184+
.Grouped(1000);
173185
var task = testSource.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
174186
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
175-
task.Result.Should().BeEquivalentTo(Enumerable.Range(1,10));
187+
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
176188

177189
var runnable = testSource.ToMaterialized(Sink.Ignore<IEnumerable<int>>(), Keep.Left);
178190
var t = runnable.Run(Materializer);
179191
t.Item1.Should().BeOfType<NotUsed>();
180192
t.Item2.Should().BeOfType<NotUsed>();
181193

182194
runnable.MapMaterializedValue(_ => "boo").Run(Materializer).Should().Be("boo");
195+
return Task.CompletedTask;
183196
}, Materializer);
184197
}
185198

186199
[Fact]
187-
public void A_Concat_for_Flow_must_work_with_Flow_DSL()
200+
public async Task A_Concat_for_Flow_must_work_with_Flow_DSL()
188201
{
189-
this.AssertAllStagesStopped(() =>
190-
{
191-
var testFlow = Flow.Create<int>()
192-
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
193-
.Grouped(1000);
202+
await this.AssertAllStagesStoppedAsync(() => {
203+
var testFlow = Flow.Create<int>()
204+
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
205+
.Grouped(1000);
194206
var task = Source.From(Enumerable.Range(1, 5))
195207
.ViaMaterialized(testFlow, Keep.Both)
196208
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
@@ -204,65 +216,52 @@ public void A_Concat_for_Flow_must_work_with_Flow_DSL()
204216
runnable.Invoking(r => r.Run(Materializer)).Should().NotThrow();
205217

206218
runnable.MapMaterializedValue(_ => "boo").Run(Materializer).Should().Be("boo");
219+
return Task.CompletedTask;
207220
}, Materializer);
208221
}
209222

210223
[Fact(Skip = "ConcatMaterialized type conflict")]
211-
public void A_Concat_for_Flow_must_work_with_Flow_DSL2()
224+
public async Task A_Concat_for_Flow_must_work_with_Flow_DSL2()
212225
{
213-
this.AssertAllStagesStopped(() =>
214-
{
215-
var testFlow = Flow.Create<int>()
216-
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
217-
.Grouped(1000);
226+
await this.AssertAllStagesStoppedAsync(() => {
227+
var testFlow = Flow.Create<int>()
228+
.ConcatMaterialized(Source.From(Enumerable.Range(6, 5)), Keep.Both)
229+
.Grouped(1000);
218230
var task = Source.From(Enumerable.Range(1, 5))
219231
.ViaMaterialized(testFlow, Keep.Both)
220232
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
221233
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
222234
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
223-
224-
//var sink = testFlow.ConcatMaterialized(Source.From(Enumerable.Range(1, 5)), Keep.Both)
225-
// .To(Sink.Ignore<IEnumerable<int>>())
226-
// .MapMaterializedValue(
227-
// x =>
228-
// {
229-
// x.Item1.Item1.Should().BeOfType<NotUsed>();
230-
// x.Item1.Item2.Should().BeOfType<NotUsed>();
231-
// x.Item2.Should().BeOfType<NotUsed>();
232-
// return "boo";
233-
// });
234-
235-
//Source.From(Enumerable.Range(10, 6)).RunWith(sink, Materializer).Should().Be("boo");
235+
return Task.CompletedTask;
236236
}, Materializer);
237237
}
238238

239239
[Fact]
240-
public void A_Concat_for_Flow_must_subscribe_at_one_to_initial_source_and_to_one_that_it_is_concat_to()
240+
public async Task A_Concat_for_Flow_must_subscribe_at_one_to_initial_source_and_to_one_that_it_is_concat_to()
241241
{
242-
this.AssertAllStagesStopped(() =>
243-
{
242+
await this.AssertAllStagesStoppedAsync(async() => {
244243
var publisher1 = this.CreatePublisherProbe<int>();
245244
var publisher2 = this.CreatePublisherProbe<int>();
246245
var probeSink =
247246
Source.FromPublisher(publisher1)
248247
.Concat(Source.FromPublisher(publisher2))
249248
.RunWith(this.SinkProbe<int>(), Materializer);
250249

251-
var sub1 = publisher1.ExpectSubscription();
252-
var sub2 = publisher2.ExpectSubscription();
253-
var subSink = probeSink.ExpectSubscription();
250+
var sub1 = await publisher1.ExpectSubscriptionAsync();
251+
var sub2 = await publisher2.ExpectSubscriptionAsync();
252+
var subSink = await probeSink.ExpectSubscriptionAsync();
254253

255254
sub1.SendNext(1);
256255
subSink.Request(1);
257-
probeSink.ExpectNext(1);
256+
await probeSink.ExpectNextAsync(1);
258257
sub1.SendComplete();
259258

260259
sub2.SendNext(2);
261260
subSink.Request(1);
262-
probeSink.ExpectNext(2);
261+
await probeSink.ExpectNextAsync(2);
263262
sub2.SendComplete();
264263

265-
probeSink.ExpectComplete();
264+
await probeSink.ExpectCompleteAsync();
266265
}, Materializer);
267266
}
268267
}

0 commit comments

Comments
 (0)