Skip to content

Commit f4a29d3

Browse files
committed
fix: SplitWhen_must_work_in_the_happy_case
1 parent c71aeed commit f4a29d3

File tree

1 file changed

+93
-80
lines changed

1 file changed

+93
-80
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs

+93-80
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,21 @@ public StreamPuppet(IPublisher<int> p, TestKitBase kit)
5454

5555
public async Task ExpectNextAsync(int element) => await _probe.ExpectNextAsync(element);
5656

57+
public void ExpectNext(int element) => _probe.ExpectNext(element);
58+
5759
public async Task ExpectNoMsgAsync(TimeSpan max) => await _probe.ExpectNoMsgAsync(max);
60+
public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max);
5861

5962
public async Task ExpectCompleteAsync() => await _probe.ExpectCompleteAsync();
6063

64+
public void ExpectComplete() => _probe.ExpectComplete();
65+
6166
public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex);
6267

6368
public void Cancel() => _subscription.Cancel();
6469
}
6570

66-
private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCount = 6,
71+
private void WithSubstreamsSupport(int splitWhen = 3, int elementCount = 6,
6772
SubstreamCancelStrategy substreamCancelStrategy = SubstreamCancelStrategy.Drain,
6873
Action<TestSubscriber.ManualProbe<Source<int, NotUsed>>, ISubscription, Func<Source<int, NotUsed>>> run = null)
6974
{
@@ -75,7 +80,7 @@ private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCoun
7580
.RunWith(Sink.AsPublisher<Source<int, NotUsed>>(false), Materializer);
7681
var masterSubscriber = TestSubscriber.CreateManualSubscriberProbe<Source<int, NotUsed>>(this);
7782
groupStream.Subscribe(masterSubscriber);
78-
var masterSubscription = await masterSubscriber.ExpectSubscriptionAsync();
83+
var masterSubscription = masterSubscriber.ExpectSubscription();
7984

8085
run?.Invoke(masterSubscriber, masterSubscription, () =>
8186
{
@@ -87,36 +92,38 @@ private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCoun
8792
[Fact]
8893
public async Task SplitWhen_must_work_in_the_happy_case()
8994
{
90-
await this.AssertAllStagesStoppedAsync(async() => {
91-
await WithSubstreamsSupportAsync(elementCount: 4,
92-
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
93-
{
95+
await this.AssertAllStagesStoppedAsync(() =>
96+
{
97+
WithSubstreamsSupport(elementCount: 4,
98+
run: (masterSubscriber, masterSubscription, getSubFlow) =>
99+
{
94100
var s1 = new StreamPuppet(getSubFlow()
95-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
96-
await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
97-
98-
s1.Request(2);
99-
await s1.ExpectNextAsync(1);
100-
await s1.ExpectNextAsync(2);
101-
s1.Request(1);
102-
await s1.ExpectCompleteAsync();
103-
101+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
102+
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
103+
104+
s1.Request(2);
105+
s1.ExpectNext(1);
106+
s1.ExpectNext(2);
107+
s1.Request(1);
108+
s1.ExpectComplete();
109+
104110
var s2 = new StreamPuppet(getSubFlow()
105-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
106-
await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
107-
108-
s2.Request(1);
109-
await s2.ExpectNextAsync(3);
110-
await s2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
111-
112-
s2.Request(1);
113-
await s2.ExpectNextAsync(4);
114-
s2.Request(1);
115-
await s2.ExpectCompleteAsync();
116-
117-
masterSubscription.Request(1);
118-
await masterSubscriber.ExpectCompleteAsync();
111+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
112+
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
113+
114+
s2.Request(1);
115+
s2.ExpectNext(3);
116+
s2.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
117+
118+
s2.Request(1);
119+
s2.ExpectNext(4);
120+
s2.Request(1);
121+
s2.ExpectComplete();
122+
123+
masterSubscription.Request(1);
124+
masterSubscriber.ExpectComplete();
119125
});
126+
return Task.CompletedTask;
120127
}, Materializer);
121128
}
122129

@@ -141,50 +148,53 @@ await this.AssertAllStagesStoppedAsync(() => {
141148
[Fact]
142149
public async Task SplitWhen_must_work_when_first_element_is_split_by()
143150
{
144-
await this.AssertAllStagesStoppedAsync(async() => {
145-
await WithSubstreamsSupportAsync(1, 3,
146-
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
147-
{
151+
await this.AssertAllStagesStoppedAsync(() =>
152+
{
153+
WithSubstreamsSupport(1, 3,
154+
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
155+
{
148156
var s1 = new StreamPuppet(getSubFlow()
149157
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
150-
151-
s1.Request(5);
152-
await s1.ExpectNextAsync(1);
153-
await s1.ExpectNextAsync(2);
154-
await s1.ExpectNextAsync(3);
158+
159+
s1.Request(5);
160+
await s1.ExpectNextAsync(1);
161+
await s1.ExpectNextAsync(2);
162+
await s1.ExpectNextAsync(3);
155163
await s1.ExpectCompleteAsync();
156-
157-
masterSubscription.Request(1);
158-
await masterSubscriber.ExpectCompleteAsync();
164+
165+
masterSubscription.Request(1);
166+
await masterSubscriber.ExpectCompleteAsync();
159167
});
168+
return Task.CompletedTask;
160169
}, Materializer);
161170
}
162171

163172
[Fact]
164173
public async Task SplitWhen_must_support_cancelling_substreams()
165174
{
166-
await this.AssertAllStagesStoppedAsync(async() => {
167-
await WithSubstreamsSupportAsync(5, 8,
168-
run: async(masterSubscriber, masterSubscription, getSubFlow) =>
169-
{
175+
await this.AssertAllStagesStoppedAsync(() =>
176+
{
177+
WithSubstreamsSupport(5, 8,
178+
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
179+
{
170180
var s1 = new StreamPuppet(getSubFlow()
171-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
172-
s1.Cancel();
181+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
182+
s1.Cancel();
173183
var s2 = new StreamPuppet(getSubFlow()
174184
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
175-
176-
s2.Request(4);
177-
await s2.ExpectNextAsync(5);
178-
await s2.ExpectNextAsync(6);
179-
await s2.ExpectNextAsync(7);
180-
await s2.ExpectNextAsync(8);
181-
s2.Request(1);
185+
186+
s2.Request(4);
187+
await s2.ExpectNextAsync(5);
188+
await s2.ExpectNextAsync(6);
189+
await s2.ExpectNextAsync(7);
190+
await s2.ExpectNextAsync(8);
191+
s2.Request(1);
182192
await s2.ExpectCompleteAsync();
183-
184-
masterSubscription.Request(1);
185-
await masterSubscriber.ExpectCompleteAsync();
193+
194+
masterSubscription.Request(1);
195+
await masterSubscriber.ExpectCompleteAsync();
186196
});
187-
return Task.CompletedTask;
197+
return Task.FromResult(Task.CompletedTask);
188198
}, Materializer);
189199
}
190200

@@ -260,22 +270,24 @@ await this.AssertAllStagesStoppedAsync(async() => {
260270
[Fact]
261271
public async Task SplitWhen_must_support_cancelling_the_master_stream()
262272
{
263-
await this.AssertAllStagesStoppedAsync(async() => {
264-
await WithSubstreamsSupportAsync(5, 8,
265-
run: async(masterSubscriber, masterSubscription, getSubFlow) =>
266-
{
273+
await this.AssertAllStagesStoppedAsync(() =>
274+
{
275+
WithSubstreamsSupport(5, 8,
276+
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
277+
{
267278
var s1 = new StreamPuppet(getSubFlow()
268-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
279+
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
269280
masterSubscription.Cancel();
270-
271-
s1.Request(4);
272-
await s1.ExpectNextAsync(1);
273-
await s1.ExpectNextAsync(2);
274-
await s1.ExpectNextAsync(3);
275-
await s1.ExpectNextAsync(4);
276-
s1.Request(1);
277-
await s1.ExpectCompleteAsync();
281+
282+
s1.Request(4);
283+
await s1.ExpectNextAsync(1);
284+
await s1.ExpectNextAsync(2);
285+
await s1.ExpectNextAsync(3);
286+
await s1.ExpectNextAsync(4);
287+
s1.Request(1);
288+
await s1.ExpectCompleteAsync();
278289
});
290+
return Task.CompletedTask;
279291
}, Materializer);
280292
}
281293

@@ -418,18 +430,19 @@ await this.AssertAllStagesStoppedAsync(async() => {
418430
[Fact]
419431
public async Task SplitWhen_must_support_eager_cancellation_of_master_stream_on_cancelling_substreams()
420432
{
421-
await this.AssertAllStagesStoppedAsync(async() => {
422-
await WithSubstreamsSupportAsync(5, 8, SubstreamCancelStrategy.Propagate,
423-
async(masterSubscriber, masterSubscription, expectSubFlow) =>
424-
{
433+
await this.AssertAllStagesStoppedAsync(() =>
434+
{
435+
WithSubstreamsSupport(5, 8, SubstreamCancelStrategy.Propagate,
436+
async (masterSubscriber, masterSubscription, expectSubFlow) =>
437+
{
425438
var s1 = new StreamPuppet(expectSubFlow()
426-
.RunWith(Sink.AsPublisher<int>(false), Materializer),
427-
this);
428-
s1.Cancel();
439+
.RunWith(Sink.AsPublisher<int>(false), Materializer),
440+
this);
441+
s1.Cancel();
429442
await masterSubscriber.ExpectCompleteAsync();
430-
443+
431444
});
432-
return Task.CompletedTask;
445+
return Task.FromResult(Task.CompletedTask);
433446
}, Materializer);
434447
}
435448
}

0 commit comments

Comments
 (0)