7
7
8
8
using System ;
9
9
using System . Linq ;
10
+ using System . Threading . Tasks ;
10
11
using Akka . Streams . Dsl ;
11
12
using Akka . Streams . TestKit ;
12
13
using Akka . TestKit ;
@@ -26,22 +27,21 @@ public FlowDetacherSpec(ITestOutputHelper helper) : base(helper)
26
27
}
27
28
28
29
[ Fact ]
29
- public void A_Detacher_must_pass_through_all_elements ( )
30
+ public async Task A_Detacher_must_pass_through_all_elements ( )
30
31
{
31
- this . AssertAllStagesStopped ( ( ) =>
32
- {
33
- Source . From ( Enumerable . Range ( 1 , 100 ) )
34
- . Detach ( )
35
- . RunWith ( Sink . Seq < int > ( ) , Materializer )
36
- . Result . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 1 , 100 ) ) ;
32
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
33
+ Source . From ( Enumerable . Range ( 1 , 100 ) )
34
+ . Detach ( )
35
+ . RunWith ( Sink . Seq < int > ( ) , Materializer )
36
+ . Result . Should ( ) . BeEquivalentTo ( Enumerable . Range ( 1 , 100 ) ) ;
37
+ return Task . CompletedTask ;
37
38
} , Materializer ) ;
38
39
}
39
40
40
41
[ Fact ]
41
- public void A_Detacher_must_pass_through_failure ( )
42
+ public async Task A_Detacher_must_pass_through_failure ( )
42
43
{
43
- this . AssertAllStagesStopped ( ( ) =>
44
- {
44
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
45
45
var ex = new TestException ( "buh" ) ;
46
46
var result = Source . From ( Enumerable . Range ( 1 , 100 ) ) . Select ( x =>
47
47
{
@@ -51,17 +51,17 @@ public void A_Detacher_must_pass_through_failure()
51
51
} ) . Detach ( ) . RunWith ( Sink . Seq < int > ( ) , Materializer ) ;
52
52
53
53
result . Invoking ( r => r . Wait ( TimeSpan . FromSeconds ( 2 ) ) ) . Should ( ) . Throw < TestException > ( ) . And . Should ( ) . Be ( ex ) ;
54
+ return Task . CompletedTask ;
54
55
} , Materializer ) ;
55
56
}
56
57
57
58
[ Fact ]
58
- public void A_Detacher_must_emit_the_last_element_when_completed_Without_demand ( )
59
+ public async Task A_Detacher_must_emit_the_last_element_when_completed_Without_demand ( )
59
60
{
60
- this . AssertAllStagesStopped ( ( ) =>
61
- {
61
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
62
62
var probe = Source . Single ( 42 ) . Detach ( ) . RunWith ( this . SinkProbe < int > ( ) , Materializer ) . EnsureSubscription ( ) ;
63
- probe . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 500 ) ) ;
64
- probe . RequestNext ( 42 ) ;
63
+ await probe . ExpectNoMsgAsync ( TimeSpan . FromMilliseconds ( 500 ) ) ;
64
+ await probe . RequestNextAsync ( 42 ) ;
65
65
} , Materializer ) ;
66
66
}
67
67
}
0 commit comments