7
7
8
8
using System ;
9
9
using System . Linq ;
10
+ using System . Threading . Tasks ;
10
11
using Akka . Actor ;
11
12
using Akka . Configuration ;
12
13
using Akka . Streams . Dsl ;
@@ -77,10 +78,9 @@ public ActorRefBackpressureSinkSpec(ITestOutputHelper output) : base(output, Str
77
78
private IActorRef CreateActor < T > ( ) => Sys . ActorOf ( Props . Create ( typeof ( T ) , TestActor ) . WithDispatcher ( "akka.test.stream-dispatcher" ) ) ;
78
79
79
80
[ Fact ]
80
- public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef ( )
81
+ public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef ( )
81
82
{
82
- this . AssertAllStagesStopped ( ( ) =>
83
- {
83
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
84
84
var fw = CreateActor < Fw > ( ) ;
85
85
Source . From ( Enumerable . Range ( 1 , 3 ) )
86
86
. RunWith ( Sink . ActorRefWithAck < int > ( fw , InitMessage , AckMessage , CompleteMessage ) , Materializer ) ;
@@ -89,14 +89,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
89
89
ExpectMsg ( 2 ) ;
90
90
ExpectMsg ( 3 ) ;
91
91
ExpectMsg ( CompleteMessage ) ;
92
+ return Task . CompletedTask ;
92
93
} , Materializer ) ;
93
94
}
94
95
95
96
[ Fact ]
96
- public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2 ( )
97
+ public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2 ( )
97
98
{
98
- this . AssertAllStagesStopped ( ( ) =>
99
- {
99
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
100
100
var fw = CreateActor < Fw > ( ) ;
101
101
var probe =
102
102
this . SourceProbe < int > ( )
@@ -111,14 +111,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
111
111
ExpectMsg ( 3 ) ;
112
112
probe . SendComplete ( ) ;
113
113
ExpectMsg ( CompleteMessage ) ;
114
+ return Task . CompletedTask ;
114
115
} , Materializer ) ;
115
116
}
116
117
117
118
[ Fact ]
118
- public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates ( )
119
+ public async Task ActorBackpressureSink_should_cancel_stream_when_actor_terminates ( )
119
120
{
120
- this . AssertAllStagesStopped ( ( ) =>
121
- {
121
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
122
122
var fw = CreateActor < Fw > ( ) ;
123
123
var publisher =
124
124
this . SourceProbe < int > ( )
@@ -129,14 +129,14 @@ public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
129
129
ExpectMsg ( 1 ) ;
130
130
Sys . Stop ( fw ) ;
131
131
publisher . ExpectCancellation ( ) ;
132
+ return Task . CompletedTask ;
132
133
} , Materializer ) ;
133
134
}
134
135
135
136
[ Fact ]
136
- public void ActorBackpressureSink_should_send_message_only_when_backpressure_received ( )
137
+ public async Task ActorBackpressureSink_should_send_message_only_when_backpressure_received ( )
137
138
{
138
- this . AssertAllStagesStopped ( ( ) =>
139
- {
139
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
140
140
var fw = CreateActor < Fw2 > ( ) ;
141
141
var publisher = this . SourceProbe < int > ( )
142
142
. To ( Sink . ActorRefWithAck < int > ( fw , InitMessage , AckMessage , CompleteMessage ) )
@@ -156,14 +156,14 @@ public void ActorBackpressureSink_should_send_message_only_when_backpressure_rec
156
156
ExpectMsg ( 3 ) ;
157
157
158
158
ExpectMsg ( CompleteMessage ) ;
159
+ return Task . CompletedTask ;
159
160
} , Materializer ) ;
160
161
}
161
162
162
163
[ Fact ]
163
- public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full ( )
164
+ public async Task ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full ( )
164
165
{
165
- this . AssertAllStagesStopped ( ( ) =>
166
- {
166
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
167
167
var bufferSize = 16 ;
168
168
var streamElementCount = bufferSize + 4 ;
169
169
var fw = CreateActor < Fw2 > ( ) ;
@@ -187,14 +187,14 @@ public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_h
187
187
fw . Tell ( TriggerAckMessage . Instance ) ;
188
188
}
189
189
ExpectMsg ( CompleteMessage ) ;
190
+ return Task . CompletedTask ;
190
191
} , Materializer ) ;
191
192
}
192
193
193
194
[ Fact ]
194
- public void ActorBackpressureSink_should_work_with_one_element_buffer ( )
195
+ public async Task ActorBackpressureSink_should_work_with_one_element_buffer ( )
195
196
{
196
- this . AssertAllStagesStopped ( ( ) =>
197
- {
197
+ await this . AssertAllStagesStoppedAsync ( ( ) => {
198
198
var fw = CreateActor < Fw2 > ( ) ;
199
199
var publisher =
200
200
this . SourceProbe < int > ( )
@@ -216,6 +216,7 @@ public void ActorBackpressureSink_should_work_with_one_element_buffer()
216
216
217
217
publisher . SendComplete ( ) ;
218
218
ExpectMsg ( CompleteMessage ) ;
219
+ return Task . CompletedTask ;
219
220
} , Materializer ) ;
220
221
}
221
222
0 commit comments