Skip to content

Commit 81a710a

Browse files
authored
[42-74] GraphBroadcastSpec (#6589)
* [42-74] `GraphBroadcastSpec` * Changes to `async` TestKit
1 parent 48c8077 commit 81a710a

File tree

1 file changed

+72
-80
lines changed

1 file changed

+72
-80
lines changed

src/core/Akka.Streams.Tests/Dsl/GraphBroadcastSpec.cs

+72-80
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ public GraphBroadcastSpec(ITestOutputHelper helper) : base(helper)
3131
}
3232

3333
[Fact]
34-
public void A_Broadcast_must_broadcast_to_other_subscriber()
34+
public async Task A_Broadcast_must_broadcast_to_other_subscriber()
3535
{
36-
this.AssertAllStagesStopped(() =>
37-
{
36+
await this.AssertAllStagesStoppedAsync(async() => {
3837
var c1 = this.CreateManualSubscriberProbe<int>();
3938
var c2 = this.CreateManualSubscriberProbe<int>();
40-
RunnableGraph.FromGraph(GraphDsl.Create (b =>
39+
RunnableGraph.FromGraph(GraphDsl.Create(b =>
4140
{
4241
var broadcast = b.Add(new Broadcast<int>(2));
4342
var source = Source.From(Enumerable.Range(1, 3));
@@ -51,50 +50,49 @@ public void A_Broadcast_must_broadcast_to_other_subscriber()
5150
return ClosedShape.Instance;
5251
})).Run(Materializer);
5352

54-
var sub1 = c1.ExpectSubscription();
55-
var sub2 = c2.ExpectSubscription();
53+
var sub1 = await c1.ExpectSubscriptionAsync();
54+
var sub2 = await c2.ExpectSubscriptionAsync();
5655

5756
sub1.Request(1);
5857
sub2.Request(2);
5958

60-
c1.ExpectNext(1).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
61-
c2.ExpectNext( 1, 2).ExpectNoMsg(TimeSpan.FromMilliseconds(100));
59+
await c1.ExpectNext(1).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
60+
await c2.ExpectNext(1, 2).ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
6261
sub1.Request(3);
63-
c1.ExpectNext( 2, 3).ExpectComplete();
62+
await c1.ExpectNext(2, 3).ExpectCompleteAsync();
6463
sub2.Request(3);
65-
c2.ExpectNext(3).ExpectComplete();
64+
await c2.ExpectNext(3).ExpectCompleteAsync();
6665
}, Materializer);
6766
}
6867

6968
[Fact]
70-
public void A_Broadcast_must_work_with_one_way_broadcast()
69+
public async Task A_Broadcast_must_work_with_one_way_broadcast()
7170
{
72-
this.AssertAllStagesStopped(() =>
73-
{
74-
var t = Source.FromGraph(GraphDsl.Create(b =>
75-
{
76-
var broadcast = b.Add(new Broadcast<int>(1));
71+
await this.AssertAllStagesStoppedAsync(() => {
72+
var t = Source.FromGraph(GraphDsl.Create(b =>
73+
{
74+
var broadcast = b.Add(new Broadcast<int>(1));
7775
var source = b.Add(Source.From(Enumerable.Range(1, 3)));
78-
76+
7977
b.From(source).To(broadcast.In);
80-
81-
return new SourceShape<int>(broadcast.Out(0));
82-
})).RunAggregate(new List<int>(), (list, i) =>
83-
{
84-
list.Add(i);
85-
return list;
78+
79+
return new SourceShape<int>(broadcast.Out(0));
80+
})).RunAggregate(new List<int>(), (list, i) =>
81+
{
82+
list.Add(i);
83+
return list;
8684
}, Materializer);
8785

8886
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
89-
t.Result.Should().BeEquivalentTo(new[] {1, 2, 3});
87+
t.Result.Should().BeEquivalentTo(new[] { 1, 2, 3 });
88+
return Task.CompletedTask;
9089
}, Materializer);
9190
}
9291

9392
[Fact]
94-
public void A_Broadcast_must_work_with_n_way_broadcast()
93+
public async Task A_Broadcast_must_work_with_n_way_broadcast()
9594
{
96-
this.AssertAllStagesStopped(() =>
97-
{
95+
await this.AssertAllStagesStoppedAsync(() => {
9896
var headSink = Sink.First<IEnumerable<int>>();
9997

10098
var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
@@ -116,14 +114,14 @@ public void A_Broadcast_must_work_with_n_way_broadcast()
116114
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
117115
foreach (var list in task.Result)
118116
list.Should().BeEquivalentTo(new[] { 1, 2, 3 });
117+
return Task.CompletedTask;
119118
}, Materializer);
120119
}
121120

122121
[Fact(Skip="We don't have enough overloads for GraphDsl.Create")]
123-
public void A_Broadcast_must_with_22_way_broadcast()
122+
public async Task A_Broadcast_must_with_22_way_broadcast()
124123
{
125-
this.AssertAllStagesStopped(() =>
126-
{
124+
await this.AssertAllStagesStoppedAsync(() => {
127125
//var headSink = Sink.First<IEnumerable<int>>();
128126

129127
//var t = RunnableGraph.FromGraph(GraphDsl.Create(headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, headSink, ValueTuple.Create,
@@ -162,15 +160,14 @@ public void A_Broadcast_must_with_22_way_broadcast()
162160
//task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
163161
//foreach (var list in task.Result)
164162
// list.Should().BeEquivalentTo(new[] { 1, 2, 3 });
165-
163+
return Task.CompletedTask;
166164
}, Materializer);
167165
}
168166

169167
[Fact]
170-
public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
168+
public async Task A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
171169
{
172-
this.AssertAllStagesStopped(() =>
173-
{
170+
await this.AssertAllStagesStoppedAsync(async() => {
174171
var c1 = this.CreateManualSubscriberProbe<int>();
175172
var c2 = this.CreateManualSubscriberProbe<int>();
176173
RunnableGraph.FromGraph(GraphDsl.Create(b =>
@@ -187,20 +184,19 @@ public void A_Broadcast_must_produce_to_other_even_though_downstream_cancels()
187184
return ClosedShape.Instance;
188185
})).Run(Materializer);
189186

190-
var sub1 = c1.ExpectSubscription();
187+
var sub1 = await c1.ExpectSubscriptionAsync();
191188
sub1.Cancel();
192-
var sub2 = c2.ExpectSubscription();
189+
var sub2 = await c2.ExpectSubscriptionAsync();
193190
sub2.Request(3);
194-
c2.ExpectNext( 1, 2, 3);
195-
c2.ExpectComplete();
191+
c2.ExpectNext(1, 2, 3);
192+
await c2.ExpectCompleteAsync();
196193
}, Materializer);
197194
}
198195

199196
[Fact]
200-
public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
197+
public async Task A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
201198
{
202-
this.AssertAllStagesStopped(() =>
203-
{
199+
await this.AssertAllStagesStoppedAsync(async() => {
204200
var c1 = this.CreateManualSubscriberProbe<int>();
205201
var c2 = this.CreateManualSubscriberProbe<int>();
206202
RunnableGraph.FromGraph(GraphDsl.Create(b =>
@@ -217,20 +213,19 @@ public void A_Broadcast_must_produce_to_downstream_even_though_other_cancels()
217213
return ClosedShape.Instance;
218214
})).Run(Materializer);
219215

220-
var sub1 = c1.ExpectSubscription();
221-
var sub2 = c2.ExpectSubscription();
216+
var sub1 = await c1.ExpectSubscriptionAsync();
217+
var sub2 = await c2.ExpectSubscriptionAsync();
222218
sub2.Cancel();
223219
sub1.Request(3);
224-
c1.ExpectNext( 1, 2, 3);
225-
c1.ExpectComplete();
220+
c1.ExpectNext(1, 2, 3);
221+
await c1.ExpectCompleteAsync();
226222
}, Materializer);
227223
}
228224

229225
[Fact]
230-
public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
226+
public async Task A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
231227
{
232-
this.AssertAllStagesStopped(() =>
233-
{
228+
await this.AssertAllStagesStoppedAsync(async() => {
234229
var p1 = this.CreateManualPublisherProbe<int>();
235230
var c1 = this.CreateManualSubscriberProbe<int>();
236231
var c2 = this.CreateManualSubscriberProbe<int>();
@@ -248,30 +243,29 @@ public void A_Broadcast_must_cancel_upstream_when_downstreams_cancel()
248243
return ClosedShape.Instance;
249244
})).Run(Materializer);
250245

251-
var bSub = p1.ExpectSubscription();
252-
var sub1 = c1.ExpectSubscription();
253-
var sub2 = c2.ExpectSubscription();
246+
var bSub = await p1.ExpectSubscriptionAsync();
247+
var sub1 = await c1.ExpectSubscriptionAsync();
248+
var sub2 = await c2.ExpectSubscriptionAsync();
254249

255250
sub1.Request(3);
256251
sub2.Request(3);
257-
p1.ExpectRequest(bSub, 16);
252+
await p1.ExpectRequestAsync(bSub, 16);
258253
bSub.SendNext(1);
259-
c1.ExpectNext(1);
260-
c2.ExpectNext(1);
254+
await c1.ExpectNextAsync(1);
255+
await c2.ExpectNextAsync(1);
261256
bSub.SendNext(2);
262-
c1.ExpectNext(2);
263-
c2.ExpectNext(2);
257+
await c1.ExpectNextAsync(2);
258+
await c2.ExpectNextAsync(2);
264259
sub1.Cancel();
265260
sub2.Cancel();
266-
bSub.ExpectCancellation();
261+
await bSub.ExpectCancellationAsync();
267262
}, Materializer);
268263
}
269264

270265
[Fact]
271-
public void A_Broadcast_must_pass_along_early_cancellation()
266+
public async Task A_Broadcast_must_pass_along_early_cancellation()
272267
{
273-
this.AssertAllStagesStopped(() =>
274-
{
268+
await this.AssertAllStagesStoppedAsync(async() => {
275269
var c1 = this.CreateManualSubscriberProbe<int>();
276270
var c2 = this.CreateManualSubscriberProbe<int>();
277271

@@ -287,22 +281,21 @@ public void A_Broadcast_must_pass_along_early_cancellation()
287281

288282
var up = this.CreateManualPublisherProbe<int>();
289283

290-
var downSub1 = c1.ExpectSubscription();
291-
var downSub2 = c2.ExpectSubscription();
284+
var downSub1 = await c1.ExpectSubscriptionAsync();
285+
var downSub2 = await c2.ExpectSubscriptionAsync();
292286
downSub1.Cancel();
293287
downSub2.Cancel();
294288

295289
up.Subscribe(s);
296-
var upSub = up.ExpectSubscription();
297-
upSub.ExpectCancellation();
290+
var upSub = await up.ExpectSubscriptionAsync();
291+
await upSub.ExpectCancellationAsync();
298292
}, Materializer);
299293
}
300294

301295
[Fact]
302-
public void A_Broadcast_must_AltoTo_must_broadcast()
296+
public async Task A_Broadcast_must_AltoTo_must_broadcast()
303297
{
304-
this.AssertAllStagesStopped(() =>
305-
{
298+
await this.AssertAllStagesStoppedAsync(async() => {
306299
var p = this.SinkProbe<int>();
307300
var p2 = this.SinkProbe<int>();
308301

@@ -315,20 +308,19 @@ public void A_Broadcast_must_AltoTo_must_broadcast()
315308
var ps1 = t.Item1;
316309
var ps2 = t.Item2;
317310

318-
ps1.Request(6);
319-
ps2.Request(6);
320-
ps1.ExpectNext( 1, 2, 3, 4, 5, 6);
321-
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
322-
ps1.ExpectComplete();
323-
ps2.ExpectComplete();
311+
await ps1.RequestAsync(6);
312+
await ps2.RequestAsync(6);
313+
ps1.ExpectNext(1, 2, 3, 4, 5, 6);
314+
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
315+
await ps1.ExpectCompleteAsync();
316+
await ps2.ExpectCompleteAsync();
324317
}, Materializer);
325318
}
326319

327320
[Fact]
328-
public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
321+
public async Task A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
329322
{
330-
this.AssertAllStagesStopped(() =>
331-
{
323+
await this.AssertAllStagesStoppedAsync(async() => {
332324
var p = this.SinkProbe<int>();
333325
var p2 = this.SinkProbe<int>();
334326

@@ -340,11 +332,11 @@ public void A_Broadcast_must_AlsoTo_must_continue_if_sink_cancels()
340332

341333
var ps1 = t.Item1;
342334
var ps2 = t.Item2;
343-
344-
ps2.Request(6);
335+
336+
await ps2.RequestAsync(6);
345337
ps1.Cancel();
346-
ps2.ExpectNext( 1, 2, 3, 4, 5, 6);
347-
ps2.ExpectComplete();
338+
ps2.ExpectNext(1, 2, 3, 4, 5, 6);
339+
await ps2.ExpectCompleteAsync();
348340
}, Materializer);
349341
}
350342
}

0 commit comments

Comments
 (0)