Skip to content

Commit bdc526b

Browse files
authored
[49-74] GraphWireTapSpec (#6596)
* [49-74] `GraphWireTapSpec` * Changes to `async` TestKit
1 parent fb7bd28 commit bdc526b

File tree

1 file changed

+41
-46
lines changed

1 file changed

+41
-46
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphWireTapSpec.cs

+41-46
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System.Linq;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Akka.Streams.Dsl;
1112
using Akka.Streams.TestKit;
1213
using Akka.TestKit;
@@ -27,76 +28,70 @@ public GraphWireTapSpec(ITestOutputHelper helper)
2728
}
2829

2930
[Fact]
30-
public void A_WireTap_must_broadcast_to_the_tap()
31+
public async Task A_WireTap_must_broadcast_to_the_tap()
3132
{
32-
this.AssertAllStagesStopped(() =>
33-
{
34-
var (tps, mps) = Source.From(Enumerable.Range(1, 2))
35-
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
36-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
37-
.Run(Materializer);
33+
await this.AssertAllStagesStoppedAsync(async() => {
34+
var (tps, mps) = Source.From(Enumerable.Range(1, 2))
35+
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
36+
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
37+
.Run(Materializer);
3838

3939
tps.Request(2);
40-
mps.RequestNext(1);
41-
mps.RequestNext(2);
42-
tps.ExpectNext( 1, 2);
43-
mps.ExpectComplete();
44-
tps.ExpectComplete();
40+
await mps.RequestNextAsync(1);
41+
await mps.RequestNextAsync(2);
42+
tps.ExpectNext(1, 2);
43+
await mps.ExpectCompleteAsync();
44+
await tps.ExpectCompleteAsync();
4545
}, Materializer);
4646
}
4747

4848
[Fact]
49-
public void A_WireTap_must_drop_elements_while_the_tap_has_no_demand_buffering_up_to_one_element()
49+
public async Task A_WireTap_must_drop_elements_while_the_tap_has_no_demand_buffering_up_to_one_element()
5050
{
51-
this.AssertAllStagesStopped(() =>
52-
{
53-
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
54-
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
55-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
56-
.Run(Materializer);
57-
51+
await this.AssertAllStagesStoppedAsync(async() => {
52+
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
53+
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
54+
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
55+
.Run(Materializer);
5856
mps.Request(3);
59-
mps.ExpectNext( 1, 2, 3);
57+
mps.ExpectNext(1, 2, 3);
6058
tps.Request(4);
61-
mps.RequestNext(4);
62-
mps.RequestNext(5);
63-
mps.RequestNext(6);
64-
tps.ExpectNext( 3, 4, 5, 6);
65-
mps.ExpectComplete();
66-
tps.ExpectComplete();
59+
await mps.RequestNextAsync(4);
60+
await mps.RequestNextAsync(5);
61+
await mps.RequestNextAsync(6);
62+
tps.ExpectNext(3, 4, 5, 6);
63+
await mps.ExpectCompleteAsync();
64+
await tps.ExpectCompleteAsync();
6765
}, Materializer);
6866
}
6967

7068
[Fact]
71-
public void A_WireTap_must_cancel_if_main_sink_cancels()
69+
public async Task A_WireTap_must_cancel_if_main_sink_cancels()
7270
{
73-
this.AssertAllStagesStopped(() =>
74-
{
75-
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
76-
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
77-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
78-
.Run(Materializer);
79-
71+
await this.AssertAllStagesStoppedAsync(async() => {
72+
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
73+
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
74+
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
75+
.Run(Materializer);
76+
8077
tps.Request(6);
8178
mps.Cancel();
82-
tps.ExpectComplete();
79+
await tps.ExpectCompleteAsync();
8380
}, Materializer);
8481
}
8582

8683
[Fact]
87-
public void A_WireTap_must_continue_if_tap_sink_cancels()
84+
public async Task A_WireTap_must_continue_if_tap_sink_cancels()
8885
{
89-
this.AssertAllStagesStopped(() =>
90-
{
91-
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
92-
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
93-
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
94-
.Run(Materializer);
95-
86+
await this.AssertAllStagesStoppedAsync(async() => {
87+
var (tps, mps) = Source.From(Enumerable.Range(1, 6))
88+
.WireTapMaterialized(this.SinkProbe<int>(), Keep.Right)
89+
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
90+
.Run(Materializer);
9691
tps.Cancel();
9792
mps.Request(6);
98-
mps.ExpectNext( 1, 2, 3, 4, 5, 6);
99-
mps.ExpectComplete();
93+
mps.ExpectNext(1, 2, 3, 4, 5, 6);
94+
await mps.ExpectCompleteAsync();
10095
}, Materializer);
10196
}
10297
}

0 commit comments

Comments
 (0)