Skip to content

Commit 5d68c3a

Browse files
authored
[46-74] GraphStageTimersSpec (#6593)
* [46-74] `GraphStageTimersSpec` * Changes to `async` TestKit
1 parent f2737fe commit 5d68c3a

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs

+24-25
Original file line numberDiff line numberDiff line change
@@ -50,60 +50,59 @@ private SideChannel SetupIsolatedStage()
5050
public async Task GraphStage_timer_support_must_receive_single_shot_timer()
5151
{
5252
var driver = SetupIsolatedStage();
53-
await AwaitAssertAsync(() =>
53+
await AwaitAssertAsync(async() =>
5454
{
5555
driver.Tell(TestSingleTimer.Instance);
56-
ExpectMsg(new Tick(1), TimeSpan.FromSeconds(10));
57-
ExpectNoMsg(TimeSpan.FromSeconds(1));
56+
await ExpectMsgAsync(new Tick(1), TimeSpan.FromSeconds(10));
57+
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
5858
});
5959
driver.StopStage();
6060
}
6161

6262
[LocalFact(SkipLocal = "Racy on Azure DevOps")]
63-
public void GraphStage_timer_support_must_resubmit_single_shot_timer()
63+
public async Task GraphStage_timer_support_must_resubmit_single_shot_timer()
6464
{
6565
var driver = SetupIsolatedStage();
66-
Within(TimeSpan.FromSeconds(2.5), () =>
66+
await WithinAsync(TimeSpan.FromSeconds(2.5), async() =>
6767
{
68-
Within(TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1), () =>
68+
await WithinAsync(TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1), async() =>
6969
{
7070
driver.Tell(TestSingleTimerResubmit.Instance);
71-
ExpectMsg(new Tick(1));
71+
await ExpectMsgAsync(new Tick(1));
7272
});
73-
Within(TimeSpan.FromSeconds(1), () => ExpectMsg(new Tick(2)));
73+
await WithinAsync(TimeSpan.FromSeconds(1), async() => await ExpectMsgAsync(new Tick(2)));
7474

75-
ExpectNoMsg(TimeSpan.FromSeconds(1));
75+
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
7676
});
7777
driver.StopStage();
7878
}
7979

8080
[Fact]
81-
public void GraphStage_timer_support_must_correctly_cancel_a_named_timer()
81+
public async Task GraphStage_timer_support_must_correctly_cancel_a_named_timer()
8282
{
8383
var driver = SetupIsolatedStage();
8484
driver.Tell(TestCancelTimer.Instance);
85-
Within(TimeSpan.FromMilliseconds(5000), () => ExpectMsg<TestCancelTimerAck>());
86-
Within(TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(3000), () => ExpectMsg(new Tick(1)));
87-
ExpectNoMsg(TimeSpan.FromSeconds(1));
85+
await WithinAsync(TimeSpan.FromMilliseconds(5000), async() => await ExpectMsgAsync<TestCancelTimerAck>());
86+
await WithinAsync(TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(3000), async() => await ExpectMsgAsync(new Tick(1)));
87+
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
8888
driver.StopStage();
8989
}
9090

9191
[Fact]
92-
public void GraphStage_timer_support_must_receive_and_cancel_a_repeated_timer()
92+
public async Task GraphStage_timer_support_must_receive_and_cancel_a_repeated_timer()
9393
{
9494
var driver = SetupIsolatedStage();
9595
driver.Tell(TestRepeatedTimer.Instance);
9696
var seq = ReceiveWhile(TimeSpan.FromSeconds(30), o => (Tick)o, msgs: 5);
9797
seq.Should().HaveCount(5);
98-
ExpectNoMsg(TimeSpan.FromSeconds(1));
98+
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
9999
driver.StopStage();
100100
}
101101

102102
[Fact]
103-
public void GraphStage_timer_support_must_produce_scheduled_ticks_as_expected()
103+
public async Task GraphStage_timer_support_must_produce_scheduled_ticks_as_expected()
104104
{
105-
this.AssertAllStagesStopped(() =>
106-
{
105+
await this.AssertAllStagesStoppedAsync(async() => {
107106
var upstream = this.CreatePublisherProbe<int>();
108107
var downstream = this.CreateSubscriberProbe<int>();
109108

@@ -112,20 +111,19 @@ public void GraphStage_timer_support_must_produce_scheduled_ticks_as_expected()
112111
.RunWith(Sink.FromSubscriber(downstream), Materializer);
113112

114113
downstream.Request(5);
115-
downstream.ExpectNext( 1, 2, 3);
114+
downstream.ExpectNext(1, 2, 3);
116115

117-
downstream.ExpectNoMsg(TimeSpan.FromSeconds(1));
116+
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
118117

119-
upstream.SendComplete();
120-
downstream.ExpectComplete();
118+
await upstream.SendCompleteAsync();
119+
await downstream.ExpectCompleteAsync();
121120
}, Materializer);
122121
}
123122

124123
[Fact]
125-
public void GraphStage_timer_support_must_propagate_error_if_OnTimer_throws_an_Exception()
124+
public async Task GraphStage_timer_support_must_propagate_error_if_OnTimer_throws_an_Exception()
126125
{
127-
this.AssertAllStagesStopped(() =>
128-
{
126+
await this.AssertAllStagesStoppedAsync(() => {
129127
var exception = new TestException("Expected exception to the rule");
130128
var upstream = this.CreatePublisherProbe<int>();
131129
var downstream = this.CreateSubscriberProbe<int>();
@@ -136,6 +134,7 @@ public void GraphStage_timer_support_must_propagate_error_if_OnTimer_throws_an_E
136134

137135
downstream.Request(1);
138136
downstream.ExpectError().Should().Be(exception);
137+
return Task.CompletedTask;
139138
}, Materializer);
140139
}
141140

0 commit comments

Comments
 (0)