Skip to content

Commit 2dd7d26

Browse files
authored
[31-74] FlowSplitAfterSpec (#6578)
* [31-74] `FlowSplitAfterSpec` * Changes to `async` TestKit * change `await` * Changes * Add `return Task.CompletedTask;` * Revert `"https://github.com/akkadotnet/akka.net/pull/6578/commits/b4a4006e1364b8bc50ab36772e78e765359288fd"`
1 parent f13062e commit 2dd7d26

File tree

1 file changed

+97
-95
lines changed

1 file changed

+97
-95
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSplitAfterSpec.cs

+97-95
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Collections.Immutable;
1212
using System.Linq;
1313
using System.Threading;
14+
using System.Threading.Tasks;
1415
using Akka.Streams.Dsl;
1516
using Akka.Streams.Implementation;
1617
using Akka.Streams.TestKit;
@@ -86,112 +87,115 @@ private void WithSubstreamsSupport(int splitAfter = 3, int elementCount = 6,
8687
}
8788

8889
[Fact]
89-
public void SplitAfter_must_work_in_the_happy_case()
90+
public async Task SplitAfter_must_work_in_the_happy_case()
9091
{
91-
this.AssertAllStagesStopped(() =>
92-
{
93-
WithSubstreamsSupport(3,5,run: (masterSubscriber, masterSubscription, expectSubFlow) =>
94-
{
95-
var s1 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
96-
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
97-
98-
s1.Request(2);
99-
s1.ExpectNext(1);
100-
s1.ExpectNext(2);
101-
s1.Request(1);
102-
s1.ExpectNext(3);
103-
s1.Request(1);
104-
s1.ExpectComplete();
105-
106-
var s2 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
107-
s2.Request(2);
108-
s2.ExpectNext(4);
109-
s2.ExpectNext(5);
110-
s2.ExpectComplete();
111-
112-
masterSubscription.Request(1);
113-
masterSubscriber.ExpectComplete();
114-
});
92+
await this.AssertAllStagesStoppedAsync(() => {
93+
WithSubstreamsSupport(3, 5,
94+
run: (masterSubscriber, masterSubscription, expectSubFlow) =>
95+
{
96+
var s1 = new StreamPuppet(expectSubFlow()
97+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
98+
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
99+
100+
s1.Request(2);
101+
s1.ExpectNext(1);
102+
s1.ExpectNext(2);
103+
s1.Request(1);
104+
s1.ExpectNext(3);
105+
s1.Request(1);
106+
s1.ExpectComplete();
107+
108+
var s2 = new StreamPuppet(expectSubFlow()
109+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
110+
s2.Request(2);
111+
s2.ExpectNext(4);
112+
s2.ExpectNext(5);
113+
s2.ExpectComplete();
114+
115+
masterSubscription.Request(1);
116+
masterSubscriber.ExpectComplete();
117+
});
118+
return Task.CompletedTask;
115119
}, Materializer);
116120
}
117121

118122
[Fact]
119-
public void SplitAfter_must_work_when_first_element_is_split_by()
123+
public async Task SplitAfter_must_work_when_first_element_is_split_by()
120124
{
121-
this.AssertAllStagesStopped(() =>
122-
{
123-
WithSubstreamsSupport(1, 3, run: (masterSubscriber, masterSubscription, expectSubFlow) =>
124-
{
125-
var s1 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
126-
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
127-
128-
s1.Request(3);
129-
s1.ExpectNext(1);
130-
s1.ExpectComplete();
131-
132-
var s2 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
133-
s2.Request(3);
134-
s2.ExpectNext(2);
135-
s2.ExpectNext(3);
136-
s2.ExpectComplete();
137-
138-
masterSubscription.Request(1);
139-
masterSubscriber.ExpectComplete();
140-
});
125+
await this.AssertAllStagesStoppedAsync(() => {
126+
WithSubstreamsSupport(1, 3,
127+
run: (masterSubscriber, masterSubscription, expectSubFlow) =>
128+
{
129+
var s1 = new StreamPuppet(expectSubFlow()
130+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
131+
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
132+
s1.Request(3);
133+
s1.ExpectNext(1);
134+
s1.ExpectComplete();
135+
var s2 = new StreamPuppet(expectSubFlow()
136+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
137+
s2.Request(3);
138+
s2.ExpectNext(2);
139+
s2.ExpectNext(3);
140+
s2.ExpectComplete();
141+
masterSubscription.Request(1);
142+
masterSubscriber.ExpectComplete();
143+
});
144+
return Task.CompletedTask;
141145
}, Materializer);
142146
}
143147

144148
[Fact]
145-
public void SplitAfter_must_work_with_single_element_splits_by()
149+
public async Task SplitAfter_must_work_with_single_element_splits_by()
146150
{
147-
this.AssertAllStagesStopped(() =>
148-
{
151+
await this.AssertAllStagesStoppedAsync(() => {
149152
var task = Source.From(Enumerable.Range(1, 10))
150-
.SplitAfter(_ => true)
151-
.Lift()
152-
.SelectAsync(1, s => s.RunWith(Sink.First<int>(), Materializer))
153-
.Grouped(10)
154-
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
153+
.SplitAfter(_ => true)
154+
.Lift()
155+
.SelectAsync(1, s => s.RunWith(Sink.First<int>(), Materializer))
156+
.Grouped(10)
157+
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
155158
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
156159
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
160+
return Task.CompletedTask;
157161
}, Materializer);
158162
}
159163

160164
[Fact]
161-
public void SplitAfter_must_support_cancelling_substreams()
165+
public async Task SplitAfter_must_support_cancelling_substreams()
162166
{
163-
this.AssertAllStagesStopped(() =>
164-
{
165-
WithSubstreamsSupport(5, 8, run: (masterSubscriber, masterSubscription, expectSubFlow) =>
166-
{
167-
var s1 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
168-
masterSubscription.Cancel();
169-
s1.Request(5);
170-
s1.ExpectNext(1);
171-
s1.ExpectNext(2);
172-
s1.ExpectNext(3);
173-
s1.ExpectNext(4);
174-
s1.ExpectNext(5);
175-
s1.Request(1);
176-
s1.ExpectComplete();
177-
});
167+
await this.AssertAllStagesStoppedAsync(() => {
168+
WithSubstreamsSupport(5, 8,
169+
run: (masterSubscriber, masterSubscription, expectSubFlow) =>
170+
{
171+
var s1 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer), this);
172+
masterSubscription.Cancel();
173+
s1.Request(5);
174+
s1.ExpectNext(1);
175+
s1.ExpectNext(2);
176+
s1.ExpectNext(3);
177+
s1.ExpectNext(4);
178+
s1.ExpectNext(5);
179+
s1.Request(1);
180+
s1.ExpectComplete();
181+
});
182+
return Task.CompletedTask;
178183
}, Materializer);
179184
}
180185

181186
[Fact]
182-
public void SplitAfter_must_fail_stream_when_SplitAfter_function_throws()
187+
public async Task SplitAfter_must_fail_stream_when_SplitAfter_function_throws()
183188
{
184-
this.AssertAllStagesStopped(() =>
185-
{
189+
await this.AssertAllStagesStoppedAsync(() => {
186190
var publisherProbe = this.CreateManualPublisherProbe<int>();
187191
var ex = new TestException("test");
188192
var publisher = Source.FromPublisher(publisherProbe).SplitAfter(i =>
189193
{
190194
if (i == 3)
191195
throw ex;
192-
return i%3 == 0;
196+
return i % 3 == 0;
193197
}).Lift().RunWith(Sink.AsPublisher<Source<int, NotUsed>>(false), Materializer);
194-
198+
195199
var subscriber = this.CreateManualSubscriberProbe<Source<int, NotUsed>>();
196200
publisher.Subscribe(subscriber);
197201

@@ -215,29 +219,28 @@ public void SplitAfter_must_fail_stream_when_SplitAfter_function_throws()
215219
subscriber.ExpectError().Should().Be(ex);
216220
substreamPuppet.ExpectError(ex);
217221
upstreamSubscription.ExpectCancellation();
222+
return Task.CompletedTask;
218223
}, Materializer);
219224
}
220225

221226
[Fact(Skip = "Supervision is not supported fully by GraphStages yet")]
222-
public void SplitAfter_must_resume_stream_when_SplitAfter_function_throws()
227+
public async Task SplitAfter_must_resume_stream_when_SplitAfter_function_throws()
223228
{
224-
this.AssertAllStagesStopped(() =>
225-
{
226-
229+
await this.AssertAllStagesStoppedAsync(() => {
230+
return Task.CompletedTask;
227231
}, Materializer);
228232
}
229233

230234
[Fact]
231-
public void SplitAfter_must_pass_along_early_cancellation()
235+
public async Task SplitAfter_must_pass_along_early_cancellation()
232236
{
233-
this.AssertAllStagesStopped(() =>
234-
{
237+
await this.AssertAllStagesStoppedAsync(() => {
235238
var up = this.CreateManualPublisherProbe<int>();
236239
var down = this.CreateManualSubscriberProbe<Source<int, NotUsed>>();
237240

238241
var flowSubscriber =
239242
Source.AsSubscriber<int>()
240-
.SplitAfter(i => i%3 == 0)
243+
.SplitAfter(i => i % 3 == 0)
241244
.Lift()
242245
.To(Sink.FromSubscriber(down))
243246
.Run(Materializer);
@@ -246,49 +249,47 @@ public void SplitAfter_must_pass_along_early_cancellation()
246249
up.Subscribe(flowSubscriber);
247250
var upSub = up.ExpectSubscription();
248251
upSub.ExpectCancellation();
252+
return Task.CompletedTask;
249253
}, Materializer);
250254
}
251255

252256
[Fact]
253-
public void SplitAfter_must_support_eager_cancellation_of_master_stream_on_cancelling_substreams()
257+
public async Task SplitAfter_must_support_eager_cancellation_of_master_stream_on_cancelling_substreams()
254258
{
255-
this.AssertAllStagesStopped(() =>
256-
{
257-
WithSubstreamsSupport(5,8,SubstreamCancelStrategy.Propagate,
259+
await this.AssertAllStagesStoppedAsync(() => {
260+
WithSubstreamsSupport(5, 8, SubstreamCancelStrategy.Propagate,
258261
(masterSubscriber, masterSubscription, expectSubFlow) =>
259262
{
260263
var s1 = new StreamPuppet(expectSubFlow().RunWith(Sink.AsPublisher<int>(false), Materializer),
261264
this);
262265
s1.Cancel();
263266
masterSubscriber.ExpectComplete();
264267
});
268+
return Task.CompletedTask;
265269
}, Materializer);
266270
}
267271

268272
[Fact]
269-
public void SplitAfter_should_work_when_last_element_is_split_by() => this.AssertAllStagesStopped(() =>
270-
{
273+
public async Task SplitAfter_should_work_when_last_element_is_split_by() => await this.AssertAllStagesStoppedAsync(() => {
271274
WithSubstreamsSupport(splitAfter: 3, elementCount: 3,
272275
run: (masterSubscriber, masterSubscription, expectSubFlow) =>
273276
{
274277
var s1 = new StreamPuppet(expectSubFlow()
275278
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
276279
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
277-
278280
s1.Request(3);
279281
s1.ExpectNext(1);
280282
s1.ExpectNext(2);
281283
s1.ExpectNext(3);
282284
s1.ExpectComplete();
283-
284285
masterSubscription.Request(1);
285286
masterSubscriber.ExpectComplete();
286287
});
288+
return Task.CompletedTask;
287289
}, Materializer);
288290

289291
[Fact]
290-
public void SplitAfter_should_fail_stream_if_substream_not_materialized_in_time() => this.AssertAllStagesStopped(() =>
291-
{
292+
public async Task SplitAfter_should_fail_stream_if_substream_not_materialized_in_time() => await this.AssertAllStagesStoppedAsync(() => {
292293
var timeout = new StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.CancelTermination, TimeSpan.FromMilliseconds(500));
293294
var settings = ActorMaterializerSettings.Create(Sys).WithSubscriptionTimeoutSettings(timeout);
294295
var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, settings);
@@ -302,13 +303,14 @@ public void SplitAfter_should_fail_stream_if_substream_not_materialized_in_time(
302303
.Wait(TimeSpan.FromSeconds(3));
303304
};
304305
a.Should().Throw<SubscriptionTimeoutException>();
306+
return Task.CompletedTask;
305307
}, Materializer);
306308

307309
// Probably covert by SplitAfter_should_work_when_last_element_is_split_by
308310
// but we received a specific example which we want to cover too,
309311
// see https://github.com/akkadotnet/akka.net/issues/3222
310312
[Fact]
311-
public void SplitAfter_should_not_create_a_subflow_when_no_element_is_left()
313+
public async Task SplitAfter_should_not_create_a_subflow_when_no_element_is_left()
312314
{
313315
var result = new ConcurrentQueue<ImmutableList<(bool, int)>>();
314316
Source.From(new[]
@@ -323,7 +325,7 @@ public void SplitAfter_should_not_create_a_subflow_when_no_element_is_left()
323325
.To(Sink.ForEach<ImmutableList<(bool, int)>>(list => result.Enqueue(list)))
324326
.Run(Materializer);
325327

326-
Thread.Sleep(500);
328+
await Task.Delay(500);
327329
result.All(l => l.Count > 0).Should().BeTrue();
328330
}
329331
}

0 commit comments

Comments
 (0)