19
19
using FluentAssertions ;
20
20
using Xunit ;
21
21
using FluentAssertions . Extensions ;
22
+ using Xunit . Sdk ;
22
23
23
24
namespace Akka . Streams . Tests . Dsl
24
25
{
@@ -30,11 +31,11 @@ public LazySourceSpec()
30
31
}
31
32
32
33
private ActorMaterializer Materializer { get ; }
33
-
34
+
34
35
[ Fact ]
35
- public void A_lazy_source_must_work_like_a_normal_source_happy_path ( )
36
+ public async Task A_lazy_source_must_work_like_a_normal_source_happy_path ( )
36
37
{
37
- this . AssertAllStagesStopped ( async ( ) =>
38
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
38
39
{
39
40
var result = Source . Lazily ( ( ) => Source . From ( new [ ] { 1 , 2 , 3 } ) ) . RunWith ( Sink . Seq < int > ( ) , Materializer ) ;
40
41
var complete = await result . ShouldCompleteWithin ( 3 . Seconds ( ) ) ;
@@ -43,10 +44,9 @@ public void A_lazy_source_must_work_like_a_normal_source_happy_path()
43
44
}
44
45
45
46
[ Fact ]
46
- public void A_lazy_source_must_work_never_construct_the_source_when_there_was_no_demand ( )
47
+ public async Task A_lazy_source_must_work_never_construct_the_source_when_there_was_no_demand ( )
47
48
{
48
- this . AssertAllStagesStopped ( ( ) =>
49
- {
49
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
50
50
var probe = this . CreateSubscriberProbe < int > ( ) ;
51
51
var constructed = new AtomicBoolean ( ) ;
52
52
Source . Lazily ( ( ) =>
@@ -57,49 +57,48 @@ public void A_lazy_source_must_work_never_construct_the_source_when_there_was_no
57
57
58
58
probe . Cancel ( ) ;
59
59
constructed . Value . Should ( ) . BeFalse ( ) ;
60
+ return Task . CompletedTask ;
60
61
} , Materializer ) ;
61
62
}
62
63
63
64
[ Fact ]
64
- public void A_lazy_source_must_fail_the_materialized_value_when_downstream_cancels_without_ever_consuming_any_element ( )
65
+ public async Task A_lazy_source_must_fail_the_materialized_value_when_downstream_cancels_without_ever_consuming_any_element ( )
65
66
{
66
- this . AssertAllStagesStopped ( ( ) =>
67
- {
68
- var result = Source . Lazily ( ( ) => Source . From ( new [ ] { 1 , 2 , 3 } ) )
69
- . ToMaterialized ( Sink . Cancelled < int > ( ) , Keep . Left )
70
- . Run ( Materializer ) ;
67
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
68
+ var result = Source . Lazily ( ( ) => Source . From ( new [ ] { 1 , 2 , 3 } ) )
69
+ . ToMaterialized ( Sink . Cancelled < int > ( ) , Keep . Left )
70
+ . Run ( Materializer ) ;
71
71
72
- Intercept ( ( ) =>
72
+ AssertThrows < Exception > ( ( ) =>
73
73
{
74
74
var boom = result . Result ;
75
75
} ) ;
76
+ return Task . CompletedTask ;
76
77
} , Materializer ) ;
77
78
}
78
79
79
80
[ Fact ]
80
- public void A_lazy_source_must_stop_consuming_when_downstream_has_cancelled ( )
81
+ public async Task A_lazy_source_must_stop_consuming_when_downstream_has_cancelled ( )
81
82
{
82
- this . AssertAllStagesStopped ( ( ) =>
83
- {
83
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
84
84
var outProbe = this . CreateSubscriberProbe < int > ( ) ;
85
85
var inProbe = this . CreatePublisherProbe < int > ( ) ;
86
86
87
87
Source . Lazily ( ( ) => Source . FromPublisher ( inProbe ) ) . RunWith ( Sink . FromSubscriber ( outProbe ) , Materializer ) ;
88
88
89
89
outProbe . Request ( 1 ) ;
90
- inProbe . ExpectRequest ( ) ;
91
- inProbe . SendNext ( 27 ) ;
92
- outProbe . ExpectNext ( 27 ) ;
93
- outProbe . Cancel ( ) ;
94
- inProbe . ExpectCancellation ( ) ;
90
+ await inProbe . ExpectRequestAsync ( ) ;
91
+ await inProbe . SendNextAsync ( 27 ) ;
92
+ await outProbe . ExpectNextAsync ( 27 ) ;
93
+ await outProbe . CancelAsync ( ) ;
94
+ await inProbe . ExpectCancellationAsync ( ) ;
95
95
} , Materializer ) ;
96
96
}
97
97
98
98
[ Fact ]
99
- public void A_lazy_source_must_materialize_when_the_source_has_been_created ( )
99
+ public async Task A_lazy_source_must_materialize_when_the_source_has_been_created ( )
100
100
{
101
- this . AssertAllStagesStopped ( ( ) =>
102
- {
101
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
103
102
var probe = this . CreateSubscriberProbe < int > ( ) ;
104
103
105
104
var task = Source . Lazily ( ( ) => Source . From ( new [ ] { 1 , 2 , 3 } ) . MapMaterializedValue ( _ => Done . Instance ) )
@@ -108,18 +107,17 @@ public void A_lazy_source_must_materialize_when_the_source_has_been_created()
108
107
109
108
task . IsCompleted . Should ( ) . BeFalse ( ) ;
110
109
probe . Request ( 1 ) ;
111
- probe . ExpectNext ( 1 ) ;
110
+ await probe . ExpectNextAsync ( 1 ) ;
112
111
task . Result . Should ( ) . Be ( Done . Instance ) ;
113
112
114
113
probe . Cancel ( ) ;
115
114
} , Materializer ) ;
116
115
}
117
116
118
117
[ Fact ]
119
- public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inner_source_has_been_materialized ( )
118
+ public async Task A_lazy_source_must_propagate_downstream_cancellation_cause_when_inner_source_has_been_materialized ( )
120
119
{
121
- this . AssertAllStagesStopped ( ( ) =>
122
- {
120
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
123
121
var probe = CreateTestProbe ( ) ;
124
122
var ( doneF , killSwitch ) = Source . Lazily ( ( ) =>
125
123
{
@@ -138,7 +136,7 @@ public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inne
138
136
. Run ( Materializer ) ;
139
137
140
138
var boom = new TestException ( "boom" ) ;
141
- probe . ExpectMsg < Done > ( ) ;
139
+ await probe . ExpectMsgAsync < Done > ( ) ;
142
140
killSwitch . Abort ( boom ) ;
143
141
doneF . ContinueWith ( t =>
144
142
{
@@ -150,19 +148,18 @@ public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inne
150
148
}
151
149
152
150
[ Fact ]
153
- public void A_lazy_source_must_fail_stage_when_upstream_fails ( )
151
+ public async Task A_lazy_source_must_fail_stage_when_upstream_fails ( )
154
152
{
155
- this . AssertAllStagesStopped ( ( ) =>
156
- {
153
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
157
154
var outProbe = this . CreateSubscriberProbe < int > ( ) ;
158
155
var inProbe = this . CreatePublisherProbe < int > ( ) ;
159
156
160
157
Source . Lazily ( ( ) => Source . FromPublisher ( inProbe ) ) . RunWith ( Sink . FromSubscriber ( outProbe ) , Materializer ) ;
161
158
162
159
outProbe . Request ( 1 ) ;
163
- inProbe . ExpectRequest ( ) ;
164
- inProbe . SendNext ( 27 ) ;
165
- outProbe . ExpectNext ( 27 ) ;
160
+ await inProbe . ExpectRequestAsync ( ) ;
161
+ await inProbe . SendNextAsync ( 27 ) ;
162
+ await outProbe . ExpectNextAsync ( 27 ) ;
166
163
167
164
var testException = new TestException ( "OMG Who set that on fire !?!" ) ;
168
165
inProbe . SendError ( testException ) ;
@@ -171,9 +168,9 @@ public void A_lazy_source_must_fail_stage_when_upstream_fails()
171
168
}
172
169
173
170
[ Fact ]
174
- public void A_lazy_source_must_propagate_attributes_to_inner_stream ( )
171
+ public async Task A_lazy_source_must_propagate_attributes_to_inner_stream ( )
175
172
{
176
- this . AssertAllStagesStopped ( async ( ) =>
173
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
177
174
{
178
175
var attributesSource = Source . FromGraph ( new AttibutesSourceStage ( ) )
179
176
. AddAttributes ( Attributes . CreateName ( "inner" ) ) ;
@@ -193,9 +190,9 @@ public void A_lazy_source_must_propagate_attributes_to_inner_stream()
193
190
}
194
191
195
192
[ Fact ]
196
- public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails ( )
193
+ public async Task A_lazy_source_must_fail_correctly_when_materialization_of_inner_source_fails ( )
197
194
{
198
- this . AssertAllStagesStopped ( ( ) =>
195
+ await this . AssertAllStagesStoppedAsync ( ( ) =>
199
196
{
200
197
var matFail = new TestException ( "fail!" ) ;
201
198
@@ -207,12 +204,12 @@ public void A_lazy_source_must_fail_correctly_when_materialization_of_inner_sour
207
204
{
208
205
task . Wait ( TimeSpan . FromSeconds ( 1 ) ) ;
209
206
}
210
- catch ( AggregateException ) { }
207
+ catch ( AggregateException ) { }
211
208
212
209
task . IsFaulted . ShouldBe ( true ) ;
213
210
task . Exception . ShouldNotBe ( null ) ;
214
211
task . Exception . InnerException . Should ( ) . BeEquivalentTo ( matFail ) ;
215
-
212
+ return Task . CompletedTask ;
216
213
} , Materializer ) ;
217
214
}
218
215
0 commit comments