Skip to content

Commit 6784296

Browse files
eabaAaronontheweb
andauthored
[4-74]FlowAggregateAsyncSpec: Change AssertAllStagesStopped to AssertAllStagesStoppedAsync (#6543)
Co-authored-by: Aaron Stannard <[email protected]>
1 parent 996cac7 commit 6784296

File tree

1 file changed

+33
-33
lines changed

1 file changed

+33
-33
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs

+33-33
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ await this.AssertAllStagesStoppedAsync(async() =>
6767
}
6868

6969
[Fact]
70-
public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
70+
public async Task A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
7171
{
72-
this.AssertAllStagesStopped(async() =>
72+
await this.AssertAllStagesStoppedAsync(async() =>
7373
{
7474
var task = InputSource.RunWith(AggregateSink, Materializer);
7575
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -78,10 +78,10 @@ public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
7878
}
7979

8080
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
81-
public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
81+
public async Task A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
8282
{
8383
var flowTimeout = TimeSpan.FromMilliseconds(FlowDelayInMs*Input.Count()) + TimeSpan.FromSeconds(3);
84-
this.AssertAllStagesStopped(async() =>
84+
await this.AssertAllStagesStoppedAsync(async() =>
8585
{
8686
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
8787
var complete = await task.ShouldCompleteWithin(flowTimeout);
@@ -90,9 +90,9 @@ public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
9090
}
9191

9292
[Fact]
93-
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
93+
public async Task A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
9494
{
95-
this.AssertAllStagesStopped(async() =>
95+
await this.AssertAllStagesStoppedAsync(async() =>
9696
{
9797
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
9898
var complete = await task.ShouldCompleteWithin(3.Seconds());
@@ -101,10 +101,9 @@ public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow
101101
}
102102

103103
[Fact]
104-
public void A_AggregateAsync_must_propagate_an_error()
104+
public async Task A_AggregateAsync_must_propagate_an_error()
105105
{
106-
this.AssertAllStagesStopped(() =>
107-
{
106+
await this.AssertAllStagesStoppedAsync(() => {
108107
var error = new TestException("buh");
109108
var future = InputSource.Select(x =>
110109
{
@@ -116,14 +115,14 @@ public void A_AggregateAsync_must_propagate_an_error()
116115
future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3)))
117116
.Should().Throw<TestException>()
118117
.And.Should().Be(error);
118+
return Task.CompletedTask;
119119
}, Materializer);
120120
}
121121

122122
[Fact]
123-
public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws()
123+
public async Task A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_functions_throws()
124124
{
125-
this.AssertAllStagesStopped(() =>
126-
{
125+
await this.AssertAllStagesStoppedAsync(() => {
127126
var error = new TestException("buh");
128127
var future = InputSource.RunAggregateAsync(0, (x, y) =>
129128
{
@@ -139,6 +138,7 @@ public void A_AggregateAsync_must_complete_task_with_failure_when_Aggregating_fu
139138
future.Invoking(f => f.Wait(TimeSpan.FromSeconds(3)))
140139
.Should().Throw<TestException>()
141140
.And.Should().Be(error);
141+
return Task.CompletedTask;
142142
}, Materializer);
143143
}
144144

@@ -159,10 +159,9 @@ public void A_AggregateAsync_must_not_blow_up_with_high_request_count()
159159
}
160160

161161
[Fact]
162-
public void A_AggregateAsync_must_signal_task_failure()
162+
public async Task A_AggregateAsync_must_signal_task_failure()
163163
{
164-
this.AssertAllStagesStopped(() =>
165-
{
164+
await this.AssertAllStagesStoppedAsync(() => {
166165
var probe = this.CreateSubscriberProbe<int>();
167166
Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) => Task.Run(() =>
168167
{
@@ -174,14 +173,14 @@ public void A_AggregateAsync_must_signal_task_failure()
174173
var subscription = probe.ExpectSubscription();
175174
subscription.Request(100);
176175
probe.ExpectError().InnerException.Message.Should().Be("err1");
176+
return Task.CompletedTask;
177177
}, Materializer);
178178
}
179179

180180
[Fact]
181-
public void A_AggregateAsync_must_signal_error_from_AggregateAsync()
181+
public async Task A_AggregateAsync_must_signal_error_from_AggregateAsync()
182182
{
183-
this.AssertAllStagesStopped(() =>
184-
{
183+
await this.AssertAllStagesStoppedAsync(() => {
185184
var c = this.CreateManualSubscriberProbe<int>();
186185

187186
Source.From(Enumerable.Range(1, 5)).AggregateAsync(0, (_, n) =>
@@ -195,14 +194,14 @@ public void A_AggregateAsync_must_signal_error_from_AggregateAsync()
195194
var subscription = c.ExpectSubscription();
196195
subscription.Request(10);
197196
c.ExpectError().Message.Should().Be("err2");
197+
return Task.CompletedTask;
198198
}, Materializer);
199199
}
200200

201201
[Fact]
202-
public void A_AggregateAsync_must_resume_after_task_failure()
202+
public async Task A_AggregateAsync_must_resume_after_task_failure()
203203
{
204-
this.AssertAllStagesStopped(() =>
205-
{
204+
await this.AssertAllStagesStoppedAsync(() => {
206205
var probe = this.CreateSubscriberProbe<(int, int)>();
207206
Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) =>
208207
{
@@ -213,7 +212,7 @@ public void A_AggregateAsync_must_resume_after_task_failure()
213212
if (n == 3)
214213
throw new Exception("err3");
215214

216-
return (n, i + res*n);
215+
return (n, i + res * n);
217216
});
218217
})
219218
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
@@ -224,14 +223,14 @@ public void A_AggregateAsync_must_resume_after_task_failure()
224223
subscription.Request(10);
225224
probe.ExpectNext((5, 74));
226225
probe.ExpectComplete();
226+
return Task.CompletedTask;
227227
}, Materializer);
228228
}
229229

230230
[Fact]
231-
public void A_AggregateAsync_must_restart_after_task_failure()
231+
public async Task A_AggregateAsync_must_restart_after_task_failure()
232232
{
233-
this.AssertAllStagesStopped(() =>
234-
{
233+
await this.AssertAllStagesStoppedAsync(() => {
235234
var probe = this.CreateSubscriberProbe<(int, int)>();
236235
Source.From(Enumerable.Range(1, 5)).AggregateAsync((0, 1), (t, n) =>
237236
{
@@ -253,6 +252,7 @@ public void A_AggregateAsync_must_restart_after_task_failure()
253252
subscription.Request(10);
254253
probe.ExpectNext((5, 24));
255254
probe.ExpectComplete();
255+
return Task.CompletedTask;
256256
}, Materializer);
257257
}
258258

@@ -399,10 +399,9 @@ public void A_AggregateAsync_must_restart_when_task_is_completed_with_null()
399399
}
400400

401401
[Fact]
402-
public void A_AggregateAsync_must_handle_cancel_properly()
402+
public async Task A_AggregateAsync_must_handle_cancel_properly()
403403
{
404-
this.AssertAllStagesStopped(() =>
405-
{
404+
await this.AssertAllStagesStoppedAsync(() => {
406405
var pub = this.CreateManualPublisherProbe<int>();
407406
var sub = this.CreateSubscriberProbe<int>();
408407

@@ -414,15 +413,16 @@ public void A_AggregateAsync_must_handle_cancel_properly()
414413
upstream.ExpectRequest();
415414

416415
sub.ExpectSubscription().Cancel();
417-
416+
418417
upstream.ExpectCancellation();
418+
return Task.CompletedTask;
419419
}, Materializer);
420420
}
421421

422422
[Fact]
423-
public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
423+
public async Task A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
424424
{
425-
this.AssertAllStagesStopped(async() =>
425+
await this.AssertAllStagesStoppedAsync(async() =>
426426
{
427427
var task = Source.From(Enumerable.Empty<int>())
428428
.RunAggregateAsync(0, (acc, element) => Task.FromResult(acc + element), Materializer);
@@ -432,9 +432,9 @@ public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_s
432432
}
433433

434434
[Fact]
435-
public void A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
435+
public async Task A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
436436
{
437-
this.AssertAllStagesStopped(async() =>
437+
await this.AssertAllStagesStoppedAsync(async() =>
438438
{
439439
var task = Source.Single(100)
440440
.RunAggregateAsync(5, (acc, element) => Task.FromResult(acc + element), Materializer);

0 commit comments

Comments
 (0)