Skip to content

Commit 71abf8d

Browse files
committed
Changes in SplitWhen_must_work_in_the_happy_case
1 parent 5f4f5dd commit 71abf8d

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs

+34-22
Original file line numberDiff line numberDiff line change
@@ -90,28 +90,40 @@ public async Task SplitWhen_must_work_in_the_happy_case()
9090
await this.AssertAllStagesStoppedAsync(() => {
9191
WithSubstreamsSupport(elementCount: 4,
9292
run: async (masterSubscriber, masterSubscription, getSubFlow) =>
93-
{
94-
var s1 = new StreamPuppet(getSubFlow()
95-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
96-
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
97-
s1.Request(2);
98-
await s1.ExpectNextAsync(1);
99-
await s1.ExpectNextAsync(2);
100-
s1.Request(1);
101-
await s1.ExpectCompleteAsync();
102-
103-
var s2 = new StreamPuppet(getSubFlow()
104-
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
105-
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
106-
107-
s2.Request(1);
108-
await s2.ExpectNextAsync(3);
109-
s2.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
110-
111-
s2.Request(1);
112-
await s2.ExpectNextAsync(4);
113-
s2.Request(1);
114-
await s2.ExpectCompleteAsync();
93+
{
94+
var p = getSubFlow()
95+
.RunWith(Sink.AsPublisher<int>(false), Materializer);
96+
var kit = this;
97+
var probe = kit.CreateManualSubscriberProbe<int>();
98+
p.Subscribe(probe);
99+
var subscription = await probe.ExpectSubscriptionAsync();
100+
//var s1 = new StreamPuppet(getSubFlow()
101+
//.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
102+
await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(4100));
103+
subscription.Request(2);
104+
await probe.ExpectNextAsync(1);
105+
await probe.ExpectNextAsync(2);
106+
subscription.Request(1);
107+
await probe.ExpectCompleteAsync();
108+
109+
p = getSubFlow()
110+
.RunWith(Sink.AsPublisher<int>(false), Materializer);
111+
kit = this;
112+
probe = kit.CreateManualSubscriberProbe<int>();
113+
p.Subscribe(probe);
114+
115+
//var s2 = new StreamPuppet(getSubFlow()
116+
//.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
117+
await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
118+
119+
subscription.Request(1);
120+
await probe.ExpectNextAsync(3);
121+
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
122+
123+
subscription.Request(1);
124+
await probe.ExpectNextAsync(4);
125+
subscription.Request(1);
126+
await probe.ExpectCompleteAsync();
115127

116128
masterSubscription.Request(1);
117129
await masterSubscriber.ExpectCompleteAsync();

0 commit comments

Comments
 (0)