@@ -63,9 +63,9 @@ public StreamPuppet(IPublisher<int> p, TestKitBase kit)
63
63
public void Cancel ( ) => _subscription . Cancel ( ) ;
64
64
}
65
65
66
- private void WithSubstreamsSupport ( int splitWhen = 3 , int elementCount = 6 ,
66
+ private async Task WithSubstreamsSupportAsync ( int splitWhen = 3 , int elementCount = 6 ,
67
67
SubstreamCancelStrategy substreamCancelStrategy = SubstreamCancelStrategy . Drain ,
68
- Action < TestSubscriber . ManualProbe < Source < int , NotUsed > > , ISubscription , Func < Source < int , NotUsed > > > run = null )
68
+ Action < TestSubscriber . ManualProbe < Source < int , NotUsed > > , ISubscription , Func < Task < Source < int , NotUsed > > > > run = null )
69
69
{
70
70
71
71
var source = Source . From ( Enumerable . Range ( 1 , elementCount ) ) ;
@@ -75,23 +75,23 @@ private void WithSubstreamsSupport(int splitWhen = 3, int elementCount = 6,
75
75
. RunWith ( Sink . AsPublisher < Source < int , NotUsed > > ( false ) , Materializer ) ;
76
76
var masterSubscriber = TestSubscriber . CreateManualSubscriberProbe < Source < int , NotUsed > > ( this ) ;
77
77
groupStream . Subscribe ( masterSubscriber ) ;
78
- var masterSubscription = masterSubscriber . ExpectSubscription ( ) ;
78
+ var masterSubscription = await masterSubscriber . ExpectSubscriptionAsync ( ) ;
79
79
80
- run ? . Invoke ( masterSubscriber , masterSubscription , ( ) =>
80
+ run ? . Invoke ( masterSubscriber , masterSubscription , async ( ) =>
81
81
{
82
82
masterSubscription . Request ( 1 ) ;
83
- return masterSubscriber . ExpectNext ( ) ;
83
+ return await masterSubscriber . ExpectNextAsync ( ) ;
84
84
} ) ;
85
85
}
86
86
87
87
[ Fact ]
88
88
public async Task SplitWhen_must_work_in_the_happy_case ( )
89
89
{
90
- await this . AssertAllStagesStoppedAsync ( ( ) => {
91
- WithSubstreamsSupport ( elementCount : 4 ,
90
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
91
+ await WithSubstreamsSupportAsync ( elementCount : 4 ,
92
92
run : async ( masterSubscriber , masterSubscription , getSubFlow ) =>
93
93
{
94
- var p = getSubFlow ( )
94
+ var p = ( await getSubFlow ( ) )
95
95
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) ;
96
96
var kit = this ;
97
97
var probe = kit . CreateManualSubscriberProbe < int > ( ) ;
@@ -106,7 +106,7 @@ await this.AssertAllStagesStoppedAsync(() => {
106
106
subscription . Request ( 1 ) ;
107
107
await probe . ExpectCompleteAsync ( ) ;
108
108
109
- p = getSubFlow ( )
109
+ p = ( await getSubFlow ( ) )
110
110
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) ;
111
111
kit = this ;
112
112
probe = kit . CreateManualSubscriberProbe < int > ( ) ;
@@ -128,7 +128,7 @@ await this.AssertAllStagesStoppedAsync(() => {
128
128
masterSubscription . Request ( 1 ) ;
129
129
await masterSubscriber . ExpectCompleteAsync ( ) ;
130
130
} ) ;
131
- return Task . CompletedTask ;
131
+ // return Task.CompletedTask;
132
132
} , Materializer ) ;
133
133
}
134
134
@@ -153,12 +153,12 @@ await this.AssertAllStagesStoppedAsync(() => {
153
153
[ Fact ]
154
154
public async Task SplitWhen_must_work_when_first_element_is_split_by ( )
155
155
{
156
- await this . AssertAllStagesStoppedAsync ( ( ) =>
156
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
157
157
{
158
- WithSubstreamsSupport ( 1 , 3 ,
158
+ await WithSubstreamsSupportAsync ( 1 , 3 ,
159
159
run : async ( masterSubscriber , masterSubscription , getSubFlow ) =>
160
160
{
161
- var s1 = new StreamPuppet ( getSubFlow ( )
161
+ var s1 = new StreamPuppet ( ( await getSubFlow ( ) )
162
162
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) , this ) ;
163
163
164
164
s1 . Request ( 5 ) ;
@@ -170,22 +170,22 @@ await this.AssertAllStagesStoppedAsync(() =>
170
170
masterSubscription . Request ( 1 ) ;
171
171
await masterSubscriber . ExpectCompleteAsync ( ) ;
172
172
} ) ;
173
- return Task . CompletedTask ;
173
+ // return Task.CompletedTask;
174
174
} , Materializer ) ;
175
175
}
176
176
177
177
[ Fact ]
178
178
public async Task SplitWhen_must_support_cancelling_substreams ( )
179
179
{
180
- await this . AssertAllStagesStoppedAsync ( ( ) =>
180
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
181
181
{
182
- WithSubstreamsSupport ( 5 , 8 ,
182
+ await WithSubstreamsSupportAsync ( 5 , 8 ,
183
183
run : async ( masterSubscriber , masterSubscription , getSubFlow ) =>
184
184
{
185
- var s1 = new StreamPuppet ( getSubFlow ( )
185
+ var s1 = new StreamPuppet ( ( await getSubFlow ( ) )
186
186
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) , this ) ;
187
187
s1 . Cancel ( ) ;
188
- var s2 = new StreamPuppet ( getSubFlow ( )
188
+ var s2 = new StreamPuppet ( ( await getSubFlow ( ) )
189
189
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) , this ) ;
190
190
191
191
s2 . Request ( 4 ) ;
@@ -199,7 +199,7 @@ await this.AssertAllStagesStoppedAsync(() =>
199
199
masterSubscription . Request ( 1 ) ;
200
200
await masterSubscriber . ExpectCompleteAsync ( ) ;
201
201
} ) ;
202
- return Task . CompletedTask ;
202
+ // return Task.CompletedTask;
203
203
} , Materializer ) ;
204
204
}
205
205
@@ -275,12 +275,12 @@ await this.AssertAllStagesStoppedAsync(async() => {
275
275
[ Fact ]
276
276
public async Task SplitWhen_must_support_cancelling_the_master_stream ( )
277
277
{
278
- await this . AssertAllStagesStoppedAsync ( ( ) =>
278
+ await this . AssertAllStagesStoppedAsync ( async ( ) =>
279
279
{
280
- WithSubstreamsSupport ( 5 , 8 ,
280
+ await WithSubstreamsSupportAsync ( 5 , 8 ,
281
281
run : async ( masterSubscriber , masterSubscription , getSubFlow ) =>
282
282
{
283
- var s1 = new StreamPuppet ( getSubFlow ( )
283
+ var s1 = new StreamPuppet ( ( await getSubFlow ( ) )
284
284
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) , this ) ;
285
285
masterSubscription . Cancel ( ) ;
286
286
@@ -292,7 +292,7 @@ await this.AssertAllStagesStoppedAsync(() =>
292
292
s1 . Request ( 1 ) ;
293
293
await s1 . ExpectCompleteAsync ( ) ;
294
294
} ) ;
295
- return Task . CompletedTask ;
295
+ // return Task.CompletedTask;
296
296
} , Materializer ) ;
297
297
}
298
298
@@ -435,18 +435,18 @@ await this.AssertAllStagesStoppedAsync(async() => {
435
435
[ Fact ]
436
436
public async Task SplitWhen_must_support_eager_cancellation_of_master_stream_on_cancelling_substreams ( )
437
437
{
438
- await this . AssertAllStagesStoppedAsync ( ( ) => {
439
- WithSubstreamsSupport ( 5 , 8 , SubstreamCancelStrategy . Propagate ,
438
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
439
+ await WithSubstreamsSupportAsync ( 5 , 8 , SubstreamCancelStrategy . Propagate ,
440
440
async ( masterSubscriber , masterSubscription , expectSubFlow ) =>
441
441
{
442
- var s1 = new StreamPuppet ( expectSubFlow ( )
442
+ var s1 = new StreamPuppet ( ( await expectSubFlow ( ) )
443
443
. RunWith ( Sink . AsPublisher < int > ( false ) , Materializer ) ,
444
444
this ) ;
445
445
s1 . Cancel ( ) ;
446
446
await masterSubscriber . ExpectCompleteAsync ( ) ;
447
447
448
448
} ) ;
449
- return Task . CompletedTask ;
449
+ // return Task.CompletedTask;
450
450
} , Materializer ) ;
451
451
}
452
452
}
0 commit comments