Skip to content

Commit addb9fb

Browse files
authored
[17-74]FlowIteratorSpec (#6561)
* [17-74]`FlowIteratorSpec` * Changes to `async` TestKit
1 parent c083fcd commit addb9fb

File tree

1 file changed

+70
-76
lines changed

1 file changed

+70
-76
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowIteratorSpec.cs

+70-76
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Collections;
1010
using System.Collections.Generic;
1111
using System.Linq;
12+
using System.Threading.Tasks;
1213
using Akka.Pattern;
1314
using Akka.Streams.Dsl;
1415
using Akka.Streams.TestKit;
@@ -40,7 +41,7 @@ protected override Source<int, NotUsed> CreateSource(int elements)
4041
=> Source.From(Enumerable.Range(1, elements));
4142

4243
[Fact]
43-
public void A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throws()
44+
public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throws()
4445
{
4546
var iterable = Enumerable.Range(1, 3).Select(x =>
4647
{
@@ -51,37 +52,37 @@ public void A_Flow_based_on_an_iterable_must_produce_OnError_when_iterator_throw
5152
var p = Source.From(iterable).RunWith(Sink.AsPublisher<int>(false), Materializer);
5253
var c = this.CreateManualSubscriberProbe<int>();
5354
p.Subscribe(c);
54-
var sub = c.ExpectSubscription();
55+
var sub = await c.ExpectSubscriptionAsync();
5556
sub.Request(1);
56-
c.ExpectNext(1);
57-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
57+
await c.ExpectNextAsync(1);
58+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
5859
EventFilter.Exception<AggregateException>()
5960
.And.Exception<IllegalStateException>("not two").ExpectOne(() => sub.Request(2));
6061
var error = c.ExpectError().InnerException;
6162
error.Message.Should().Be("not two");
6263
sub.Request(2);
63-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
64+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
6465
}
6566

6667
[Fact]
67-
public void A_Flow_based_on_an_iterable_must_produce_OnError_when_Source_construction_throws()
68+
public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_Source_construction_throws()
6869
{
6970
var p = Source.From(new ThrowEnumerable()).RunWith(Sink.AsPublisher<int>(false), Materializer);
7071
var c = this.CreateManualSubscriberProbe<int>();
7172
p.Subscribe(c);
7273
c.ExpectSubscriptionAndError().Message.Should().Be("no good iterator");
73-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
74+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
7475
}
7576

7677
[Fact]
77-
public void A_Flow_based_on_an_iterable_must_produce_OnError_when_MoveNext_throws()
78+
public async Task A_Flow_based_on_an_iterable_must_produce_OnError_when_MoveNext_throws()
7879
{
7980
var p = Source.From(new ThrowEnumerable(false)).RunWith(Sink.AsPublisher<int>(false), Materializer);
8081
var c = this.CreateManualSubscriberProbe<int>();
8182
p.Subscribe(c);
8283
var error = c.ExpectSubscriptionAndError().InnerException;
8384
error.Message.Should().Be("no next");
84-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
85+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
8586
}
8687

8788
private sealed class ThrowEnumerable : IEnumerable<int>
@@ -145,158 +146,151 @@ protected AbstractFlowIteratorSpec(ITestOutputHelper helper) : base(helper)
145146
protected abstract Source<int, NotUsed> CreateSource(int elements);
146147

147148
[Fact]
148-
public void A_Flow_based_on_an_iterable_must_produce_elements()
149+
public async Task A_Flow_based_on_an_iterable_must_produce_elements()
149150
{
150-
this.AssertAllStagesStopped(() =>
151-
{
151+
await this.AssertAllStagesStoppedAsync(async() => {
152152
var p = CreateSource(3).RunWith(Sink.AsPublisher<int>(false), Materializer);
153153
var c = this.CreateManualSubscriberProbe<int>();
154-
154+
155155
p.Subscribe(c);
156-
var sub = c.ExpectSubscription();
156+
var sub = await c.ExpectSubscriptionAsync();
157157

158158
sub.Request(1);
159-
c.ExpectNext(1);
160-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
159+
await c.ExpectNextAsync(1);
160+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
161161
sub.Request(3);
162-
c.ExpectNext(2)
162+
await c.ExpectNext(2)
163163
.ExpectNext(3)
164-
.ExpectComplete();
164+
.ExpectCompleteAsync();
165165
}, Materializer);
166166
}
167167

168168
[Fact]
169-
public void A_Flow_based_on_an_iterable_must_complete_empty()
169+
public async Task A_Flow_based_on_an_iterable_must_complete_empty()
170170
{
171-
this.AssertAllStagesStopped(() =>
172-
{
171+
await this.AssertAllStagesStoppedAsync(async() => {
173172
var p = CreateSource(0).RunWith(Sink.AsPublisher<int>(false), Materializer);
174173
var c = this.CreateManualSubscriberProbe<int>();
175174

176175
p.Subscribe(c);
177-
c.ExpectSubscriptionAndComplete();
178-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
176+
await c.ExpectSubscriptionAndCompleteAsync();
177+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
179178
}, Materializer);
180179
}
181180

182181
[Fact]
183-
public void A_Flow_based_on_an_iterable_must_produce_elements_with_multiple_subscribers()
182+
public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_multiple_subscribers()
184183
{
185-
this.AssertAllStagesStopped(() =>
186-
{
184+
await this.AssertAllStagesStoppedAsync(async() => {
187185
var p = CreateSource(3).RunWith(Sink.AsPublisher<int>(true), Materializer);
188186
var c1 = this.CreateManualSubscriberProbe<int>();
189187
var c2 = this.CreateManualSubscriberProbe<int>();
190-
188+
191189
p.Subscribe(c1);
192190
p.Subscribe(c2);
193-
var sub1 = c1.ExpectSubscription();
194-
var sub2 = c2.ExpectSubscription();
191+
var sub1 = await c1.ExpectSubscriptionAsync();
192+
var sub2 = await c2.ExpectSubscriptionAsync();
195193
sub1.Request(1);
196194
sub2.Request(2);
197-
c1.ExpectNext(1);
198-
c2.ExpectNext(1);
199-
c2.ExpectNext(2);
200-
c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
201-
c2.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
195+
await c1.ExpectNextAsync(1);
196+
await c2.ExpectNextAsync(1);
197+
await c2.ExpectNextAsync(2);
198+
await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
199+
await c2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
202200
sub1.Request(2);
203201
sub2.Request(2);
204-
c1.ExpectNext(2);
205-
c1.ExpectNext(3);
206-
c2.ExpectNext(3);
207-
c1.ExpectComplete();
208-
c2.ExpectComplete();
202+
await c1.ExpectNextAsync(2);
203+
await c1.ExpectNextAsync(3);
204+
await c2.ExpectNextAsync(3);
205+
await c1.ExpectCompleteAsync();
206+
await c2.ExpectCompleteAsync();
209207
}, Materializer);
210208
}
211209

212210
[Fact]
213-
public void A_Flow_based_on_an_iterable_must_produce_elements_to_later_subscriber()
211+
public async Task A_Flow_based_on_an_iterable_must_produce_elements_to_later_subscriber()
214212
{
215-
this.AssertAllStagesStopped(() =>
216-
{
213+
await this.AssertAllStagesStoppedAsync(async() => {
217214
var p = CreateSource(3).RunWith(Sink.AsPublisher<int>(true), Materializer);
218215
var c1 = this.CreateManualSubscriberProbe<int>();
219216
var c2 = this.CreateManualSubscriberProbe<int>();
220-
217+
221218
p.Subscribe(c1);
222-
var sub1 = c1.ExpectSubscription();
219+
var sub1 = await c1.ExpectSubscriptionAsync();
223220
sub1.Request(1);
224-
c1.ExpectNext(1, TimeSpan.FromSeconds(60));
225-
c1.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
221+
await c1.ExpectNextAsync(1, TimeSpan.FromSeconds(60));
222+
await c1.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
226223

227224
p.Subscribe(c2);
228-
var sub2 = c2.ExpectSubscription();
225+
var sub2 = await c2.ExpectSubscriptionAsync();
229226
sub2.Request(3);
230227
//element 1 is already gone
231-
c2.ExpectNext(2)
228+
await c2.ExpectNext(2)
232229
.ExpectNext(3)
233-
.ExpectComplete();
230+
.ExpectCompleteAsync();
234231

235232
sub1.Request(3);
236-
c1.ExpectNext(2)
233+
await c1.ExpectNext(2)
237234
.ExpectNext(3)
238-
.ExpectComplete();
235+
.ExpectCompleteAsync();
239236
}, Materializer);
240237
}
241238

242239
[Fact]
243-
public void A_Flow_based_on_an_iterable_must_produce_elements_with_one_transformation_step()
240+
public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_one_transformation_step()
244241
{
245-
this.AssertAllStagesStopped(() =>
246-
{
247-
var p = CreateSource(3)
248-
.Select(x => x*2)
249-
.RunWith(Sink.AsPublisher<int>(false), Materializer);
242+
await this.AssertAllStagesStoppedAsync(async () => {
243+
var p = CreateSource(3)
244+
.Select(x => x * 2)
245+
.RunWith(Sink.AsPublisher<int>(false), Materializer);
250246
var c = this.CreateManualSubscriberProbe<int>();
251247

252248
p.Subscribe(c);
253-
var sub = c.ExpectSubscription();
249+
var sub = await c.ExpectSubscriptionAsync();
254250

255251
sub.Request(10);
256-
c.ExpectNext(2)
252+
await c.ExpectNext(2)
257253
.ExpectNext(4)
258254
.ExpectNext(6)
259-
.ExpectComplete();
255+
.ExpectCompleteAsync();
260256
}, Materializer);
261257
}
262258

263259
[Fact]
264-
public void A_Flow_based_on_an_iterable_must_produce_elements_with_two_transformation_steps()
260+
public async Task A_Flow_based_on_an_iterable_must_produce_elements_with_two_transformation_steps()
265261
{
266-
this.AssertAllStagesStopped(() =>
267-
{
268-
var p = CreateSource(4)
269-
.Where(x => x%2 == 0)
270-
.Select(x => x*2)
271-
.RunWith(Sink.AsPublisher<int>(false), Materializer);
262+
await this.AssertAllStagesStoppedAsync(async () => {
263+
var p = CreateSource(4)
264+
.Where(x => x % 2 == 0)
265+
.Select(x => x * 2)
266+
.RunWith(Sink.AsPublisher<int>(false), Materializer);
272267
var c = this.CreateManualSubscriberProbe<int>();
273268

274269
p.Subscribe(c);
275-
var sub = c.ExpectSubscription();
270+
var sub = await c.ExpectSubscriptionAsync();
276271

277272
sub.Request(10);
278-
c.ExpectNext(4)
273+
await c.ExpectNext(4)
279274
.ExpectNext(8)
280-
.ExpectComplete();
275+
.ExpectCompleteAsync();
281276
}, Materializer);
282277
}
283278

284279
[Fact]
285-
public void A_Flow_based_on_an_iterable_must_not_produce_after_cancel()
280+
public async Task A_Flow_based_on_an_iterable_must_not_produce_after_cancel()
286281
{
287-
this.AssertAllStagesStopped(() =>
288-
{
282+
await this.AssertAllStagesStoppedAsync(async() => {
289283
var p = CreateSource(3).RunWith(Sink.AsPublisher<int>(false), Materializer);
290284
var c = this.CreateManualSubscriberProbe<int>();
291285

292286
p.Subscribe(c);
293-
var sub = c.ExpectSubscription();
287+
var sub = await c.ExpectSubscriptionAsync();
294288

295289
sub.Request(1);
296-
c.ExpectNext(1);
290+
await c.ExpectNextAsync(1);
297291
sub.Cancel();
298292
sub.Request(2);
299-
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
293+
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
300294
}, Materializer);
301295
}
302296
}

0 commit comments

Comments
 (0)