Skip to content

[14-74]FlowIdleInjectSpec #6558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 25, 2023
132 changes: 65 additions & 67 deletions src/core/Akka.Streams.Tests/Dsl/FlowIdleInjectSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -31,43 +32,40 @@ public FlowIdleInjectSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void KeepAlive_must_not_emit_additional_elements_if_upstream_is_fastEnough()
public async Task KeepAlive_must_not_emit_additional_elements_if_upstream_is_fastEnough()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.From(Enumerable.Range(1, 10))
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

await this.AssertAllStagesStoppedAsync(() => {
var result = Source.From(Enumerable.Range(1, 10))
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
result.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
result.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
return Task.CompletedTask;
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void KeepAlive_must_emit_elements_periodically_after_silent_periods()
public async Task KeepAlive_must_emit_elements_periodically_after_silent_periods()
{
this.AssertAllStagesStopped(() =>
{
var sourceWithIdleGap = Source.Combine(Source.From(Enumerable.Range(1, 5)),
Source.From(Enumerable.Range(6, 5)).InitialDelay(TimeSpan.FromSeconds(2)),
await this.AssertAllStagesStoppedAsync(() => {
var sourceWithIdleGap = Source.Combine(Source.From(Enumerable.Range(1, 5)),
Source.From(Enumerable.Range(6, 5)).InitialDelay(TimeSpan.FromSeconds(2)),
i => new Merge<int, int>(i));


var result = sourceWithIdleGap
.KeepAlive(TimeSpan.FromSeconds(0.6), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

result.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
result.Result.Should().BeEquivalentTo(
Enumerable.Range(1, 5).Concat(new[] {0, 0, 0}).Concat(Enumerable.Range(6, 5)));
Enumerable.Range(1, 5).Concat(new[] { 0, 0, 0 }).Concat(Enumerable.Range(6, 5)));
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void KeepAlive_must_immediately_pull_upstream()
public async Task KeepAlive_must_immediately_pull_upstream()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -76,17 +74,17 @@ public void KeepAlive_must_immediately_pull_upstream()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(1);
await downstream.RequestAsync(1);

upstream.SendNext(1);
downstream.ExpectNext(1);
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_immediately_pull_upstream_after_busy_period()
public async Task KeepAlive_must_immediately_pull_upstream_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -96,20 +94,20 @@ public void KeepAlive_must_immediately_pull_upstream_after_busy_period()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(10).Should().BeEquivalentTo(Enumerable.Range(1, 10));

downstream.Request(1);
await downstream.RequestAsync(1);

upstream.SendNext(1);
downstream.ExpectNext(1);
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_work_if_timer_fires_before_initial_request()
public async Task KeepAlive_must_work_if_timer_fires_before_initial_request()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -118,17 +116,17 @@ public void KeepAlive_must_work_if_timer_fires_before_initial_request()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.EnsureSubscription();
downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
downstream.Request(1);
downstream.ExpectNext(0);
await downstream.EnsureSubscriptionAsync();
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(0);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy_period()
public async Task KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -138,19 +136,19 @@ public void KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));

downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
downstream.Request(1);
downstream.ExpectNext(0);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(0);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_prefer_upstream_element_over_injected()
public async Task KeepAlive_must_prefer_upstream_element_over_injected()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -159,20 +157,20 @@ public void KeepAlive_must_prefer_upstream_element_over_injected()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.EnsureSubscription();
downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
upstream.SendNext(1);
downstream.ExpectNoMsg(TimeSpan.FromSeconds(0.5));
await downstream.EnsureSubscriptionAsync();
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await upstream.SendNextAsync(1);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(0.5));

downstream.Request(1);
downstream.ExpectNext(1);
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_prefer_upstream_element_over_injected_after_busy_period()
public async Task KeepAlive_must_prefer_upstream_element_over_injected_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -182,22 +180,22 @@ public void KeepAlive_must_prefer_upstream_element_over_injected_after_busy_peri
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));

downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
upstream.SendNext(1);
downstream.ExpectNoMsg(TimeSpan.FromSeconds(0.5));
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await upstream.SendNextAsync(1);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(0.5));

downstream.Request(1);
downstream.ExpectNext(1);
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_reset_deadline_properly_after_injected_element()
public async Task KeepAlive_must_reset_deadline_properly_after_injected_element()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -206,12 +204,12 @@ public void KeepAlive_must_reset_deadline_properly_after_injected_element()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(2);
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
downstream.ExpectNext(0);
await downstream.RequestAsync(2);
await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
await downstream.ExpectNextAsync(0);

downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
downstream.ExpectNext(0);
await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
await downstream.ExpectNextAsync(0);
}
}
}