Skip to content

Commit 220cffb

Browse files
eabaAaronontheweb
andauthored
[21-74]FlowOnCompleteSpec (#6569)
* [21-74]`FlowOnCompleteSpec` * Changes to `async` TestKit --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent c1cbb7f commit 220cffb

File tree

1 file changed

+31
-33
lines changed

1 file changed

+31
-33
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowOnCompleteSpec.cs

+31-33
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9+
using System.Threading.Tasks;
910
using Akka.Actor;
1011
using Akka.Streams.Dsl;
1112
using Akka.Streams.TestKit;
@@ -25,66 +26,62 @@ public FlowOnCompleteSpec(ITestOutputHelper helper) : base(helper)
2526
}
2627

2728
[Fact]
28-
public void A_Flow_with_OnComplete_must_invoke_callback_on_normal_completion()
29+
public async Task A_Flow_with_OnComplete_must_invoke_callback_on_normal_completion()
2930
{
30-
this.AssertAllStagesStopped(() =>
31-
{
31+
await this.AssertAllStagesStoppedAsync(async() => {
3232
var onCompleteProbe = CreateTestProbe();
3333
var p = this.CreateManualPublisherProbe<int>();
3434
Source.FromPublisher(p)
3535
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => { }))
3636
.Run(Materializer);
37-
var proc = p.ExpectSubscription();
38-
proc.ExpectRequest();
37+
var proc = await p.ExpectSubscriptionAsync();
38+
await proc.ExpectRequestAsync();
3939
proc.SendNext(42);
40-
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
40+
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
4141
proc.SendComplete();
42-
onCompleteProbe.ExpectMsg("done");
42+
await onCompleteProbe.ExpectMsgAsync("done");
4343
}, Materializer);
4444
}
4545

4646
[Fact]
47-
public void A_Flow_with_OnComplete_must_yield_the_first_error()
47+
public async Task A_Flow_with_OnComplete_must_yield_the_first_error()
4848
{
49-
this.AssertAllStagesStopped(() =>
50-
{
49+
await this.AssertAllStagesStoppedAsync(async() => {
5150
var onCompleteProbe = CreateTestProbe();
5251
var p = this.CreateManualPublisherProbe<int>();
5352
Source.FromPublisher(p)
5453
.To(Sink.OnComplete<int>(() => { }, ex => onCompleteProbe.Ref.Tell(ex)))
5554
.Run(Materializer);
56-
var proc = p.ExpectSubscription();
57-
proc.ExpectRequest();
55+
var proc = await p.ExpectSubscriptionAsync();
56+
await proc.ExpectRequestAsync();
5857
var cause = new TestException("test");
5958
proc.SendError(cause);
60-
onCompleteProbe.ExpectMsg(cause);
61-
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
59+
await onCompleteProbe.ExpectMsgAsync(cause);
60+
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
6261
}, Materializer);
6362
}
6463

6564
[Fact]
66-
public void A_Flow_with_OnComplete_must_invoke_callback_for_an_empty_stream()
65+
public async Task A_Flow_with_OnComplete_must_invoke_callback_for_an_empty_stream()
6766
{
68-
this.AssertAllStagesStopped(() =>
69-
{
67+
await this.AssertAllStagesStoppedAsync(async() => {
7068
var onCompleteProbe = CreateTestProbe();
7169
var p = this.CreateManualPublisherProbe<int>();
7270
Source.FromPublisher(p)
7371
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => { }))
7472
.Run(Materializer);
75-
var proc = p.ExpectSubscription();
76-
proc.ExpectRequest();
73+
var proc = await p.ExpectSubscriptionAsync();
74+
await proc.ExpectRequestAsync();
7775
proc.SendComplete();
78-
onCompleteProbe.ExpectMsg("done");
79-
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
76+
await onCompleteProbe.ExpectMsgAsync("done");
77+
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
8078
}, Materializer);
8179
}
8280

8381
[Fact]
84-
public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_foreach_steps()
82+
public async Task A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_foreach_steps()
8583
{
86-
this.AssertAllStagesStopped(() =>
87-
{
84+
await this.AssertAllStagesStoppedAsync(async() => {
8885
var onCompleteProbe = CreateTestProbe();
8986
var p = this.CreateManualPublisherProbe<int>();
9087
var foreachSink = Sink.ForEach<int>(x => onCompleteProbe.Ref.Tell("foreach-" + x));
@@ -95,18 +92,19 @@ public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_fore
9592
}).RunWith(foreachSink, Materializer);
9693
future.ContinueWith(t => onCompleteProbe.Tell(t.IsCompleted ? "done" : "failure"));
9794

98-
var proc = p.ExpectSubscription();
99-
proc.ExpectRequest();
95+
var proc = await p.ExpectSubscriptionAsync();
96+
await proc.ExpectRequestAsync();
10097
proc.SendNext(42);
10198
proc.SendComplete();
102-
onCompleteProbe.ExpectMsg("map-42");
103-
onCompleteProbe.ExpectMsg("foreach-42");
104-
onCompleteProbe.ExpectMsg("done");
99+
await onCompleteProbe.ExpectMsgAsync("map-42");
100+
await onCompleteProbe.ExpectMsgAsync("foreach-42");
101+
await onCompleteProbe.ExpectMsgAsync("done");
102+
105103
}, Materializer);
106104
}
107105

108106
[Fact]
109-
public void A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
107+
public async Task A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
110108
{
111109
var materializer = ActorMaterializer.Create(Sys);
112110
var onCompleteProbe = CreateTestProbe();
@@ -115,11 +113,11 @@ public void A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
115113
Source.FromPublisher(publisher).To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"),
116114
ex => onCompleteProbe.Ref.Tell(ex)))
117115
.Run(materializer);
118-
var proc = publisher.ExpectSubscription();
119-
proc.ExpectRequest();
116+
var proc = await publisher.ExpectSubscriptionAsync();
117+
await proc.ExpectRequestAsync();
120118
materializer.Shutdown();
121119

122-
onCompleteProbe.ExpectMsg<AbruptTerminationException>();
120+
await onCompleteProbe.ExpectMsgAsync<AbruptTerminationException>();
123121
}
124122
}
125123
}

0 commit comments

Comments
 (0)