Skip to content

Commit fb7bd28

Browse files
authored
[50-74] GraphZipSpec (#6597)
* [50-74] `GraphZipSpec` * Changes to `async/await`
1 parent a58cab8 commit fb7bd28

File tree

1 file changed

+65
-73
lines changed

1 file changed

+65
-73
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphZipSpec.cs

+65-73
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,16 @@ public ZipFixture(GraphDsl.Builder<NotUsed> builder) : base(builder)
4343
}
4444

4545
[Fact]
46-
public void Zip_must_work_in_the_happy_case()
46+
public async Task Zip_must_work_in_the_happy_case()
4747
{
48-
this.AssertAllStagesStopped(() =>
49-
{
48+
await this.AssertAllStagesStoppedAsync(async() => {
5049
var probe = this.CreateManualSubscriberProbe<(int, string)>();
5150

5251
RunnableGraph.FromGraph(GraphDsl.Create(b =>
5352
{
5453
var zip = b.Add(new Zip<int, string>());
5554
var source1 = Source.From(Enumerable.Range(1, 4));
56-
var source2 = Source.From(new[] {"A", "B", "C", "D", "E", "F"});
55+
var source2 = Source.From(new[] { "A", "B", "C", "D", "E", "F" });
5756

5857
b.From(source1).To(zip.In0);
5958
b.From(source2).To(zip.In1);
@@ -62,24 +61,23 @@ public void Zip_must_work_in_the_happy_case()
6261
return ClosedShape.Instance;
6362
})).Run(Materializer);
6463

65-
var subscription = probe.ExpectSubscription();
64+
var subscription = await probe.ExpectSubscriptionAsync();
6665

6766
subscription.Request(2);
68-
probe.ExpectNext((1, "A"));
69-
probe.ExpectNext((2, "B"));
67+
await probe.ExpectNextAsync((1, "A"));
68+
await probe.ExpectNextAsync((2, "B"));
7069
subscription.Request(1);
71-
probe.ExpectNext((3, "C"));
70+
await probe.ExpectNextAsync((3, "C"));
7271
subscription.Request(1);
73-
probe.ExpectNext((4, "D"));
74-
probe.ExpectComplete();
72+
await probe.ExpectNextAsync((4, "D"));
73+
await probe.ExpectCompleteAsync();
7574
}, Materializer);
7675
}
7776

7877
[Fact]
79-
public void Zip_must_complete_if_one_side_is_available_but_other_already_completed()
78+
public async Task Zip_must_complete_if_one_side_is_available_but_other_already_completed()
8079
{
81-
this.AssertAllStagesStopped(() =>
82-
{
80+
await this.AssertAllStagesStoppedAsync(async() => {
8381
var upstream1 = this.CreatePublisherProbe<int>();
8482
var upstream2 = this.CreatePublisherProbe<string>();
8583

@@ -96,21 +94,20 @@ public void Zip_must_complete_if_one_side_is_available_but_other_already_complet
9694
return ClosedShape.Instance;
9795
})).Run(Materializer);
9896

99-
upstream1.SendNext(1);
100-
upstream1.SendNext(2);
101-
upstream2.SendNext("A");
102-
upstream2.SendComplete();
97+
await upstream1.SendNextAsync(1);
98+
await upstream1.SendNextAsync(2);
99+
await upstream2.SendNextAsync("A");
100+
await upstream2.SendCompleteAsync();
103101

104102
completed.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
105-
upstream1.ExpectCancellation();
103+
await upstream1.ExpectCancellationAsync();
106104
}, Materializer);
107105
}
108106

109107
[Fact]
110-
public void Zip_must_complete_even_if_no_pending_demand()
108+
public async Task Zip_must_complete_even_if_no_pending_demand()
111109
{
112-
this.AssertAllStagesStopped(() =>
113-
{
110+
await this.AssertAllStagesStoppedAsync(async() => {
114111
var upstream1 = this.CreatePublisherProbe<int>();
115112
var upstream2 = this.CreatePublisherProbe<string>();
116113
var downstream = this.CreateSubscriberProbe<(int, string)>();
@@ -130,21 +127,20 @@ public void Zip_must_complete_even_if_no_pending_demand()
130127

131128
downstream.Request(1);
132129

133-
upstream1.SendNext(1);
134-
upstream2.SendNext("A");
135-
downstream.ExpectNext((1, "A"));
130+
await upstream1.SendNextAsync(1);
131+
await upstream2.SendNextAsync("A");
132+
await downstream.ExpectNextAsync((1, "A"));
136133

137-
upstream2.SendComplete();
138-
downstream.ExpectComplete();
139-
upstream1.ExpectCancellation();
134+
await upstream2.SendCompleteAsync();
135+
await downstream.ExpectCompleteAsync();
136+
await upstream1.ExpectCancellationAsync();
140137
}, Materializer);
141138
}
142139

143140
[Fact]
144-
public void Zip_must_complete_if_both_sides_complete_before_requested_with_elements_pending_2()
141+
public async Task Zip_must_complete_if_both_sides_complete_before_requested_with_elements_pending_2()
145142
{
146-
this.AssertAllStagesStopped(() =>
147-
{
143+
await this.AssertAllStagesStoppedAsync(async() => {
148144
var upstream1 = this.CreatePublisherProbe<int>();
149145
var upstream2 = this.CreatePublisherProbe<string>();
150146
var downstream = this.CreateSubscriberProbe<(int, string)>();
@@ -162,22 +158,21 @@ public void Zip_must_complete_if_both_sides_complete_before_requested_with_eleme
162158
return ClosedShape.Instance;
163159
})).Run(Materializer);
164160

165-
upstream1.SendNext(1);
166-
upstream2.SendNext("A");
161+
await upstream1.SendNextAsync(1);
162+
await upstream2.SendNextAsync("A");
167163

168-
upstream1.SendComplete();
169-
upstream2.SendComplete();
164+
await upstream1.SendCompleteAsync();
165+
await upstream2.SendCompleteAsync();
170166

171-
downstream.RequestNext((1, "A"));
172-
downstream.ExpectComplete();
167+
await downstream.RequestNextAsync((1, "A"));
168+
await downstream.ExpectCompleteAsync();
173169
}, Materializer);
174170
}
175171

176172
[Fact]
177-
public void Zip_must_complete_if_one_side_complete_before_requested_with_elements_pending()
173+
public async Task Zip_must_complete_if_one_side_complete_before_requested_with_elements_pending()
178174
{
179-
this.AssertAllStagesStopped(() =>
180-
{
175+
await this.AssertAllStagesStoppedAsync(async() => {
181176
var upstream1 = this.CreatePublisherProbe<int>();
182177
var upstream2 = this.CreatePublisherProbe<string>();
183178
var downstream = this.CreateSubscriberProbe<(int, string)>();
@@ -195,23 +190,22 @@ public void Zip_must_complete_if_one_side_complete_before_requested_with_element
195190
return ClosedShape.Instance;
196191
})).Run(Materializer);
197192

198-
upstream1.SendNext(1);
199-
upstream1.SendNext(2);
200-
upstream2.SendNext("A");
193+
await upstream1.SendNextAsync(1);
194+
await upstream1.SendNextAsync(2);
195+
await upstream2.SendNextAsync("A");
201196

202-
upstream1.SendComplete();
203-
upstream2.SendComplete();
197+
await upstream1.SendCompleteAsync();
198+
await upstream2.SendCompleteAsync();
204199

205-
downstream.RequestNext((1, "A"));
206-
downstream.ExpectComplete();
200+
await downstream.RequestNextAsync((1, "A"));
201+
await downstream.ExpectCompleteAsync();
207202
}, Materializer);
208203
}
209204

210205
[Fact]
211-
public void Zip_must_complete_if_one_side_complete_before_requested_with_elements_pending_2()
206+
public async Task Zip_must_complete_if_one_side_complete_before_requested_with_elements_pending_2()
212207
{
213-
this.AssertAllStagesStopped(() =>
214-
{
208+
await this.AssertAllStagesStoppedAsync(async() => {
215209
var upstream1 = this.CreatePublisherProbe<int>();
216210
var upstream2 = this.CreatePublisherProbe<string>();
217211
var downstream = this.CreateSubscriberProbe<(int, string)>();
@@ -229,69 +223,67 @@ public void Zip_must_complete_if_one_side_complete_before_requested_with_element
229223
return ClosedShape.Instance;
230224
})).Run(Materializer);
231225

232-
downstream.EnsureSubscription();
226+
await downstream.EnsureSubscriptionAsync();
233227

234-
upstream1.SendNext(1);
235-
upstream1.SendComplete();
236-
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
228+
await upstream1.SendNextAsync(1);
229+
await upstream1.SendCompleteAsync();
230+
await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
237231

238-
upstream2.SendNext("A");
239-
upstream2.SendComplete();
232+
await upstream2.SendNextAsync("A");
233+
await upstream2.SendCompleteAsync();
240234

241-
downstream.RequestNext((1, "A"));
242-
downstream.ExpectComplete();
235+
await downstream.RequestNextAsync((1, "A"));
236+
await downstream.ExpectCompleteAsync();
243237
}, Materializer);
244238
}
245239

246240
[Fact]
247-
public void Zip_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
241+
public async Task Zip_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
248242
{
249-
this.AssertAllStagesStopped(() =>
250-
{
243+
await this.AssertAllStagesStoppedAsync(async() => {
251244
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
252-
subscriber1.ExpectSubscriptionAndComplete();
245+
await subscriber1.ExpectSubscriptionAndCompleteAsync();
253246

254247
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher<int>());
255-
subscriber2.ExpectSubscriptionAndComplete();
248+
await subscriber2.ExpectSubscriptionAndCompleteAsync();
256249
}, Materializer);
257250
}
258251

259252
[Fact]
260-
public void Zip_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
253+
public async Task Zip_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
261254
{
262-
this.AssertAllStagesStopped(() =>
263-
{
255+
await this.AssertAllStagesStoppedAsync(async() => {
264256
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
265-
subscriber1.ExpectSubscriptionAndComplete();
257+
await subscriber1.ExpectSubscriptionAndCompleteAsync();
266258

267259
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher<int>());
268-
subscriber2.ExpectSubscriptionAndComplete();
260+
await subscriber2.ExpectSubscriptionAndCompleteAsync();
269261
}, Materializer);
270262
}
271263

272264
[Fact]
273-
public void Zip_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
265+
public async Task Zip_must_work_with_one_immediately_failed_and_one_nonempty_publisher()
274266
{
275-
this.AssertAllStagesStopped(() =>
276-
{
267+
await this.AssertAllStagesStoppedAsync(() => {
277268
var subscriber1 = Setup(FailedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
278269
subscriber1.ExpectSubscriptionAndError().Should().Be(TestException());
279270

280271
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), FailedPublisher<int>());
281272
subscriber2.ExpectSubscriptionAndError().Should().Be(TestException());
273+
return Task.CompletedTask;
282274
}, Materializer);
283275
}
284276

285277
[Fact]
286-
public void Zip_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
278+
public async Task Zip_must_work_with_one_delayed_failed_and_one_nonempty_publisher()
287279
{
288-
this.AssertAllStagesStopped(() =>
289-
{
280+
await this.AssertAllStagesStoppedAsync(() => {
290281
var subscriber1 = Setup(SoonToFailPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
291282
subscriber1.ExpectSubscriptionAndError().Should().Be(TestException());
292283

293284
var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToFailPublisher<int>());
294285
subscriber2.ExpectSubscriptionAndError().Should().Be(TestException());
286+
return Task.CompletedTask;
295287
}, Materializer);
296288
}
297289
}

0 commit comments

Comments
 (0)