Skip to content

[62-74] QueueSourceSpec #6610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 30, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 39 additions & 40 deletions src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ public void QueueSource_should_buffer_when_needed()
}

[Fact]
public void QueueSource_should_not_fail_when_0_buffer_space_and_demand_is_signalled()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use async methods instead - all tests.

public async Task QueueSource_should_not_fail_when_0_buffer_space_and_demand_is_signalled()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(0, OverflowStrategy.DropHead)
Expand All @@ -128,15 +127,14 @@ public void QueueSource_should_not_fail_when_0_buffer_space_and_demand_is_signal
sub.Request(1);
AssertSuccess(queue.OfferAsync(1));
sub.Cancel();

return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_wait_for_demand_when_buffer_is_0()
public async Task QueueSource_should_wait_for_demand_when_buffer_is_0()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(0, OverflowStrategy.DropHead)
Expand All @@ -150,14 +148,14 @@ public void QueueSource_should_wait_for_demand_when_buffer_is_0()
ExpectMsg<Enqueued>();
s.ExpectNext(1);
sub.Cancel();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_finish_offer_and_complete_futures_when_stream_completed()
public async Task QueueSource_should_finish_offer_and_complete_futures_when_stream_completed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(0, OverflowStrategy.DropHead)
Expand All @@ -173,15 +171,15 @@ public void QueueSource_should_finish_offer_and_complete_futures_when_stream_com

sub.Cancel();

ExpectMsgAllOf(new object[]{ QueueClosed.Instance, "done" });
ExpectMsgAllOf(new object[] { QueueClosed.Instance, "done" });
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_fail_stream_on_buffer_overflow_in_fail_mode()
public async Task QueueSource_should_fail_stream_on_buffer_overflow_in_fail_mode()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(1, OverflowStrategy.Fail)
Expand All @@ -192,14 +190,14 @@ public void QueueSource_should_fail_stream_on_buffer_overflow_in_fail_mode()
queue.OfferAsync(1);
queue.OfferAsync(1);
s.ExpectError();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_remember_pull_from_downstream_to_send_offered_element_immediately()
public async Task QueueSource_should_remember_pull_from_downstream_to_send_offered_element_immediately()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var probe = CreateTestProbe();
var queue = TestSourceStage<int, ISourceQueueWithComplete<int>>.Create(
Expand All @@ -213,18 +211,18 @@ public void QueueSource_should_remember_pull_from_downstream_to_send_offered_ele
AssertSuccess(queue.OfferAsync(1));
s.ExpectNext(1);
sub.Cancel();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpressure_mode()
public async Task QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpressure_mode()
{
this.AssertAllStagesStopped(() =>
{
var tuple =
Source.Queue<int>(5, OverflowStrategy.Backpressure)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(_materializer);
await this.AssertAllStagesStoppedAsync(() => {
var tuple =
Source.Queue<int>(5, OverflowStrategy.Backpressure)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(_materializer);
var queue = tuple.Item1;
var probe = tuple.Item2;

Expand All @@ -239,16 +237,16 @@ public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpr
queue.Complete();

probe.Request(6)
.ExpectNext( 2, 3, 4, 5, 6)
.ExpectNext(2, 3, 4, 5, 6)
.ExpectComplete();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_complete_watching_future_with_failure_if_stream_failed()
public async Task QueueSource_should_complete_watching_future_with_failure_if_stream_failed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(1, OverflowStrategy.Fail)
Expand All @@ -258,14 +256,14 @@ public void QueueSource_should_complete_watching_future_with_failure_if_stream_f
queue.OfferAsync(1); // need to wait when first offer is done as initialization can be done in this moment
queue.OfferAsync(2);
ExpectMsg<Status.Failure>();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_complete_watching_future_with_failure_if_materializer_shut_down()
public async Task QueueSource_should_complete_watching_future_with_failure_if_materializer_shut_down()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var tempMap = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys)); // need to create a new materializer to be able to shutdown it
var s = this.CreateManualSubscriberProbe<int>();
var queue = Source.Queue<int>(1, OverflowStrategy.Fail)
Expand All @@ -274,14 +272,14 @@ public void QueueSource_should_complete_watching_future_with_failure_if_material
queue.WatchCompletionAsync().PipeTo(TestActor);
tempMap.Shutdown();
ExpectMsg<Status.Failure>();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_return_false_when_element_was_not_added_to_buffer()
public async Task QueueSource_should_return_false_when_element_was_not_added_to_buffer()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(1, OverflowStrategy.DropNew)
Expand All @@ -296,14 +294,14 @@ public void QueueSource_should_return_false_when_element_was_not_added_to_buffer
sub.Request(1);
s.ExpectNext(1);
sub.Cancel();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_wait_when_buffer_is_full_and_backpressure_is_on()
public async Task QueueSource_should_wait_when_buffer_is_full_and_backpressure_is_on()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(1, OverflowStrategy.Backpressure)
Expand All @@ -323,14 +321,14 @@ public void QueueSource_should_wait_when_buffer_is_full_and_backpressure_is_on()
ExpectMsg<Enqueued>();

sub.Cancel();
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void QueueSource_should_fail_offer_future_when_stream_is_completed()
public async Task QueueSource_should_fail_offer_future_when_stream_is_completed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
var queue =
Source.Queue<int>(1, OverflowStrategy.DropNew)
Expand All @@ -344,6 +342,7 @@ public void QueueSource_should_fail_offer_future_when_stream_is_completed()

var exception = Record.ExceptionAsync(async () => await queue.OfferAsync(1)).Result;
exception.Should().BeOfType<StreamDetachedException>();
return Task.CompletedTask;
}, _materializer);
}

Expand Down