6
6
//-----------------------------------------------------------------------
7
7
8
8
using System ;
9
+ using System . Threading . Tasks ;
9
10
using Akka . Streams . Dsl ;
10
11
using Akka . Streams . TestKit ;
11
12
using FluentAssertions ;
@@ -25,23 +26,22 @@ public HeadSinkSpec(ITestOutputHelper helper):base(helper)
25
26
}
26
27
27
28
[ Fact ]
28
- public void A_FLow_with_a_Sink_Head_must_yield_the_first_value ( )
29
+ public async Task A_FLow_with_a_Sink_Head_must_yield_the_first_value ( )
29
30
{
30
- this . AssertAllStagesStopped ( ( ) =>
31
- {
31
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
32
32
var p = this . CreateManualPublisherProbe < int > ( ) ;
33
- var task = Source . FromPublisher ( p ) . Select ( x=> x ) . RunWith ( Sink . First < int > ( ) , Materializer ) ;
34
- var proc = p . ExpectSubscription ( ) ;
35
- proc . ExpectRequest ( ) ;
33
+ var task = Source . FromPublisher ( p ) . Select ( x => x ) . RunWith ( Sink . First < int > ( ) , Materializer ) ;
34
+ var proc = await p . ExpectSubscriptionAsync ( ) ;
35
+ await proc . ExpectRequestAsync ( ) ;
36
36
proc . SendNext ( 42 ) ;
37
37
task . Wait ( 100 ) ;
38
38
task . Result . Should ( ) . Be ( 42 ) ;
39
- proc . ExpectCancellation ( ) ;
39
+ await proc . ExpectCancellationAsync ( ) ;
40
40
} , Materializer ) ;
41
41
}
42
42
43
43
[ Fact ]
44
- public void A_FLow_with_a_Sink_Head_must_yield_the_first_value_when_actively_constructing ( )
44
+ public async Task A_FLow_with_a_Sink_Head_must_yield_the_first_value_when_actively_constructing ( )
45
45
{
46
46
var p = this . CreateManualPublisherProbe < int > ( ) ;
47
47
var f = Sink . First < int > ( ) ;
@@ -51,79 +51,78 @@ public void A_FLow_with_a_Sink_Head_must_yield_the_first_value_when_actively_con
51
51
var future = t . Item2 ;
52
52
53
53
p . Subscribe ( subscriber ) ;
54
- var proc = p . ExpectSubscription ( ) ;
55
- proc . ExpectRequest ( ) ;
54
+ var proc = await p . ExpectSubscriptionAsync ( ) ;
55
+ await proc . ExpectRequestAsync ( ) ;
56
56
proc . SendNext ( 42 ) ;
57
57
future . Wait ( 100 ) ;
58
58
future . Result . Should ( ) . Be ( 42 ) ;
59
- proc . ExpectCancellation ( ) ;
59
+ await proc . ExpectCancellationAsync ( ) ;
60
60
}
61
61
62
62
[ Fact ]
63
- public void A_FLow_with_a_Sink_Head_must_yield_the_first_error ( )
63
+ public async Task A_FLow_with_a_Sink_Head_must_yield_the_first_error ( )
64
64
{
65
- this . AssertAllStagesStopped ( ( ) =>
66
- {
67
- Source . Failed < int > ( new Exception ( "ex" ) )
68
- . Invoking ( s => s . RunWith ( Sink . First < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
69
- . Should ( ) . Throw < AggregateException > ( )
70
- . WithInnerException < Exception > ( )
71
- . WithMessage ( "ex" ) ;
65
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
66
+ Source . Failed < int > ( new Exception ( "ex" ) )
67
+ . Invoking ( s => s . RunWith ( Sink . First < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
68
+ . Should ( ) . Throw < AggregateException > ( )
69
+ . WithInnerException < Exception > ( )
70
+ . WithMessage ( "ex" ) ;
71
+ return Task . CompletedTask ;
72
72
} , Materializer ) ;
73
73
}
74
74
75
75
[ Fact ]
76
- public void A_FLow_with_a_Sink_Head_must_yield_NoSuchElementException_for_empty_stream ( )
76
+ public async Task A_FLow_with_a_Sink_Head_must_yield_NoSuchElementException_for_empty_stream ( )
77
77
{
78
- this . AssertAllStagesStopped ( ( ) =>
79
- {
80
- Source . Empty < int > ( )
81
- . Invoking ( s => s . RunWith ( Sink . First < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
82
- . Should ( ) . Throw < AggregateException > ( )
83
- . WithInnerException < NoSuchElementException > ( )
84
- . WithMessage ( "First of empty stream" ) ;
78
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
79
+ Source . Empty < int > ( )
80
+ . Invoking ( s => s . RunWith ( Sink . First < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
81
+ . Should ( ) . Throw < AggregateException > ( )
82
+ . WithInnerException < NoSuchElementException > ( )
83
+ . WithMessage ( "First of empty stream" ) ;
84
+ return Task . CompletedTask ;
85
85
} , Materializer ) ;
86
86
}
87
87
88
88
89
89
90
90
[ Fact ]
91
- public void A_FLow_with_a_Sink_HeadOption_must_yield_the_first_value ( )
91
+ public async Task A_FLow_with_a_Sink_HeadOption_must_yield_the_first_value ( )
92
92
{
93
- this . AssertAllStagesStopped ( ( ) =>
94
- {
93
+ await this . AssertAllStagesStoppedAsync ( async ( ) => {
95
94
var p = this . CreateManualPublisherProbe < int > ( ) ;
96
95
var task = Source . FromPublisher ( p ) . Select ( x => x ) . RunWith ( Sink . FirstOrDefault < int > ( ) , Materializer ) ;
97
- var proc = p . ExpectSubscription ( ) ;
98
- proc . ExpectRequest ( ) ;
96
+ var proc = await p . ExpectSubscriptionAsync ( ) ;
97
+ await proc . ExpectRequestAsync ( ) ;
99
98
proc . SendNext ( 42 ) ;
100
99
task . Wait ( 100 ) ;
101
100
task . Result . Should ( ) . Be ( 42 ) ;
102
- proc . ExpectCancellation ( ) ;
101
+ await proc . ExpectCancellationAsync ( ) ;
103
102
} , Materializer ) ;
104
103
}
105
104
106
105
[ Fact ]
107
- public void A_FLow_with_a_Sink_HeadOption_must_yield_the_first_error ( )
106
+ public async Task A_FLow_with_a_Sink_HeadOption_must_yield_the_first_error ( )
108
107
{
109
- this . AssertAllStagesStopped ( ( ) =>
110
- {
111
- Source . Failed < int > ( new Exception ( "ex" ) )
112
- . Invoking ( s => s . RunWith ( Sink . FirstOrDefault < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
113
- . Should ( ) . Throw < AggregateException > ( )
114
- . WithInnerException < Exception > ( )
115
- . WithMessage ( "ex" ) ;
108
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
109
+ Source . Failed < int > ( new Exception ( "ex" ) )
110
+ . Invoking ( s => s . RunWith ( Sink . FirstOrDefault < int > ( ) , Materializer ) . Wait ( TimeSpan . FromSeconds ( 1 ) ) )
111
+ . Should ( ) . Throw < AggregateException > ( )
112
+ . WithInnerException < Exception > ( )
113
+ . WithMessage ( "ex" ) ;
114
+ return Task . CompletedTask ;
116
115
} , Materializer ) ;
117
116
}
118
117
119
118
[ Fact ]
120
- public void A_FLow_with_a_Sink_HeadOption_must_yield_default_for_empty_stream ( )
119
+ public async Task A_FLow_with_a_Sink_HeadOption_must_yield_default_for_empty_stream ( )
121
120
{
122
- this . AssertAllStagesStopped ( ( ) =>
123
- {
121
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
124
122
var task = Source . Empty < int > ( ) . RunWith ( Sink . FirstOrDefault < int > ( ) , Materializer ) ;
125
123
task . Wait ( TimeSpan . FromSeconds ( 1 ) ) . Should ( ) . BeTrue ( ) ;
126
124
task . Result . Should ( ) . Be ( 0 ) ;
125
+ return Task . CompletedTask ;
127
126
} , Materializer ) ;
128
127
}
129
128
0 commit comments