Skip to content

Commit c5bdb2a

Browse files
committed
Revert "Changes to WithSubstreamsSupportAsync"
This reverts commit 5be01a9.
1 parent 5be01a9 commit c5bdb2a

File tree

1 file changed

+27
-27
lines changed

1 file changed

+27
-27
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs

+27-27
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ public StreamPuppet(IPublisher<int> p, TestKitBase kit)
6363
public void Cancel() => _subscription.Cancel();
6464
}
6565

66-
private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCount = 6,
66+
private void WithSubstreamsSupport(int splitWhen = 3, int elementCount = 6,
6767
SubstreamCancelStrategy substreamCancelStrategy = SubstreamCancelStrategy.Drain,
68-
Action<TestSubscriber.ManualProbe<Source<int, NotUsed>>, ISubscription, Func<Task<Source<int, NotUsed>>>> run = null)
68+
Action<TestSubscriber.ManualProbe<Source<int, NotUsed>>, ISubscription, Func<Source<int, NotUsed>>> run = null)
6969
{
7070

7171
var source = Source.From(Enumerable.Range(1, elementCount));
@@ -75,23 +75,23 @@ private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCoun
7575
.RunWith(Sink.AsPublisher<Source<int, NotUsed>>(false), Materializer);
7676
var masterSubscriber = TestSubscriber.CreateManualSubscriberProbe<Source<int, NotUsed>>(this);
7777
groupStream.Subscribe(masterSubscriber);
78-
var masterSubscription = await masterSubscriber.ExpectSubscriptionAsync();
78+
var masterSubscription = masterSubscriber.ExpectSubscription();
7979

80-
run?.Invoke(masterSubscriber, masterSubscription, async() =>
80+
run?.Invoke(masterSubscriber, masterSubscription, () =>
8181
{
8282
masterSubscription.Request(1);
83-
return await masterSubscriber.ExpectNextAsync();
83+
return masterSubscriber.ExpectNext();
8484
});
8585
}
8686

8787
[Fact]
8888
public async Task SplitWhen_must_work_in_the_happy_case()
8989
{
90-
await this.AssertAllStagesStoppedAsync(async() => {
91-
await WithSubstreamsSupportAsync(elementCount: 4,
90+
await this.AssertAllStagesStoppedAsync(() => {
91+
WithSubstreamsSupport(elementCount: 4,
9292
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
9393
{
94-
var p = (await getSubFlow())
94+
var p = getSubFlow()
9595
.RunWith(Sink.AsPublisher<int>(false), Materializer);
9696
var kit = this;
9797
var probe = kit.CreateManualSubscriberProbe<int>();
@@ -106,7 +106,7 @@ await WithSubstreamsSupportAsync(elementCount: 4,
106106
subscription.Request(1);
107107
await probe.ExpectCompleteAsync();
108108

109-
p = (await getSubFlow())
109+
p = getSubFlow()
110110
.RunWith(Sink.AsPublisher<int>(false), Materializer);
111111
kit = this;
112112
probe = kit.CreateManualSubscriberProbe<int>();
@@ -128,7 +128,7 @@ await WithSubstreamsSupportAsync(elementCount: 4,
128128
masterSubscription.Request(1);
129129
await masterSubscriber.ExpectCompleteAsync();
130130
});
131-
//return Task.CompletedTask;
131+
return Task.CompletedTask;
132132
}, Materializer);
133133
}
134134

@@ -153,12 +153,12 @@ await this.AssertAllStagesStoppedAsync(() => {
153153
[Fact]
154154
public async Task SplitWhen_must_work_when_first_element_is_split_by()
155155
{
156-
await this.AssertAllStagesStoppedAsync(async() =>
156+
await this.AssertAllStagesStoppedAsync(() =>
157157
{
158-
await WithSubstreamsSupportAsync(1, 3,
158+
WithSubstreamsSupport(1, 3,
159159
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
160160
{
161-
var s1 = new StreamPuppet((await getSubFlow())
161+
var s1 = new StreamPuppet(getSubFlow()
162162
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
163163

164164
s1.Request(5);
@@ -170,22 +170,22 @@ await WithSubstreamsSupportAsync(1, 3,
170170
masterSubscription.Request(1);
171171
await masterSubscriber.ExpectCompleteAsync();
172172
});
173-
//return Task.CompletedTask;
173+
return Task.CompletedTask;
174174
}, Materializer);
175175
}
176176

177177
[Fact]
178178
public async Task SplitWhen_must_support_cancelling_substreams()
179179
{
180-
await this.AssertAllStagesStoppedAsync(async() =>
180+
await this.AssertAllStagesStoppedAsync(() =>
181181
{
182-
await WithSubstreamsSupportAsync(5, 8,
182+
WithSubstreamsSupport(5, 8,
183183
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
184184
{
185-
var s1 = new StreamPuppet((await getSubFlow())
185+
var s1 = new StreamPuppet(getSubFlow()
186186
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
187187
s1.Cancel();
188-
var s2 = new StreamPuppet((await getSubFlow())
188+
var s2 = new StreamPuppet(getSubFlow()
189189
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
190190

191191
s2.Request(4);
@@ -199,7 +199,7 @@ await WithSubstreamsSupportAsync(5, 8,
199199
masterSubscription.Request(1);
200200
await masterSubscriber.ExpectCompleteAsync();
201201
});
202-
//return Task.CompletedTask;
202+
return Task.CompletedTask;
203203
}, Materializer);
204204
}
205205

@@ -275,12 +275,12 @@ await this.AssertAllStagesStoppedAsync(async() => {
275275
[Fact]
276276
public async Task SplitWhen_must_support_cancelling_the_master_stream()
277277
{
278-
await this.AssertAllStagesStoppedAsync(async() =>
278+
await this.AssertAllStagesStoppedAsync(() =>
279279
{
280-
await WithSubstreamsSupportAsync(5, 8,
280+
WithSubstreamsSupport(5, 8,
281281
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
282282
{
283-
var s1 = new StreamPuppet((await getSubFlow())
283+
var s1 = new StreamPuppet(getSubFlow()
284284
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
285285
masterSubscription.Cancel();
286286

@@ -292,7 +292,7 @@ await WithSubstreamsSupportAsync(5, 8,
292292
s1.Request(1);
293293
await s1.ExpectCompleteAsync();
294294
});
295-
//return Task.CompletedTask;
295+
return Task.CompletedTask;
296296
}, Materializer);
297297
}
298298

@@ -435,18 +435,18 @@ await this.AssertAllStagesStoppedAsync(async() => {
435435
[Fact]
436436
public async Task SplitWhen_must_support_eager_cancellation_of_master_stream_on_cancelling_substreams()
437437
{
438-
await this.AssertAllStagesStoppedAsync(async() => {
439-
await WithSubstreamsSupportAsync(5, 8, SubstreamCancelStrategy.Propagate,
438+
await this.AssertAllStagesStoppedAsync(() => {
439+
WithSubstreamsSupport(5, 8, SubstreamCancelStrategy.Propagate,
440440
async (masterSubscriber, masterSubscription, expectSubFlow) =>
441441
{
442-
var s1 = new StreamPuppet((await expectSubFlow())
442+
var s1 = new StreamPuppet(expectSubFlow()
443443
.RunWith(Sink.AsPublisher<int>(false), Materializer),
444444
this);
445445
s1.Cancel();
446446
await masterSubscriber.ExpectCompleteAsync();
447447

448448
});
449-
//return Task.CompletedTask;
449+
return Task.CompletedTask;
450450
}, Materializer);
451451
}
452452
}

0 commit comments

Comments
 (0)