Skip to content

Commit b3e27a9

Browse files
eabaAaronontheweb
andauthored
[12-74]FlowFromTaskSpec (#6556)
* [12-74]`FlowFromTaskSpec` * Changes to `async` TestKit --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 58435a0 commit b3e27a9

File tree

1 file changed

+37
-40
lines changed

1 file changed

+37
-40
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowFromTaskSpec.cs

+37-40
Original file line numberDiff line numberDiff line change
@@ -28,108 +28,105 @@ public FlowFromTaskSpec(ITestOutputHelper helper) : base(helper)
2828
}
2929

3030
[Fact]
31-
public void A_Flow_based_on_a_Task_must_produce_one_element_from_already_successful_Future()
31+
public async Task A_Flow_based_on_a_Task_must_produce_one_element_from_already_successful_Future()
3232
{
33-
this.AssertAllStagesStopped(() =>
34-
{
35-
var c = this.CreateManualSubscriberProbe<int>();
36-
var p = Source.FromTask(Task.FromResult(1)).RunWith(Sink.AsPublisher<int>(true), Materializer);
37-
p.Subscribe(c);
38-
var sub = c.ExpectSubscription();
39-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
40-
sub.Request(1);
41-
c.ExpectNext(1);
42-
c.ExpectComplete();
33+
await this.AssertAllStagesStoppedAsync(async() => {
34+
var c = this.CreateManualSubscriberProbe<int>();
35+
var p = Source.FromTask(Task.FromResult(1)).RunWith(Sink.AsPublisher<int>(true), Materializer);
36+
p.Subscribe(c);
37+
var sub = await c.ExpectSubscriptionAsync();
38+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
39+
sub.Request(1);
40+
await c.ExpectNextAsync(1);
41+
await c.ExpectCompleteAsync();
4342
}, Materializer);
4443
}
4544

4645
[Fact]
47-
public void A_Flow_based_on_a_Task_must_produce_error_from_already_failed_Task()
46+
public async Task A_Flow_based_on_a_Task_must_produce_error_from_already_failed_Task()
4847
{
49-
this.AssertAllStagesStopped(() =>
50-
{
48+
await this.AssertAllStagesStoppedAsync(() => {
5149
var ex = new TestException("test");
5250
var c = this.CreateManualSubscriberProbe<int>();
5351
var p =
5452
Source.FromTask(Task.Run(new Func<int>(() => { throw ex; })))
5553
.RunWith(Sink.AsPublisher<int>(false), Materializer);
5654
p.Subscribe(c);
5755
c.ExpectSubscriptionAndError().Should().Be(ex);
56+
return Task.CompletedTask;
5857
}, Materializer);
5958
}
6059

6160
[Fact]
62-
public void A_Flow_based_on_a_Task_must_produce_one_element_when_Task_is_completed()
61+
public async Task A_Flow_based_on_a_Task_must_produce_one_element_when_Task_is_completed()
6362
{
64-
this.AssertAllStagesStopped(() =>
65-
{
63+
await this.AssertAllStagesStoppedAsync(async() => {
6664
var promise = new TaskCompletionSource<int>();
6765
var c = this.CreateManualSubscriberProbe<int>();
6866
var p = Source.FromTask(promise.Task).RunWith(Sink.AsPublisher<int>(true), Materializer);
6967
p.Subscribe(c);
70-
var sub = c.ExpectSubscription();
68+
var sub = await c.ExpectSubscriptionAsync();
7169
sub.Request(1);
72-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
70+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
7371
promise.SetResult(1);
74-
c.ExpectNext(1);
75-
c.ExpectComplete();
76-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
72+
await c.ExpectNextAsync(1);
73+
await c.ExpectCompleteAsync();
74+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
7775
}, Materializer);
7876
}
7977

8078
[Fact]
81-
public void A_Flow_based_on_a_Task_must_produce_one_element_when_Task_is_completed_but_not_before_request()
79+
public async Task A_Flow_based_on_a_Task_must_produce_one_element_when_Task_is_completed_but_not_before_request()
8280
{
8381
var promise = new TaskCompletionSource<int>();
8482
var c = this.CreateManualSubscriberProbe<int>();
8583
var p = Source.FromTask(promise.Task).RunWith(Sink.AsPublisher<int>(true), Materializer);
8684
p.Subscribe(c);
87-
var sub = c.ExpectSubscription();
85+
var sub = await c.ExpectSubscriptionAsync();
8886
promise.SetResult(1);
89-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
87+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
9088
sub.Request(1);
91-
c.ExpectNext(1);
92-
c.ExpectComplete();
89+
await c.ExpectNextAsync(1);
90+
await c.ExpectCompleteAsync();
9391
}
9492

9593
[Fact]
96-
public void A_Flow_based_on_a_Task_must_produce_elements_with_multiple_subscribers()
94+
public async Task A_Flow_based_on_a_Task_must_produce_elements_with_multiple_subscribers()
9795
{
98-
this.AssertAllStagesStopped(() =>
99-
{
96+
await this.AssertAllStagesStoppedAsync(async() => {
10097
var promise = new TaskCompletionSource<int>();
10198
var p = Source.FromTask(promise.Task).RunWith(Sink.AsPublisher<int>(true), Materializer);
10299
var c1 = this.CreateManualSubscriberProbe<int>();
103100
var c2 = this.CreateManualSubscriberProbe<int>();
104101
p.Subscribe(c1);
105102
p.Subscribe(c2);
106-
var sub1 = c1.ExpectSubscription();
107-
var sub2 = c2.ExpectSubscription();
103+
var sub1 = await c1.ExpectSubscriptionAsync();
104+
var sub2 = await c2.ExpectSubscriptionAsync();
108105
sub1.Request(1);
109106
promise.SetResult(1);
110107
sub2.Request(2);
111-
c1.ExpectNext(1);
112-
c2.ExpectNext(1);
113-
c1.ExpectComplete();
114-
c2.ExpectComplete();
108+
await c1.ExpectNextAsync(1);
109+
await c2.ExpectNextAsync(1);
110+
await c1.ExpectCompleteAsync();
111+
await c2.ExpectCompleteAsync();
115112
}, Materializer);
116113
}
117114

118115
[Fact]
119-
public void A_Flow_based_on_a_Task_must_allow_cancel_before_receiving_element()
116+
public async Task A_Flow_based_on_a_Task_must_allow_cancel_before_receiving_element()
120117
{
121118
var promise = new TaskCompletionSource<int>();
122119
var c = this.CreateManualSubscriberProbe<int>();
123120
var p = Source.FromTask(promise.Task).RunWith(Sink.AsPublisher<int>(true), Materializer);
124121
var keepAlive = this.CreateManualSubscriberProbe<int>();
125122
p.Subscribe(keepAlive);
126123
p.Subscribe(c);
127-
var sub = c.ExpectSubscription();
124+
var sub = await c.ExpectSubscriptionAsync();
128125
sub.Request(1);
129126
sub.Cancel();
130-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
127+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
131128
promise.SetResult(1);
132-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
129+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
133130
}
134131
}
135132
}

0 commit comments

Comments
 (0)