Skip to content

Commit 3c02d9e

Browse files
authored
[27-74] FlowSelectAsyncUnorderedSpec (#6574)
* [27-74] `FlowSelectAsyncUnorderedSpec` * Changes to `async` TestKit
1 parent 2dd7d26 commit 3c02d9e

File tree

1 file changed

+79
-80
lines changed

1 file changed

+79
-80
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs

+79-80
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@ public FlowSelectAsyncUnorderedSpec(ITestOutputHelper helper) : base(helper)
4141
}
4242

4343
[WindowsFact(Skip ="Racy in Linux")]
44-
public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
44+
public async Task A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
4545
{
46-
this.AssertAllStagesStopped(() =>
47-
{
46+
await this.AssertAllStagesStoppedAsync(async() => {
4847
var c = this.CreateManualSubscriberProbe<int>();
4948
var latch = Enumerable.Range(0, 4).Select(_ => new TestLatch(1)).ToArray();
5049

@@ -53,28 +52,28 @@ public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_o
5352
latch[n].Ready(TimeSpan.FromSeconds(5));
5453
return n;
5554
})).To(Sink.FromSubscriber(c)).Run(Materializer);
56-
var sub = c.ExpectSubscription();
55+
var sub = await c.ExpectSubscriptionAsync();
5756
sub.Request(5);
5857

5958
latch[1].CountDown();
60-
c.ExpectNext(1);
59+
await c.ExpectNextAsync(1);
6160

6261
latch[3].CountDown();
63-
c.ExpectNext(3);
62+
await c.ExpectNextAsync(3);
6463

6564
latch[2].CountDown();
66-
c.ExpectNext(2);
65+
await c.ExpectNextAsync(2);
6766

6867
latch[0].CountDown();
69-
c.ExpectNext(0);
68+
await c.ExpectNextAsync(0);
7069

71-
c.ExpectComplete();
70+
await c.ExpectCompleteAsync();
7271
}, Materializer);
7372

7473
}
7574

7675
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
77-
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
76+
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
7877
{
7978
var probe = CreateTestProbe();
8079
var c = this.CreateManualSubscriberProbe<int>();
@@ -94,29 +93,32 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requ
9493
});
9594
})
9695
.To(Sink.FromSubscriber(c)).Run(Materializer);
97-
var sub = c.ExpectSubscription();
98-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
99-
probe.ExpectNoMsg(TimeSpan.Zero);
96+
var sub = await c.ExpectSubscriptionAsync();
97+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
98+
await probe.ExpectNoMsgAsync(TimeSpan.Zero);
10099
sub.Request(1);
101100
var got = new List<int> {c.ExpectNext()};
102101
probe.ExpectMsgAllOf(new []{ 1, 2, 3, 4, 5 });
103-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
102+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
104103
sub.Request(25);
105104
probe.ExpectMsgAllOf(Enumerable.Range(6, 15).ToArray());
106-
c.Within(TimeSpan.FromSeconds(3), () =>
105+
await c.WithinAsync(TimeSpan.FromSeconds(3), async () =>
107106
{
108-
Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
107+
foreach(var i in Enumerable.Range(2, 19))
108+
{
109+
got.Add(await c.ExpectNextAsync());
110+
}
111+
//Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
109112
return NotUsed.Instance;
110113
});
111114
got.Should().BeEquivalentTo(Enumerable.Range(1, 20));
112-
c.ExpectComplete();
115+
await c.ExpectCompleteAsync();
113116
}
114117

115118
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
116-
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
119+
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
117120
{
118-
this.AssertAllStagesStopped(() =>
119-
{
121+
await this.AssertAllStagesStoppedAsync(async() => {
120122
var latch = new TestLatch(1);
121123
var c = this.CreateManualSubscriberProbe<int>();
122124
Source.From(Enumerable.Range(1, 5))
@@ -129,7 +131,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
129131
return n;
130132
}))
131133
.To(Sink.FromSubscriber(c)).Run(Materializer);
132-
var sub = c.ExpectSubscription();
134+
var sub = await c.ExpectSubscriptionAsync();
133135
sub.Request(10);
134136
c.ExpectError().InnerException.Message.Should().Be("err1");
135137
latch.CountDown();
@@ -138,10 +140,9 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
138140

139141

140142
[Fact]
141-
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
143+
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
142144
{
143-
this.AssertAllStagesStopped(() =>
144-
{
145+
await this.AssertAllStagesStoppedAsync(() => {
145146
var latch = CreateTestLatch();
146147
var done = Source.From(Enumerable.Range(1, 5))
147148
.Select(n =>
@@ -166,14 +167,14 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
166167

167168
done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
168169
latch.CountDown();
170+
return Task.CompletedTask;
169171
}, Materializer);
170172
}
171173

172174
[Fact]
173-
public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
175+
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
174176
{
175-
this.AssertAllStagesStopped(() =>
176-
{
177+
await this.AssertAllStagesStoppedAsync(async() => {
177178
var latch = new TestLatch(1);
178179
var c = this.CreateManualSubscriberProbe<int>();
179180
Source.From(Enumerable.Range(1, 5))
@@ -189,40 +190,39 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncU
189190
});
190191
})
191192
.RunWith(Sink.FromSubscriber(c), Materializer);
192-
var sub = c.ExpectSubscription();
193+
var sub = await c.ExpectSubscriptionAsync();
193194
sub.Request(10);
194195
c.ExpectError().Message.Should().Be("err2");
195196
latch.CountDown();
196197
}, Materializer);
197198
}
198199

199200
[Fact]
200-
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
201+
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
201202
{
202-
this.AssertAllStagesStopped(() =>
203+
await this.AssertAllStagesStoppedAsync(async() =>
203204
{
204-
this.AssertAllStagesStopped(() =>
205-
{
206-
Source.From(Enumerable.Range(1, 5))
207-
.SelectAsyncUnordered(4, n => Task.Run(() =>
208-
{
209-
if (n == 3)
210-
throw new TestException("err3");
211-
return n;
212-
}))
213-
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
214-
.RunWith(this.SinkProbe<int>(), Materializer)
215-
.Request(10)
216-
.ExpectNextUnordered(1, 2, 4, 5)
217-
.ExpectComplete();
205+
await this.AssertAllStagesStoppedAsync(async() => {
206+
await Source.From(Enumerable.Range(1, 5))
207+
.SelectAsyncUnordered(4, n => Task.Run(() =>
208+
{
209+
if (n == 3)
210+
throw new TestException("err3");
211+
return n;
212+
}))
213+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
214+
.RunWith(this.SinkProbe<int>(), Materializer)
215+
.Request(10)
216+
.ExpectNextUnordered(1, 2, 4, 5)
217+
.ExpectCompleteAsync();
218218
}, Materializer);
219219
}, Materializer);
220220
}
221221

222222
[Fact]
223-
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
223+
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
224224
{
225-
this.AssertAllStagesStopped(async() =>
225+
await this.AssertAllStagesStoppedAsync(async() =>
226226
{
227227
var futures = new[]
228228
{
@@ -245,30 +245,29 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures
245245
}
246246

247247
[Fact]
248-
public void A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
248+
public async Task A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
249249
{
250-
this.AssertAllStagesStopped(() =>
251-
{
252-
var t = Source.From(Enumerable.Range(1, 3))
253-
.SelectAsyncUnordered(1, n => Task.Run(() =>
254-
{
255-
if (n == 3)
256-
throw new TestException("err3b");
257-
return n;
258-
}))
259-
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
260-
.Grouped(10)
261-
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
262-
250+
await this.AssertAllStagesStoppedAsync(() => {
251+
var t = Source.From(Enumerable.Range(1, 3))
252+
.SelectAsyncUnordered(1, n => Task.Run(() =>
253+
{
254+
if (n == 3)
255+
throw new TestException("err3b");
256+
return n;
257+
}))
258+
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
259+
.Grouped(10)
260+
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
263261
t.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
264-
t.Result.Should().BeEquivalentTo(new[] {1, 2});
262+
t.Result.Should().BeEquivalentTo(new[] { 1, 2 });
263+
return Task.CompletedTask;
265264
}, Materializer);
266265
}
267266

268267
[Fact]
269-
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
268+
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
270269
{
271-
Source.From(Enumerable.Range(1, 5))
270+
await Source.From(Enumerable.Range(1, 5))
272271
.SelectAsyncUnordered(4, n =>
273272
{
274273
if (n == 3)
@@ -279,63 +278,61 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnorder
279278
.RunWith(this.SinkProbe<int>(), Materializer)
280279
.Request(10)
281280
.ExpectNextUnordered(1, 2, 4, 5)
282-
.ExpectComplete();
281+
.ExpectCompleteAsync();
283282
}
284283

285284
[Fact]
286-
public void A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
285+
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
287286
{
288287
var c = this.CreateManualSubscriberProbe<string>();
289288

290289
Source.From(new[] {"a", "b"})
291290
.SelectAsyncUnordered(4, _ => Task.FromResult(null as string))
292291
.To(Sink.FromSubscriber(c)).Run(Materializer);
293292

294-
var sub = c.ExpectSubscription();
293+
var sub = await c.ExpectSubscriptionAsync();
295294
sub.Request(10);
296295
c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
297296
}
298297

299298
[Fact]
300-
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
299+
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
301300
{
302301
var c = this.CreateManualSubscriberProbe<string>();
303302
Source.From(new[] { "a", "b", "c" })
304303
.SelectAsyncUnordered(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s))
305304
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
306305
.To(Sink.FromSubscriber(c)).Run(Materializer);
307-
var sub = c.ExpectSubscription();
306+
var sub = await c.ExpectSubscriptionAsync();
308307
sub.Request(10);
309308
c.ExpectNextUnordered("a", "c");
310-
c.ExpectComplete();
309+
await c.ExpectCompleteAsync();
311310
}
312311

313312
[Fact]
314-
public void A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
313+
public async Task A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
315314
{
316-
this.AssertAllStagesStopped(() =>
317-
{
315+
await this.AssertAllStagesStoppedAsync(async() => {
318316
var pub = this.CreateManualPublisherProbe<int>();
319317
var sub = this.CreateManualSubscriberProbe<int>();
320318

321319
Source.FromPublisher(pub)
322320
.SelectAsyncUnordered(4, _ => Task.FromResult(0))
323321
.RunWith(Sink.FromSubscriber(sub), Materializer);
324322

325-
var upstream = pub.ExpectSubscription();
326-
upstream.ExpectRequest();
323+
var upstream = await pub.ExpectSubscriptionAsync();
324+
await upstream.ExpectRequestAsync();
327325

328-
sub.ExpectSubscription().Cancel();
326+
(await sub.ExpectSubscriptionAsync()).Cancel();
329327

330-
upstream.ExpectCancellation();
328+
await upstream.ExpectCancellationAsync();
331329
}, Materializer);
332330
}
333331

334332
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
335-
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
333+
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
336334
{
337-
this.AssertAllStagesStopped(() =>
338-
{
335+
await this.AssertAllStagesStoppedAsync(() => {
339336
const int parallelism = 8;
340337
var counter = new AtomicCounter();
341338
var queue = new BlockingQueue<(TaskCompletionSource<int>, long)>();
@@ -365,7 +362,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
365362
}
366363
}
367364
}, cancellation.Token);
368-
365+
369366
Func<Task<int>> deferred = () =>
370367
{
371368
var promise = new TaskCompletionSource<int>();
@@ -390,6 +387,8 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
390387
{
391388
cancellation.Cancel(false);
392389
}
390+
391+
return Task.CompletedTask;
393392
}, Materializer);
394393
}
395394
}

0 commit comments

Comments
 (0)