Skip to content

[2-74]ActorRefSourceSpec #6552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 66 additions & 52 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
Expand All @@ -30,124 +31,139 @@ public ActorRefSourceSpec()
}

[Fact]
public void A_ActorRefSource_must_emit_received_messages_to_the_stream()
public async Task A_ActorRefSource_must_emit_received_messages_to_the_stream()
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
var sub = s.ExpectSubscription();
var sub = await s.ExpectSubscriptionAsync();
sub.Request(2);
actorRef.Tell(1);
s.ExpectNext(1);
await s.ExpectNextAsync(1);
actorRef.Tell(2);
s.ExpectNext(2);
await s.ExpectNextAsync(2);
actorRef.Tell(3);
s.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
await s.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
}

[Fact]
public void A_ActorRefSource_must_buffer_when_needed()
public async Task A_ActorRefSource_must_buffer_when_needed()
{
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(100, OverflowStrategy.DropHead)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
var sub = s.ExpectSubscription();
Enumerable.Range(1, 20).ForEach(x => actorRef.Tell(x));
var sub = await s.ExpectSubscriptionAsync();
foreach (var x in Enumerable.Range(1, 20))
actorRef.Tell(x);

sub.Request(10);
Enumerable.Range(1, 10).ForEach(x => s.ExpectNext(x));
foreach (var x in Enumerable.Range(1, 10))
await s.ExpectNextAsync(x);
sub.Request(10);
Enumerable.Range(11, 10).ForEach(x => s.ExpectNext(x));
foreach (var x in Enumerable.Range(11, 10))
await s.ExpectNextAsync(x);

Enumerable.Range(200, 200).ForEach(x => actorRef.Tell(x));
foreach (var x in Enumerable.Range(200, 200))
actorRef.Tell(x);
sub.Request(100);
Enumerable.Range(300, 100).ForEach(x => s.ExpectNext(x));

foreach (var x in Enumerable.Range(300, 100))
await s.ExpectNextAsync(x);

}

[Fact]
public void A_ActorRefSource_must_drop_new_when_full_and_with_DropNew_strategy()
public async Task A_ActorRefSource_must_drop_new_when_full_and_with_DropNew_strategy()
{
var t = Source.ActorRef<int>(100, OverflowStrategy.DropNew)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var actorRef = t.Item1;
var sub = t.Item2;

Enumerable.Range(1, 20).ForEach(x => actorRef.Tell(x));
foreach (var x in Enumerable.Range(1, 20))
actorRef.Tell(x);

sub.Request(10);
Enumerable.Range(1, 10).ForEach(x => sub.ExpectNext(x));

foreach (var x in Enumerable.Range(1, 10))
await sub.ExpectNextAsync(x);

sub.Request(10);
Enumerable.Range(11, 10).ForEach(x => sub.ExpectNext(x));

Enumerable.Range(200, 200).ForEach(x => actorRef.Tell(x));
foreach (var x in Enumerable.Range(11, 10))
await sub.ExpectNextAsync(x);

foreach (var x in Enumerable.Range(200, 200))
actorRef.Tell(x);

sub.Request(100);
Enumerable.Range(200, 100).ForEach(x => sub.ExpectNext(x));
foreach(var x in Enumerable.Range(200, 100))
await sub.ExpectNextAsync(x);
}

[Fact]
public void A_ActorRefSource_must_terminate_when_the_stream_is_cancelled()
public async Task A_ActorRefSource_must_terminate_when_the_stream_is_cancelled()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(0, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
Watch(actorRef);
var sub = s.ExpectSubscription();
var sub = await s.ExpectSubscriptionAsync();
sub.Cancel();
ExpectTerminated(actorRef);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_not_fail_when_0_buffer_space_and_demand_is_signalled()
public async Task A_ActorRefSource_must_not_fail_when_0_buffer_space_and_demand_is_signalled()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(0, OverflowStrategy.DropHead)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
Watch(actorRef);
var sub = s.ExpectSubscription();
var sub = await s.ExpectSubscriptionAsync();
sub.Request(100);
sub.Cancel();
ExpectTerminated(actorRef);
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_signal_buffered_elements_and_complete_the_stream_after_receiving_Status_Success()
public async Task A_ActorRefSource_must_signal_buffered_elements_and_complete_the_stream_after_receiving_Status_Success()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
var sub = s.ExpectSubscription();
var sub = await s.ExpectSubscriptionAsync();
actorRef.Tell(1);
actorRef.Tell(2);
actorRef.Tell(3);
actorRef.Tell(new Status.Success("ok"));
sub.Request(10);
s.ExpectNext( 1, 2, 3);
s.ExpectComplete();
s.ExpectNext(1, 2, 3);
await s.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Success()
public async Task A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Success()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
var sub = s.ExpectSubscription();
var sub = await s.ExpectSubscriptionAsync();
actorRef.Tell(1);
actorRef.Tell(2);
actorRef.Tell(3);
Expand All @@ -156,46 +172,43 @@ public void A_ActorRefSource_must_not_buffer_elements_after_receiving_Status_Suc
actorRef.Tell(100);
actorRef.Tell(100);
sub.Request(10);
s.ExpectNext( 1, 2, 3);
s.ExpectComplete();

s.ExpectNext(1, 2, 3);
await s.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_complete_and_materialize_the_stream_after_receiving_Status_Success()
public async Task A_ActorRefSource_must_complete_and_materialize_the_stream_after_receiving_Status_Success()
{
this.AssertAllStagesStopped(() =>
{
var (actorRef, done) = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.ToMaterialized(Sink.Ignore<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var (actorRef, done) = Source.ActorRef<int>(3, OverflowStrategy.DropBuffer)
.ToMaterialized(Sink.Ignore<int>(), Keep.Both)
.Run(Materializer);
actorRef.Tell(new Status.Success("ok"));
done.ContinueWith(_ => Done.Instance).Result.Should().Be(Done.Instance);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_fail_the_stream_when_receiving_Status_Failure()
public async Task A_ActorRefSource_must_fail_the_stream_when_receiving_Status_Failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var s = this.CreateManualSubscriberProbe<int>();
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
.To(Sink.FromSubscriber(s))
.Run(Materializer);
s.ExpectSubscription();
await s.ExpectSubscriptionAsync();
var ex = new TestException("testfailure");
actorRef.Tell(new Status.Failure(ex));
s.ExpectError().Should().Be(ex);
}, Materializer);
}

[Fact]
public void A_ActorRefSource_must_set_actor_name_equal_to_stage_name()
public async Task A_ActorRefSource_must_set_actor_name_equal_to_stage_name()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var s = this.CreateManualSubscriberProbe<int>();
const string name = "SomeCustomName";
var actorRef = Source.ActorRef<int>(10, OverflowStrategy.Fail)
Expand All @@ -204,6 +217,7 @@ public void A_ActorRefSource_must_set_actor_name_equal_to_stage_name()
.Run(Materializer);
actorRef.Path.ToString().Should().Contain(name);
actorRef.Tell(PoisonPill.Instance);
return Task.CompletedTask;
}, Materializer);
}
}
Expand Down