8
8
using System ;
9
9
using System . Linq ;
10
10
using System . Threading ;
11
+ using System . Threading . Tasks ;
11
12
using Akka . Streams . Dsl ;
12
13
using Akka . Streams . TestKit ;
13
14
using Akka . TestKit ;
@@ -28,83 +29,81 @@ public FlowWatchTerminationSpec(ITestOutputHelper helper) : base(helper)
28
29
}
29
30
30
31
[ Fact ]
31
- public void A_WatchTermination_must_complete_the_future_when_stream_is_completed ( )
32
+ public async Task A_WatchTermination_must_complete_the_future_when_stream_is_completed ( )
32
33
{
33
- this . AssertAllStagesStopped ( ( ) =>
34
- {
35
- var t =
36
- Source . From ( Enumerable . Range ( 1 , 4 ) )
37
- . WatchTermination ( Keep . Right )
38
- . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
39
- . Run ( Materializer ) ;
34
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
35
+ var t =
36
+ Source . From ( Enumerable . Range ( 1 , 4 ) )
37
+ . WatchTermination ( Keep . Right )
38
+ . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
39
+ . Run ( Materializer ) ;
40
40
var future = t . Item1 ;
41
41
var p = t . Item2 ;
42
42
43
- p . Request ( 4 ) . ExpectNext ( 1 , 2 , 3 , 4 ) ;
43
+ p . Request ( 4 ) . ExpectNext ( 1 , 2 , 3 , 4 ) ;
44
44
future . Wait ( TimeSpan . FromSeconds ( 3 ) ) . Should ( ) . BeTrue ( ) ;
45
- p . ExpectComplete ( ) ;
45
+ await p . ExpectCompleteAsync ( ) ;
46
46
} , Materializer ) ;
47
47
}
48
48
49
49
[ Fact ]
50
- public void A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream ( )
50
+ public async Task A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream ( )
51
51
{
52
- this . AssertAllStagesStopped ( ( ) =>
53
- {
54
- var t =
55
- Source . From ( Enumerable . Range ( 1 , 4 ) )
56
- . WatchTermination ( Keep . Right )
57
- . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
58
- . Run ( Materializer ) ;
52
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
53
+ var t =
54
+ Source . From ( Enumerable . Range ( 1 , 4 ) )
55
+ . WatchTermination ( Keep . Right )
56
+ . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
57
+ . Run ( Materializer ) ;
59
58
var future = t . Item1 ;
60
59
var p = t . Item2 ;
61
60
62
- p . Request ( 3 ) . ExpectNext ( 1 , 2 , 3 ) ;
61
+ p . Request ( 3 ) . ExpectNext ( 1 , 2 , 3 ) ;
63
62
p . Cancel ( ) ;
64
63
future . Wait ( TimeSpan . FromSeconds ( 3 ) ) . Should ( ) . BeTrue ( ) ;
64
+ return Task . CompletedTask ;
65
65
} , Materializer ) ;
66
66
}
67
67
68
68
[ Fact ]
69
- public void A_WatchTermination_must_fail_the_future_when_stream_is_failed ( )
69
+ public async Task A_WatchTermination_must_fail_the_future_when_stream_is_failed ( )
70
70
{
71
- this . AssertAllStagesStopped ( ( ) =>
72
- {
71
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
73
72
var ex = new Exception ( "Stream failed." ) ;
74
73
var t = this . SourceProbe < int > ( ) . WatchTermination ( Keep . Both ) . To ( Sink . Ignore < int > ( ) ) . Run ( Materializer ) ;
75
74
var p = t . Item1 ;
76
75
var future = t . Item2 ;
77
76
p . SendNext ( 1 ) ;
78
77
p . SendError ( ex ) ;
79
78
future . Invoking ( f => f . Wait ( ) ) . Should ( ) . Throw < Exception > ( ) . WithMessage ( "Stream failed." ) ;
79
+ return Task . CompletedTask ;
80
80
} , Materializer ) ;
81
81
}
82
82
83
83
[ Fact ]
84
- public void A_WatchTermination_must_complete_the_future_for_an_empty_stream ( )
84
+ public async Task A_WatchTermination_must_complete_the_future_for_an_empty_stream ( )
85
85
{
86
- this . AssertAllStagesStopped ( ( ) =>
87
- {
88
- var t =
89
- Source . Empty < int > ( )
90
- . WatchTermination ( Keep . Right )
91
- . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
92
- . Run ( Materializer ) ;
86
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
87
+ var t =
88
+ Source . Empty < int > ( )
89
+ . WatchTermination ( Keep . Right )
90
+ . ToMaterialized ( this . SinkProbe < int > ( ) , Keep . Both )
91
+ . Run ( Materializer ) ;
93
92
var future = t . Item1 ;
94
93
var p = t . Item2 ;
95
94
p . Request ( 1 ) ;
96
95
future . Wait ( TimeSpan . FromSeconds ( 3 ) ) . Should ( ) . BeTrue ( ) ;
96
+ return Task . CompletedTask ;
97
97
} , Materializer ) ;
98
98
}
99
99
100
100
[ Fact ( Skip = "We need a way to combine multiple sources with different materializer types" ) ]
101
- public void A_WatchTermination_must_complete_the_future_for_graph ( )
101
+ public async Task A_WatchTermination_must_complete_the_future_for_graph ( )
102
102
{
103
- this . AssertAllStagesStopped ( ( ) =>
104
- {
103
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
105
104
//var first = this.SourceProbe<int>().WatchTermination(Keep.Both);
106
105
//var second = Source.From(Enumerable.Range(2, 4)).MapMaterializedValue(new Func<NotUsed, (TestPublisher.Probe<int>, Task)>(_ => null));
107
-
106
+
108
107
//var t = Source.FromGraph(
109
108
// GraphDsl.Create<SourceShape<int>, (TestPublisher.Probe<int>, Task)>(b =>
110
109
// {
@@ -128,6 +127,7 @@ public void A_WatchTermination_must_complete_the_future_for_graph()
128
127
129
128
//sourceProbe.SendComplete();
130
129
//sinkProbe.ExpectNextN(new[] {2, 3, 4, 5}).ExpectComplete();
130
+ return Task . CompletedTask ;
131
131
} , Materializer ) ;
132
132
}
133
133
0 commit comments