Skip to content

Commit 725f44b

Browse files
authored
[66-74] SubstreamSubscriptionTimeoutSpec (#6614)
* [66-74] `SubstreamSubscriptionTimeoutSpec` * Changes to `async`
1 parent d156ff4 commit 725f44b

File tree

1 file changed

+56
-44
lines changed

1 file changed

+56
-44
lines changed

src/core/Akka.Streams.Tests/Dsl/SubstreamSubscriptionTimeoutSpec.cs

+56-44
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Akka.Streams.Dsl;
1112
using Akka.Streams.Implementation;
1213
using Akka.Streams.TestKit;
@@ -40,119 +41,130 @@ public SubstreamSubscriptionTimeoutSpec(ITestOutputHelper helper) : base(Config,
4041
}
4142

4243
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
43-
public void GroupBy_and_SplitWhen_must_timeout_and_cancel_substream_publisher_when_no_one_subscribes_to_them_after_some_time()
44+
public async Task GroupBy_and_SplitWhen_must_timeout_and_cancel_substream_publisher_when_no_one_subscribes_to_them_after_some_time()
4445
{
45-
this.AssertAllStagesStopped(() =>
46-
{
46+
await this.AssertAllStagesStoppedAsync(async () => {
4747
var subscriber = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();
4848
var publisherProbe = this.CreatePublisherProbe<int>();
4949
Source.FromPublisher(publisherProbe)
50-
.GroupBy(3, x => x%3)
51-
.Lift(x => x%3)
50+
.GroupBy(3, x => x % 3)
51+
.Lift(x => x % 3)
5252
.RunWith(Sink.FromSubscriber(subscriber), Materializer);
5353

54-
var downstreamSubscription = subscriber.ExpectSubscription();
54+
var downstreamSubscription = await subscriber.ExpectSubscriptionAsync();
5555
downstreamSubscription.Request(100);
5656

57-
publisherProbe.SendNext(1);
58-
publisherProbe.SendNext(2);
59-
publisherProbe.SendNext(3);
60-
57+
await publisherProbe.SendNextAsync(1);
58+
await publisherProbe.SendNextAsync(2);
59+
await publisherProbe.SendNextAsync(3);
60+
6161
/*
6262
* Why this spec is skipped: in the event that subscriber.ExpectSubscription() or (subscriber.ExpectNext()
6363
* + s1SubscriberProbe.ExpectSubscription()) exceeds 300ms, the next call to subscriber.ExpectNext will
6464
* fail. This test is too tightly fitted to the timeout duration to be reliable, although somewhat ironically
6565
* it does validate that the underlying cancellation does work!
6666
*/
6767

68-
var s1 = subscriber.ExpectNext().Item2;
68+
var item = await subscriber.ExpectNextAsync();
69+
var s1 = item.Item2;
6970
// should not break normal usage
7071
var s1SubscriberProbe = this.CreateManualSubscriberProbe<int>();
7172
s1.RunWith(Sink.FromSubscriber(s1SubscriberProbe), Materializer);
72-
var s1Subscription = s1SubscriberProbe.ExpectSubscription();
73+
var s1Subscription = await s1SubscriberProbe.ExpectSubscriptionAsync();
7374
s1Subscription.Request(100);
74-
s1SubscriberProbe.ExpectNext().Should().Be(1);
75+
var next = await s1SubscriberProbe.ExpectNextAsync();
76+
next.Should().Be(1);
7577

76-
var s2 = subscriber.ExpectNext().Item2;
78+
item = await subscriber.ExpectNextAsync();
79+
var s2 = item.Item2;
7780
// should not break normal usage
7881
var s2SubscriberProbe = this.CreateManualSubscriberProbe<int>();
7982
s2.RunWith(Sink.FromSubscriber(s2SubscriberProbe), Materializer);
80-
var s2Subscription = s2SubscriberProbe.ExpectSubscription();
83+
var s2Subscription = await s2SubscriberProbe.ExpectSubscriptionAsync();
8184
s2Subscription.Request(100);
82-
s2SubscriberProbe.ExpectNext().Should().Be(2);
85+
next = await s2SubscriberProbe.ExpectNextAsync();
86+
next.Should().Be(2);
8387

84-
var s3 = subscriber.ExpectNext().Item2;
88+
item = await subscriber.ExpectNextAsync();
89+
var s3 = item.Item2;
8590

8691
// sleep long enough for it to be cleaned up
87-
Thread.Sleep(1500);
92+
await Task.Delay(1500);
8893

8994
// Must be a Sink.seq, otherwise there is a race due to the concat in the `lift` implementation
9095
Action action = () => s3.RunWith(Sink.Seq<int>(), Materializer).Wait(RemainingOrDefault);
9196
action.Should().Throw<SubscriptionTimeoutException>();
9297

93-
publisherProbe.SendComplete();
98+
await publisherProbe.SendCompleteAsync();
9499
}, Materializer);
95100
}
96101

97102
[Fact]
98-
public void GroupBy_and_SplitWhen_must_timeout_and_stop_groupBy_parent_actor_if_none_of_the_substreams_are_actually_consumed()
103+
public async Task GroupBy_and_SplitWhen_must_timeout_and_stop_groupBy_parent_actor_if_none_of_the_substreams_are_actually_consumed()
99104
{
100-
this.AssertAllStagesStopped(() =>
101-
{
105+
await this.AssertAllStagesStoppedAsync(async() => {
102106
var subscriber = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();
103107
var publisherProbe = this.CreatePublisherProbe<int>();
104108
Source.FromPublisher(publisherProbe)
105109
.GroupBy(2, x => x % 2)
106110
.Lift(x => x % 2).RunWith(Sink.FromSubscriber(subscriber), Materializer);
107111

108112

109-
var downstreamSubscription = subscriber.ExpectSubscription();
113+
var downstreamSubscription = await subscriber.ExpectSubscriptionAsync();
110114
downstreamSubscription.Request(100);
111115

112-
publisherProbe.SendNext(1);
113-
publisherProbe.SendNext(2);
114-
publisherProbe.SendNext(3);
115-
publisherProbe.SendComplete();
116+
await publisherProbe.SendNextAsync(1);
117+
await publisherProbe.SendNextAsync(2);
118+
await publisherProbe.SendNextAsync(3);
119+
await publisherProbe.SendCompleteAsync();
116120

117-
subscriber.ExpectNext();
118-
subscriber.ExpectNext();
121+
await subscriber.ExpectNextAsync();
122+
await subscriber.ExpectNextAsync();
119123
}, Materializer);
120124
}
121125

122126
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
123-
public void GroupBy_and_SplitWhen_must_not_timeout_and_cancel_substream_publisher_when_they_have_been_subscribed_to()
127+
public async Task GroupBy_and_SplitWhen_must_not_timeout_and_cancel_substream_publisher_when_they_have_been_subscribed_to()
124128
{
125129
var subscriber = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();
126130
var publisherProbe = this.CreatePublisherProbe<int>();
127131
Source.FromPublisher(publisherProbe)
128132
.GroupBy(2, x => x % 2)
129133
.Lift(x => x % 2).RunWith(Sink.FromSubscriber(subscriber), Materializer);
130134

131-
var downstreamSubscription = subscriber.ExpectSubscription();
135+
var downstreamSubscription = await subscriber.ExpectSubscriptionAsync();
132136
downstreamSubscription.Request(10);
133137

134-
publisherProbe.SendNext(1);
135-
publisherProbe.SendNext(2);
138+
await publisherProbe.SendNextAsync(1);
139+
await publisherProbe.SendNextAsync(2);
136140

137-
var s1 = subscriber.ExpectNext().Item2;
141+
var item = await subscriber.ExpectNextAsync();
142+
var s1 = item.Item2;
138143
var s1SubscriberProbe = this.CreateManualSubscriberProbe<int>();
139144
s1.RunWith(Sink.FromSubscriber(s1SubscriberProbe), Materializer);
140-
var s1Subscription = s1SubscriberProbe.ExpectSubscription();
145+
var s1Subscription = await s1SubscriberProbe.ExpectSubscriptionAsync();
141146
s1Subscription.Request(1);
142-
s1SubscriberProbe.ExpectNext().Should().Be(1);
147+
var s1Subscriber = await s1SubscriberProbe.ExpectNextAsync();
148+
s1Subscriber.Should().Be(1);
143149

144-
var s2 = subscriber.ExpectNext().Item2;
150+
item = await subscriber.ExpectNextAsync();
151+
var s2 = item.Item2;
145152
var s2SubscriberProbe = this.CreateManualSubscriberProbe<int>();
146153
s2.RunWith(Sink.FromSubscriber(s2SubscriberProbe), Materializer);
147-
var s2Subscription = s2SubscriberProbe.ExpectSubscription();
148-
Thread.Sleep(1500);
154+
var s2Subscription = await s2SubscriberProbe.ExpectSubscriptionAsync();
155+
await Task.Delay(1500);
149156
s2Subscription.Request(100);
150-
s2SubscriberProbe.ExpectNext().Should().Be(2);
157+
158+
var s2Subscriber = await s2SubscriberProbe.ExpectNextAsync();
159+
s2Subscriber.Should().Be(2);
160+
151161
s1Subscription.Request(100);
152-
publisherProbe.SendNext(3);
153-
publisherProbe.SendNext(4);
154-
s1SubscriberProbe.ExpectNext().Should().Be(3);
155-
s2SubscriberProbe.ExpectNext().Should().Be(4);
162+
await publisherProbe.SendNextAsync(3);
163+
await publisherProbe.SendNextAsync(4);
164+
var s1S = await s1SubscriberProbe.ExpectNextAsync();
165+
s1S.Should().Be(3);
166+
var s2S = await s2SubscriberProbe.ExpectNextAsync();
167+
s2S.Should().Be(4);
156168
}
157169
}
158170
}

0 commit comments

Comments
 (0)