Skip to content

Commit f2737fe

Browse files
authored
[48-74] GraphUnzipWithSpec (#6595)
* [48-74] `GraphUnzipWithSpec` * Changes to `async` TestKit
1 parent bdc526b commit f2737fe

File tree

1 file changed

+79
-83
lines changed

1 file changed

+79
-83
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs

+79-83
Original file line numberDiff line numberDiff line change
@@ -33,50 +33,49 @@ public GraphUnzipWithSpec(ITestOutputHelper helper) : base(helper)
3333
}
3434

3535
[Fact]
36-
public void UnzipWith_must_work_with_immediately_completed_publisher()
36+
public async Task UnzipWith_must_work_with_immediately_completed_publisher()
3737
{
38-
this.AssertAllStagesStopped(() =>
39-
{
38+
await this.AssertAllStagesStoppedAsync(() => {
4039
var subscribers = Setup(TestPublisher.Empty<int>());
4140
ValidateSubscriptionAndComplete(subscribers);
41+
return Task.CompletedTask;
4242
}, Materializer);
4343
}
4444

4545
[Fact]
46-
public void UnzipWith_must_work_with_delayed_completed_publisher()
46+
public async Task UnzipWith_must_work_with_delayed_completed_publisher()
4747
{
48-
this.AssertAllStagesStopped(() =>
49-
{
48+
await this.AssertAllStagesStoppedAsync(() => {
5049
var subscribers = Setup(TestPublisher.LazyEmpty<int>());
5150
ValidateSubscriptionAndComplete(subscribers);
51+
return Task.CompletedTask;
5252
}, Materializer);
5353
}
5454

5555
[Fact]
56-
public void UnzipWith_must_work_with_two_immediately_failed_publisher()
56+
public async Task UnzipWith_must_work_with_two_immediately_failed_publisher()
5757
{
58-
this.AssertAllStagesStopped(() =>
59-
{
58+
await this.AssertAllStagesStoppedAsync(() => {
6059
var subscribers = Setup(TestPublisher.Error<int>(TestException));
6160
ValidateSubscriptionAndError(subscribers);
61+
return Task.CompletedTask;
6262
}, Materializer);
6363
}
6464

6565
[Fact]
66-
public void UnzipWith_must_work_with_two_delayed_failed_publisher()
66+
public async Task UnzipWith_must_work_with_two_delayed_failed_publisher()
6767
{
68-
this.AssertAllStagesStopped(() =>
69-
{
68+
await this.AssertAllStagesStoppedAsync(() => {
7069
var subscribers = Setup(TestPublisher.LazyError<int>(TestException));
7170
ValidateSubscriptionAndError(subscribers);
71+
return Task.CompletedTask;
7272
}, Materializer);
7373
}
7474

7575
[Fact]
76-
public void UnzipWith_must_work_in_the_happy_case()
76+
public async Task UnzipWith_must_work_in_the_happy_case()
7777
{
78-
this.AssertAllStagesStopped(() =>
79-
{
78+
await this.AssertAllStagesStoppedAsync(async() => {
8079
var leftProbe = this.CreateManualSubscriberProbe<int>();
8180
var rightProbe = this.CreateManualSubscriberProbe<string>();
8281

@@ -96,49 +95,48 @@ public void UnzipWith_must_work_in_the_happy_case()
9695
return ClosedShape.Instance;
9796
})).Run(Materializer);
9897

99-
var leftSubscription = leftProbe.ExpectSubscription();
100-
var rightSubscription = rightProbe.ExpectSubscription();
98+
var leftSubscription = await leftProbe.ExpectSubscriptionAsync();
99+
var rightSubscription = await rightProbe.ExpectSubscriptionAsync();
101100

102101
leftSubscription.Request(2);
103102
rightSubscription.Request(1);
104103

105-
leftProbe.ExpectNext( 2, 4);
106-
leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
104+
leftProbe.ExpectNext(2, 4);
105+
await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
106+
107+
await rightProbe.ExpectNextAsync("1+1");
108+
await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
107109

108-
rightProbe.ExpectNext("1+1");
109-
rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
110-
111110
leftSubscription.Request(1);
112111
rightSubscription.Request(2);
113112

114113
leftProbe.ExpectNext(6);
115-
leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
114+
await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
116115

117-
rightProbe.ExpectNext( "2+2", "3+3");
118-
rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
116+
rightProbe.ExpectNext("2+2", "3+3");
117+
await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
119118

120119
leftSubscription.Request(1);
121120
rightSubscription.Request(1);
122121

123-
leftProbe.ExpectNext(8);
124-
rightProbe.ExpectNext("4+4");
122+
await leftProbe.ExpectNextAsync(8);
123+
await rightProbe.ExpectNextAsync("4+4");
125124

126-
leftProbe.ExpectComplete();
127-
rightProbe.ExpectComplete();
125+
await leftProbe.ExpectCompleteAsync();
126+
await rightProbe.ExpectCompleteAsync();
128127
}, Materializer);
129128
}
130129

131130
[Fact]
132-
public void UnzipWith_must_work_in_the_sad_case()
131+
public async Task UnzipWith_must_work_in_the_sad_case()
133132
{
134-
this.AssertAllStagesStopped(() =>
135-
{
133+
await this.AssertAllStagesStoppedAsync(async () => {
136134
var leftProbe = this.CreateManualSubscriberProbe<int>();
137135
var rightProbe = this.CreateManualSubscriberProbe<string>();
138136

139137
RunnableGraph.FromGraph(GraphDsl.Create(b =>
140138
{
141-
var unzip = b.Add(new UnzipWith<int, int, string>(i => (1/i, 1 + "/" + i)));
139+
var unzip = b.Add(new UnzipWith<int, int, string>(i => (1 / i, 1 + "/" + i)));
142140
var source = Source.From(Enumerable.Range(-2, 5));
143141

144142
b.From(source).To(unzip.In);
@@ -148,8 +146,8 @@ public void UnzipWith_must_work_in_the_sad_case()
148146
return ClosedShape.Instance;
149147
})).Run(Materializer);
150148

151-
var leftSubscription = leftProbe.ExpectSubscription();
152-
var rightSubscription = rightProbe.ExpectSubscription();
149+
var leftSubscription = await leftProbe.ExpectSubscriptionAsync();
150+
var rightSubscription = await rightProbe.ExpectSubscriptionAsync();
153151

154152
Action requestFromBoth = () =>
155153
{
@@ -158,28 +156,27 @@ public void UnzipWith_must_work_in_the_sad_case()
158156
};
159157

160158
requestFromBoth();
161-
leftProbe.ExpectNext(1/-2);
162-
rightProbe.ExpectNext("1/-2");
159+
await leftProbe.ExpectNextAsync(1 / -2);
160+
await rightProbe.ExpectNextAsync("1/-2");
163161

164162
requestFromBoth();
165-
leftProbe.ExpectNext(1 / -1);
166-
rightProbe.ExpectNext("1/-1");
163+
await leftProbe.ExpectNextAsync(1 / -1);
164+
await rightProbe.ExpectNextAsync("1/-1");
167165

168-
EventFilter.Exception<DivideByZeroException>().ExpectOne(requestFromBoth);
166+
await EventFilter.Exception<DivideByZeroException>().ExpectOneAsync(requestFromBoth);
169167

170168
leftProbe.ExpectError().Should().BeOfType<DivideByZeroException>();
171169
rightProbe.ExpectError().Should().BeOfType<DivideByZeroException>();
172170

173-
leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
174-
rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
171+
await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
172+
await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
175173
}, Materializer);
176174
}
177175

178176
[Fact]
179-
public void UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all_downstream_have_cancelled()
177+
public async Task UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all_downstream_have_cancelled()
180178
{
181-
this.AssertAllStagesStopped(() =>
182-
{
179+
await this.AssertAllStagesStoppedAsync(() => {
183180
var probe = CreateTestProbe();
184181
RunnableGraph.FromGraph(GraphDsl.Create(b =>
185182
{
@@ -196,7 +193,7 @@ public void UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all
196193
var unzip = b.Add(new UnzipWith<int, int, string>(i => (1 / i, $"1 / {i}")));
197194

198195
b.From(source).To(unzip.In);
199-
196+
200197
Flow<T, T, NotUsed> KillSwitchFlow<T>()
201198
=> Flow.Create<T, NotUsed>()
202199
.ViaMaterialized(KillSwitches.Single<T>(), Keep.Right)
@@ -223,14 +220,14 @@ Flow<T, T, NotUsed> KillSwitchFlow<T>()
223220
t.Exception.Should().NotBeNull();
224221
t.Exception.InnerException.Should().Be(boom);
225222
});
223+
return Task.CompletedTask;
226224
}, Materializer);
227225
}
228226

229227
[Fact]
230-
public void UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs()
228+
public async Task UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs()
231229
{
232-
this.AssertAllStagesStopped(() =>
233-
{
230+
await this.AssertAllStagesStoppedAsync(async() => {
234231
var probe0 = this.CreateManualSubscriberProbe<string>();
235232
var probe1 = this.CreateManualSubscriberProbe<string>();
236233
var probe2 = this.CreateManualSubscriberProbe<int>();
@@ -248,31 +245,30 @@ public void UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs()
248245
return ClosedShape.Instance;
249246
})).Run(Materializer);
250247

251-
var subscription0 = probe0.ExpectSubscription();
252-
var subscription1 = probe1.ExpectSubscription();
253-
var subscription2 = probe2.ExpectSubscription();
248+
var subscription0 = await probe0.ExpectSubscriptionAsync();
249+
var subscription1 = await probe1.ExpectSubscriptionAsync();
250+
var subscription2 = await probe2.ExpectSubscriptionAsync();
254251

255252
subscription0.Request(1);
256253
subscription1.Request(1);
257254
subscription2.Request(1);
258255

259-
probe0.ExpectNext("Caplin");
260-
probe1.ExpectNext("Capybara");
261-
probe2.ExpectNext(55);
256+
await probe0.ExpectNextAsync("Caplin");
257+
await probe1.ExpectNextAsync("Capybara");
258+
await probe2.ExpectNextAsync(55);
262259

263-
probe0.ExpectComplete();
264-
probe1.ExpectComplete();
265-
probe2.ExpectComplete();
260+
await probe0.ExpectCompleteAsync();
261+
await probe1.ExpectCompleteAsync();
262+
await probe2.ExpectCompleteAsync();
266263
}, Materializer);
267264
}
268265

269266
[Fact]
270-
public void UnzipWith_must_work_with_up_to_6_outputs()
267+
public async Task UnzipWith_must_work_with_up_to_6_outputs()
271268
{
272269
// the jvm version uses 20 outputs but we have only 7 so changed this spec a little bit
273270

274-
this.AssertAllStagesStopped(() =>
275-
{
271+
await this.AssertAllStagesStoppedAsync(async() => {
276272
var probe0 = this.CreateManualSubscriberProbe<int>();
277273
var probe1 = this.CreateManualSubscriberProbe<string>();
278274
var probe2 = this.CreateManualSubscriberProbe<int>();
@@ -290,8 +286,8 @@ public void UnzipWith_must_work_with_up_to_6_outputs()
290286
(ints[0], ints[0].ToString(), ints[1], ints[1].ToString(), ints[2],
291287
ints[2].ToString())));
292288

293-
var source = Source.Single(Enumerable.Range(1,3).ToList());
294-
289+
var source = Source.Single(Enumerable.Range(1, 3).ToList());
290+
295291
b.From(source).To(unzip.In);
296292
b.From(unzip.Out0).To(Sink.FromSubscriber(probe0));
297293
b.From(unzip.Out1).To(Sink.FromSubscriber(probe1));
@@ -303,26 +299,26 @@ public void UnzipWith_must_work_with_up_to_6_outputs()
303299
return ClosedShape.Instance;
304300
})).Run(Materializer);
305301

306-
probe0.ExpectSubscription().Request(1);
307-
probe1.ExpectSubscription().Request(1);
308-
probe2.ExpectSubscription().Request(1);
309-
probe3.ExpectSubscription().Request(1);
310-
probe4.ExpectSubscription().Request(1);
311-
probe5.ExpectSubscription().Request(1);
312-
313-
probe0.ExpectNext(1);
314-
probe1.ExpectNext("1");
315-
probe2.ExpectNext(2);
316-
probe3.ExpectNext("2");
317-
probe4.ExpectNext(3);
318-
probe5.ExpectNext("3");
319-
320-
probe0.ExpectComplete();
321-
probe1.ExpectComplete();
322-
probe2.ExpectComplete();
323-
probe3.ExpectComplete();
324-
probe4.ExpectComplete();
325-
probe5.ExpectComplete();
302+
(await probe0.ExpectSubscriptionAsync()).Request(1);
303+
(await probe1.ExpectSubscriptionAsync()).Request(1);
304+
(await probe2.ExpectSubscriptionAsync()).Request(1);
305+
(await probe3.ExpectSubscriptionAsync()).Request(1);
306+
(await probe4.ExpectSubscriptionAsync()).Request(1);
307+
(await probe5.ExpectSubscriptionAsync()).Request(1);
308+
309+
await probe0.ExpectNextAsync(1);
310+
await probe1.ExpectNextAsync("1");
311+
await probe2.ExpectNextAsync(2);
312+
await probe3.ExpectNextAsync("2");
313+
await probe4.ExpectNextAsync(3);
314+
await probe5.ExpectNextAsync("3");
315+
316+
await probe0.ExpectCompleteAsync();
317+
await probe1.ExpectCompleteAsync();
318+
await probe2.ExpectCompleteAsync();
319+
await probe3.ExpectCompleteAsync();
320+
await probe4.ExpectCompleteAsync();
321+
await probe5.ExpectCompleteAsync();
326322
}, Materializer);
327323
}
328324

0 commit comments

Comments
 (0)