Skip to content

Commit e18567a

Browse files
authored
[63-74] SinkForeachParallelSpec (#6611)
* [63-74] `SinkForeachParallelSpec` * Changes
1 parent 9135b85 commit e18567a

File tree

1 file changed

+35
-29
lines changed

1 file changed

+35
-29
lines changed

src/core/Akka.Streams.Tests/Dsl/SinkForeachParallelSpec.cs

+35-29
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Threading;
2020
using Akka.Streams.TestKit;
2121
using Akka.TestKit.Xunit2.Attributes;
22+
using System.Threading.Tasks;
2223

2324
namespace Akka.Streams.Tests.Dsl
2425
{
@@ -33,76 +34,76 @@ public SinkForeachParallelSpec(ITestOutputHelper helper) : base(helper)
3334
}
3435

3536
[LocalFact(SkipLocal = "Racy due to timing on Azure DevOps")]
36-
public void A_ForeachParallel_must_produce_elements_in_the_order_they_are_ready()
37+
public async Task A_ForeachParallel_must_produce_elements_in_the_order_they_are_ready()
3738
{
38-
this.AssertAllStagesStopped(() =>
39-
{
39+
await this.AssertAllStagesStoppedAsync(async() => {
4040
var probe = CreateTestProbe();
4141
var latch = Enumerable.Range(1, 4)
4242
.Select(i => (i, new TestLatch(1)))
4343
.ToDictionary(t => t.Item1, t => t.Item2);
44+
#pragma warning disable CS0618 // Type or member is obsolete
4445
var p = Source.From(Enumerable.Range(1, 4)).RunWith(Sink.ForEachParallel<int>(4, n =>
4546
{
4647
latch[n].Ready(TimeSpan.FromSeconds(5));
4748
probe.Ref.Tell(n);
4849
}), Materializer);
50+
#pragma warning restore CS0618 // Type or member is obsolete
4951
latch[2].CountDown();
50-
probe.ExpectMsg(2, TimeSpan.FromSeconds(5));
52+
await probe.ExpectMsgAsync(2, TimeSpan.FromSeconds(5));
5153
latch[4].CountDown();
52-
probe.ExpectMsg(4, TimeSpan.FromSeconds(5));
54+
await probe.ExpectMsgAsync(4, TimeSpan.FromSeconds(5));
5355
latch[3].CountDown();
54-
probe.ExpectMsg(3, TimeSpan.FromSeconds(5));
56+
await probe.ExpectMsgAsync(3, TimeSpan.FromSeconds(5));
5557

5658
p.IsCompleted.Should().BeFalse();
5759

5860
latch[1].CountDown();
59-
probe.ExpectMsg(1, TimeSpan.FromSeconds(5));
61+
await probe.ExpectMsgAsync(1, TimeSpan.FromSeconds(5));
6062

6163
p.Wait(TimeSpan.FromSeconds(4)).Should().BeTrue();
6264
p.IsCompleted.Should().BeTrue();
63-
6465
}, Materializer);
6566
}
6667

6768
[LocalFact(SkipLocal = "Racy - timing is rather sensitive on Azure DevOps")]
68-
public void A_ForeachParallel_must_not_run_more_functions_in_parallel_then_specified()
69+
public async Task A_ForeachParallel_must_not_run_more_functions_in_parallel_then_specified()
6970
{
70-
this.AssertAllStagesStopped(() =>
71-
{
71+
await this.AssertAllStagesStoppedAsync(async() => {
7272
var probe = CreateTestProbe();
7373
var latch = Enumerable.Range(1, 5)
7474
.Select(i => (i, new TestLatch()))
7575
.ToDictionary(t => t.Item1, t => t.Item2);
76+
#pragma warning disable CS0618 // Type or member is obsolete
7677
var p = Source.From(Enumerable.Range(1, 5)).RunWith(Sink.ForEachParallel<int>(4, n =>
7778
{
7879
probe.Ref.Tell(n);
7980
latch[n].Ready(TimeSpan.FromSeconds(5));
8081
}), Materializer);
82+
#pragma warning restore CS0618 // Type or member is obsolete
8183

82-
probe.ExpectMsgAllOf(new []{ 1, 2, 3, 4 });
83-
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
84+
probe.ExpectMsgAllOf(new[] { 1, 2, 3, 4 });
85+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
8486

8587
p.IsCompleted.Should().BeFalse();
86-
88+
8789
Enumerable.Range(1, 4).ForEach(i => latch[i].CountDown());
8890

8991
latch[5].CountDown();
90-
probe.ExpectMsg(5);
92+
await probe.ExpectMsgAsync(5);
9193

9294
p.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
9395
p.IsCompleted.Should().BeTrue();
94-
9596
}, Materializer);
9697
}
9798

9899
[Fact]
99-
public void A_ForeachParallel_must_resume_after_function_failure()
100+
public async Task A_ForeachParallel_must_resume_after_function_failure()
100101
{
101-
this.AssertAllStagesStopped(() =>
102-
{
102+
await this.AssertAllStagesStoppedAsync(() => {
103103
var probe = CreateTestProbe();
104104
var latch = new TestLatch(1);
105105

106+
#pragma warning disable CS0618 // Type or member is obsolete
106107
var p = Source.From(Enumerable.Range(1, 5)).RunWith(Sink.ForEachParallel<int>(4, n =>
107108
{
108109
if (n == 3)
@@ -111,22 +112,24 @@ public void A_ForeachParallel_must_resume_after_function_failure()
111112
probe.Ref.Tell(n);
112113
latch.Ready(TimeSpan.FromSeconds(10));
113114
}).WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)), Materializer);
115+
#pragma warning restore CS0618 // Type or member is obsolete
114116

115117
latch.CountDown();
116-
probe.ExpectMsgAllOf(new []{ 1, 2, 4, 5 });
118+
probe.ExpectMsgAllOf(new[] { 1, 2, 4, 5 });
117119

118120
p.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
121+
return Task.CompletedTask;
119122
}, Materializer);
120123
}
121124

122125
[Fact]
123-
public void A_ForeachParallel_must_finish_after_function_thrown_exception()
126+
public async Task A_ForeachParallel_must_finish_after_function_thrown_exception()
124127
{
125-
this.AssertAllStagesStopped(() =>
126-
{
128+
await this.AssertAllStagesStoppedAsync(async() => {
127129
var probe = CreateTestProbe();
128130
var latch = new TestLatch(1);
129131

132+
#pragma warning disable CS0618 // Type or member is obsolete
130133
var p = Source.From(Enumerable.Range(1, 5)).RunWith(Sink.ForEachParallel<int>(3, n =>
131134
{
132135
if (n == 3)
@@ -135,11 +138,12 @@ public void A_ForeachParallel_must_finish_after_function_thrown_exception()
135138
probe.Ref.Tell(n);
136139
latch.Ready(TimeSpan.FromSeconds(10));
137140
}).WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider)), Materializer);
138-
141+
#pragma warning restore CS0618 // Type or member is obsolete
142+
139143
// make sure the stream is up and running, otherwise the latch is maybe ready before the third message arrives
140-
Thread.Sleep(500);
144+
await Task.Delay(500);
141145
latch.CountDown();
142-
probe.ExpectMsgAllOf(new []{ 1, 2 });
146+
probe.ExpectMsgAllOf(new[] { 1, 2 });
143147

144148
var ex = p.Invoking(t => t.Wait(TimeSpan.FromSeconds(1))).Should().Throw<AggregateException>().Which;
145149
ex.Flatten().InnerException.Should().BeOfType<TestException>();
@@ -150,12 +154,14 @@ public void A_ForeachParallel_must_finish_after_function_thrown_exception()
150154
}
151155

152156
[Fact]
153-
public void A_ForeachParallel_must_handle_empty_source()
157+
public async Task A_ForeachParallel_must_handle_empty_source()
154158
{
155-
this.AssertAllStagesStopped(() =>
156-
{
159+
await this.AssertAllStagesStoppedAsync(() => {
160+
#pragma warning disable CS0618 // Type or member is obsolete
157161
var p = Source.From(new List<int>()).RunWith(Sink.ForEachParallel<int>(3, i => { }), Materializer);
162+
#pragma warning restore CS0618 // Type or member is obsolete
158163
p.Wait(TimeSpan.FromSeconds(2)).Should().BeTrue();
164+
return Task.CompletedTask;
159165
}, Materializer);
160166
}
161167
}

0 commit comments

Comments
 (0)