Skip to content

Commit ff52eff

Browse files
eabaAaronontheweb
andauthored
[26-74] FlowSelectAsyncSpec (#6573)
* [26-74] `FlowSelectAsyncSpec` * Changes to `async` TestKit * Update FlowSelectAsyncSpec.cs --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 3c02d9e commit ff52eff

File tree

1 file changed

+62
-61
lines changed

1 file changed

+62
-61
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs

+62-61
Original file line numberDiff line numberDiff line change
@@ -42,29 +42,28 @@ public FlowSelectAsyncSpec(ITestOutputHelper helper) : base(helper)
4242
}
4343

4444
[Fact]
45-
public void A_Flow_with_SelectAsync_must_produce_task_elements()
45+
public async Task A_Flow_with_SelectAsync_must_produce_task_elements()
4646
{
47-
this.AssertAllStagesStopped(() =>
48-
{
47+
await this.AssertAllStagesStoppedAsync(async() => {
4948
var c = this.CreateManualSubscriberProbe<int>();
5049
Source.From(Enumerable.Range(1, 3))
5150
.SelectAsync(4, Task.FromResult)
5251
.RunWith(Sink.FromSubscriber(c), Materializer);
53-
var sub = c.ExpectSubscription();
52+
var sub = await c.ExpectSubscriptionAsync();
5453

5554
sub.Request(2);
56-
c.ExpectNext(1)
55+
await c.ExpectNext(1)
5756
.ExpectNext(2)
58-
.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
57+
.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
5958
sub.Request(2);
6059

61-
c.ExpectNext(3)
62-
.ExpectComplete();
60+
await c.ExpectNext(3)
61+
.ExpectCompleteAsync();
6362
}, Materializer);
6463
}
6564

6665
[Fact]
67-
public void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
66+
public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
6867
{
6968
var c = this.CreateManualSubscriberProbe<int>();
7069
Source.From(Enumerable.Range(1, 50))
@@ -80,14 +79,16 @@ public void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
8079
});
8180
})
8281
.RunWith(Sink.FromSubscriber(c), Materializer);
83-
var sub = c.ExpectSubscription();
82+
var sub = await c.ExpectSubscriptionAsync();
8483
sub.Request(1000);
85-
Enumerable.Range(1, 50).ForEach(n => c.ExpectNext(n));
86-
c.ExpectComplete();
84+
foreach (var n in Enumerable.Range(1, 50))
85+
await c.ExpectNextAsync(n);
86+
//Enumerable.Range(1, 50).ForEach(n => c.ExpectNext(n));
87+
await c.ExpectCompleteAsync();
8788
}
8889

8990
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
90-
public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_parallelism()
91+
public async Task A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_parallelism()
9192
{
9293
var probe = CreateTestProbe();
9394
var c = this.CreateManualSubscriberProbe<int>();
@@ -98,27 +99,28 @@ public void A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_par
9899
return n;
99100
}))
100101
.RunWith(Sink.FromSubscriber(c), Materializer);
101-
var sub = c.ExpectSubscription();
102-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
102+
var sub = await c.ExpectSubscriptionAsync();
103+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
103104
sub.Request(1);
104105
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(1, 9));
105-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
106+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
106107
sub.Request(2);
107108
probe.ReceiveN(2).Should().BeEquivalentTo(Enumerable.Range(10, 2));
108-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
109+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
109110
sub.Request(10);
110111
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(12, 9));
111-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
112+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
112113

113-
Enumerable.Range(1, 13).ForEach(n => c.ExpectNext(n));
114-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
114+
foreach (var n in Enumerable.Range(1, 13))
115+
await c.ExpectNextAsync(n);
116+
//Enumerable.Range(1, 13).ForEach(n => c.ExpectNext(n));
117+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
115118
}
116119

117120
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
118-
public void A_Flow_with_SelectAsync_must_signal_task_failure()
121+
public async Task A_Flow_with_SelectAsync_must_signal_task_failure()
119122
{
120-
this.AssertAllStagesStopped(() =>
121-
{
123+
await this.AssertAllStagesStoppedAsync(async() => {
122124
var latch = new TestLatch(1);
123125
var c = this.CreateManualSubscriberProbe<int>();
124126
Source.From(Enumerable.Range(1, 5))
@@ -131,18 +133,17 @@ public void A_Flow_with_SelectAsync_must_signal_task_failure()
131133
return n;
132134
}))
133135
.To(Sink.FromSubscriber(c)).Run(Materializer);
134-
var sub = c.ExpectSubscription();
136+
var sub = await c.ExpectSubscriptionAsync();
135137
sub.Request(10);
136138
c.ExpectError().InnerException.Message.Should().Be("err1");
137139
latch.CountDown();
138140
}, Materializer);
139141
}
140142

141143
[Fact]
142-
public void A_Flow_with_SelectAsync_must_signal_task_failure_asap()
144+
public async Task A_Flow_with_SelectAsync_must_signal_task_failure_asap()
143145
{
144-
this.AssertAllStagesStopped(() =>
145-
{
146+
await this.AssertAllStagesStoppedAsync(() => {
146147
var latch = CreateTestLatch();
147148
var done = Source.From(Enumerable.Range(1, 5))
148149
.Select(n =>
@@ -155,7 +156,7 @@ public void A_Flow_with_SelectAsync_must_signal_task_failure_asap()
155156
})
156157
.SelectAsync(4, n =>
157158
{
158-
if (n == 1)
159+
if (n == 1)
159160
{
160161
var c = new TaskCompletionSource<int>();
161162
c.SetException(new Exception("err1"));
@@ -166,14 +167,14 @@ public void A_Flow_with_SelectAsync_must_signal_task_failure_asap()
166167

167168
done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
168169
latch.CountDown();
170+
return Task.CompletedTask;
169171
}, Materializer);
170172
}
171173

172174
[Fact]
173-
public void A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
175+
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
174176
{
175-
this.AssertAllStagesStopped(() =>
176-
{
177+
await this.AssertAllStagesStoppedAsync(async() => {
177178
var latch = new TestLatch(1);
178179
var c = this.CreateManualSubscriberProbe<int>();
179180
Source.From(Enumerable.Range(1, 5))
@@ -189,20 +190,19 @@ public void A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
189190
});
190191
})
191192
.RunWith(Sink.FromSubscriber(c), Materializer);
192-
var sub = c.ExpectSubscription();
193+
var sub = await c.ExpectSubscriptionAsync();
193194
sub.Request(10);
194195
c.ExpectError().Message.Should().Be("err2");
195196
latch.CountDown();
196197
}, Materializer);
197198
}
198199

199200
[Fact]
200-
public void A_Flow_with_SelectAsync_must_resume_after_task_failure()
201+
public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure()
201202
{
202-
this.AssertAllStagesStopped(() =>
203+
await this.AssertAllStagesStoppedAsync(async() =>
203204
{
204-
this.AssertAllStagesStopped(() =>
205-
{
205+
await this.AssertAllStagesStoppedAsync(async () => {
206206
var c = this.CreateManualSubscriberProbe<int>();
207207
Source.From(Enumerable.Range(1, 5))
208208
.SelectAsync(4, n => Task.Run(() =>
@@ -213,21 +213,21 @@ public void A_Flow_with_SelectAsync_must_resume_after_task_failure()
213213
}))
214214
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
215215
.RunWith(Sink.FromSubscriber(c), Materializer);
216-
var sub = c.ExpectSubscription();
216+
var sub = await c.ExpectSubscriptionAsync();
217217
sub.Request(10);
218-
new[] {1, 2, 4, 5}.ForEach(i => c.ExpectNext(i));
219-
c.ExpectComplete();
218+
foreach (var i in new[] { 1, 2, 4, 5 })
219+
await c.ExpectNextAsync(i);
220+
await c.ExpectCompleteAsync();
220221
}, Materializer);
221222
}, Materializer);
222223
}
223224

224225
[Fact]
225-
public void A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
226+
public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
226227
{
227-
this.AssertAllStagesStopped(() =>
228-
{
228+
await this.AssertAllStagesStoppedAsync(() => {
229229
var futures = new[]
230-
{
230+
{
231231
Task.Run(() => { throw new TestException("failure1"); return "";}),
232232
Task.Run(() => { throw new TestException("failure2"); return "";}),
233233
Task.Run(() => { throw new TestException("failure3"); return "";}),
@@ -243,6 +243,7 @@ public void A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
243243

244244
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
245245
t.Result.Should().Be("happy");
246+
return Task.CompletedTask;
246247
}, Materializer);
247248
}
248249

@@ -268,7 +269,7 @@ await this.AssertAllStagesStoppedAsync(async() =>
268269
}
269270

270271
[Fact]
271-
public void A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws()
272+
public async Task A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws()
272273
{
273274
var c = this.CreateManualSubscriberProbe<int>();
274275
Source.From(Enumerable.Range(1, 5))
@@ -280,59 +281,59 @@ public void A_Flow_with_SelectAsync_must_resume_when_SelectAsync_throws()
280281
})
281282
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
282283
.RunWith(Sink.FromSubscriber(c), Materializer);
283-
var sub = c.ExpectSubscription();
284+
var sub = await c.ExpectSubscriptionAsync();
284285
sub.Request(10);
285-
new[] {1, 2, 4, 5}.ForEach(i => c.ExpectNext(i));
286-
c.ExpectComplete();
286+
foreach (var i in new[] { 1, 2, 4, 5 })
287+
await c.ExpectNextAsync(i);
288+
await c.ExpectCompleteAsync();
287289
}
288290

289291
[Fact]
290-
public void A_Flow_with_SelectAsync_must_signal_NPE_when_task_is_completed_with_null()
292+
public async Task A_Flow_with_SelectAsync_must_signal_NPE_when_task_is_completed_with_null()
291293
{
292294
var c = this.CreateManualSubscriberProbe<string>();
293295

294296
Source.From(new[] {"a", "b"})
295297
.SelectAsync(4, _ => Task.FromResult(null as string))
296298
.To(Sink.FromSubscriber(c)).Run(Materializer);
297299

298-
var sub = c.ExpectSubscription();
300+
var sub = await c.ExpectSubscriptionAsync();
299301
sub.Request(10);
300302
c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
301303
}
302304

303305
[Fact]
304-
public void A_Flow_with_SelectAsync_must_resume_when_task_is_completed_with_null()
306+
public async Task A_Flow_with_SelectAsync_must_resume_when_task_is_completed_with_null()
305307
{
306308
var c = this.CreateManualSubscriberProbe<string>();
307309
Source.From(new[] { "a", "b", "c" })
308310
.SelectAsync(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s))
309311
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
310312
.To(Sink.FromSubscriber(c)).Run(Materializer);
311-
var sub = c.ExpectSubscription();
313+
var sub = await c.ExpectSubscriptionAsync();
312314
sub.Request(10);
313-
c.ExpectNext("a");
314-
c.ExpectNext("c");
315-
c.ExpectComplete();
315+
await c.ExpectNextAsync("a");
316+
await c.ExpectNextAsync("c");
317+
await c.ExpectCompleteAsync();
316318
}
317319

318320
[Fact]
319-
public void A_Flow_with_SelectAsync_must_handle_cancel_properly()
321+
public async Task A_Flow_with_SelectAsync_must_handle_cancel_properly()
320322
{
321-
this.AssertAllStagesStopped(() =>
322-
{
323+
await this.AssertAllStagesStoppedAsync(async() => {
323324
var pub = this.CreateManualPublisherProbe<int>();
324325
var sub = this.CreateManualSubscriberProbe<int>();
325326

326327
Source.FromPublisher(pub)
327328
.SelectAsync(4, _ => Task.FromResult(0))
328329
.RunWith(Sink.FromSubscriber(sub), Materializer);
329330

330-
var upstream = pub.ExpectSubscription();
331-
upstream.ExpectRequest();
331+
var upstream = await pub.ExpectSubscriptionAsync();
332+
await upstream.ExpectRequestAsync();
332333

333-
sub.ExpectSubscription().Cancel();
334+
(await sub.ExpectSubscriptionAsync()).Cancel();
334335

335-
upstream.ExpectCancellation();
336+
await upstream.ExpectCancellationAsync();
336337
}, Materializer);
337338
}
338339

0 commit comments

Comments
 (0)