Skip to content

Commit 48c8077

Browse files
authored
[43-74] GraphConcatSpec (#6590)
* [43-74] `GraphConcatSpec` * Changes to `async` TestKit
1 parent 6220253 commit 48c8077

File tree

1 file changed

+33
-39
lines changed

1 file changed

+33
-39
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphConcatSpec.cs

+33-39
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ public ConcatFixture(GraphDsl.Builder<NotUsed> builder) : base(builder)
4545
}
4646

4747
[Fact]
48-
public void Concat_must_work_in_the_happy_case()
48+
public async Task Concat_must_work_in_the_happy_case()
4949
{
50-
this.AssertAllStagesStopped(() =>
51-
{
50+
await this.AssertAllStagesStoppedAsync(async() => {
5251
var probe = this.CreateManualSubscriberProbe<int>();
5352

5453
RunnableGraph.FromGraph(GraphDsl.Create(b =>
@@ -65,71 +64,68 @@ public void Concat_must_work_in_the_happy_case()
6564
return ClosedShape.Instance;
6665
})).Run(Materializer);
6766

68-
var subscription = probe.ExpectSubscription();
67+
var subscription = await probe.ExpectSubscriptionAsync();
6968

7069
for (var i = 1; i <= 10; i++)
7170
{
7271
subscription.Request(1);
73-
probe.ExpectNext(i);
72+
await probe.ExpectNextAsync(i);
7473
}
7574

76-
probe.ExpectComplete();
75+
await probe.ExpectCompleteAsync();
7776
}, Materializer);
7877
}
7978

8079
[Fact]
81-
public void Concat_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
80+
public async Task Concat_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
8281
{
83-
this.AssertAllStagesStopped(() =>
84-
{
82+
await this.AssertAllStagesStoppedAsync(async() => {
8583
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
86-
var subscription1 = subscriber1.ExpectSubscription();
84+
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
8785

8886
subscription1.Request(5);
89-
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
87+
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
9088

9189
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
92-
var subscription2 = subscriber2.ExpectSubscription();
90+
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
9391

9492
subscription2.Request(5);
95-
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
93+
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
9694
}, Materializer);
9795
}
9896

9997
[Fact]
100-
public void Concat_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
98+
public async Task Concat_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
10199
{
102-
this.AssertAllStagesStopped(() =>
103-
{
100+
await this.AssertAllStagesStoppedAsync(async() => {
104101
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
105-
var subscription1 = subscriber1.ExpectSubscription();
102+
var subscription1 = await subscriber1.ExpectSubscriptionAsync();
106103

107104
subscription1.Request(5);
108-
subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete();
105+
await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
109106

110107
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher<int>());
111-
var subscription2 = subscriber2.ExpectSubscription();
108+
var subscription2 = await subscriber2.ExpectSubscriptionAsync();
112109

113110
subscription2.Request(5);
114-
subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete();
111+
await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync();
115112
}, Materializer);
116113
}
117114

118115
[Fact]
119-
public void Concat_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
116+
public async Task Concat_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
120117
{
121-
this.AssertAllStagesStopped(() =>
122-
{
118+
await this.AssertAllStagesStoppedAsync(async() => {
123119
var subscriber1 = Setup(FailedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
124120
subscriber1.ExpectSubscriptionAndError().Should().Be(TestException());
125121

126122
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), FailedPublisher<int>());
127-
subscriber2.ExpectSubscription().Request(5);
128-
129-
foreach (var i in Enumerable.Range(1,4))
123+
(await subscriber2.ExpectSubscriptionAsync()).Request(5);
124+
125+
foreach (var i in Enumerable.Range(1, 4))
130126
{
131127
var result = subscriber2.ExpectNextOrError();
132-
if(result is int && (int)result == i)
128+
if (result is int && (int)result == i)
133129
continue;
134130
if (result.Equals(TestException()))
135131
return;
@@ -140,12 +136,11 @@ public void Concat_must_work_with_one_immediately_failed_and_one_nonempty_publis
140136
}
141137

142138
[Fact]
143-
public void Concat_must_work_with_one_nonempty_publisher_and_one_delayed_failed_and()
139+
public async Task Concat_must_work_with_one_nonempty_publisher_and_one_delayed_failed_and()
144140
{
145-
this.AssertAllStagesStopped(() =>
146-
{
141+
await this.AssertAllStagesStoppedAsync(async() => {
147142
var subscriber = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToFailPublisher<int>());
148-
subscriber.ExpectSubscription().Request(5);
143+
(await subscriber.ExpectSubscriptionAsync()).Request(5);
149144

150145
foreach (var i in Enumerable.Range(1, 4))
151146
{
@@ -161,20 +156,19 @@ public void Concat_must_work_with_one_nonempty_publisher_and_one_delayed_failed_
161156
}
162157

163158
[Fact]
164-
public void Concat_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
159+
public async Task Concat_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
165160
{
166-
this.AssertAllStagesStopped(() =>
167-
{
161+
await this.AssertAllStagesStoppedAsync(() => {
168162
var subscriber1 = Setup(SoonToFailPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
169163
subscriber1.ExpectSubscriptionAndError().Should().Be(TestException());
164+
return Task.CompletedTask;
170165
}, Materializer);
171166
}
172167

173168
[Fact]
174-
public void Concat_must_correctly_handle_async_errors_in_secondary_upstream()
169+
public async Task Concat_must_correctly_handle_async_errors_in_secondary_upstream()
175170
{
176-
this.AssertAllStagesStopped(() =>
177-
{
171+
await this.AssertAllStagesStoppedAsync(async() => {
178172
var promise = new TaskCompletionSource<int>();
179173
var subscriber = this.CreateManualSubscriberProbe<int>();
180174

@@ -191,9 +185,9 @@ public void Concat_must_correctly_handle_async_errors_in_secondary_upstream()
191185
})).Run(Materializer);
192186

193187

194-
var subscription = subscriber.ExpectSubscription();
188+
var subscription = await subscriber.ExpectSubscriptionAsync();
195189
subscription.Request(4);
196-
subscriber.ExpectNext( 1, 2, 3);
190+
subscriber.ExpectNext(1, 2, 3);
197191
promise.SetException(TestException());
198192
subscriber.ExpectError().Should().Be(TestException());
199193
}, Materializer);

0 commit comments

Comments
 (0)