Skip to content

Commit 2eb3004

Browse files
authored
[BACKPORT #6281] Add ReceiveAsync to TestActorRef (#6286)
* Add ReceiveAsync feature to Akka.TestKit TestActorRef (#6281) Co-authored-by: Aaron Stannard <[email protected]> (cherry picked from commit 5605d83) * Post-merge fix
1 parent 37179fe commit 2eb3004

File tree

4 files changed

+157
-10
lines changed

4 files changed

+157
-10
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="ExceptionHandling.cs" company="Akka.NET Project">
3+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
using System;
8+
using System.Threading.Tasks;
9+
using Akka.Actor;
10+
using FluentAssertions;
11+
using Xunit;
12+
using Xunit.Abstractions;
13+
using static FluentAssertions.FluentActions;
14+
15+
namespace Akka.TestKit.Tests.TestActorRefTests
16+
{
17+
public class ExceptionHandling: TestKit.Xunit2.TestKit
18+
{
19+
private class GiveError
20+
{ }
21+
22+
private class GiveErrorAsync
23+
{ }
24+
25+
private class ExceptionActor : ReceiveActor
26+
{
27+
public ExceptionActor()
28+
{
29+
Receive<GiveError>((b) => throw new Exception("WAT"));
30+
31+
ReceiveAsync<GiveErrorAsync>(async (b) =>
32+
{
33+
await Task.Delay(TimeSpan.FromSeconds(0.1));
34+
throw new Exception("WATASYNC");
35+
});
36+
}
37+
}
38+
39+
public ExceptionHandling(ITestOutputHelper helper) : base("akka.loglevel = debug", helper)
40+
{
41+
}
42+
43+
[Fact]
44+
public void GetException()
45+
{
46+
var props = Props.Create<ExceptionActor>();
47+
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testA");
48+
Invoking(() => subject.Receive(new GiveError()))
49+
.Should().Throw<Exception>().WithMessage("WAT");
50+
}
51+
52+
[Fact]
53+
public async Task GetExceptionAsync()
54+
{
55+
var props = Props.Create<ExceptionActor>();
56+
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testB");
57+
await Awaiting(() => subject.ReceiveAsync(new GiveErrorAsync()))
58+
.Should().ThrowAsync<Exception>().WithMessage("WATASYNC");
59+
}
60+
}
61+
}

src/core/Akka.TestKit/Internal/InternalTestActorRef.cs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9+
using System.Collections.Generic;
910
using System.Threading;
11+
using System.Threading.Tasks;
1012
using Akka.Actor;
1113
using Akka.Actor.Internal;
1214
using Akka.Dispatch;
13-
using Akka.Dispatch.SysMsg;
1415
using Akka.Pattern;
1516
using Akka.Util;
1617
using Akka.Util.Internal;
@@ -89,6 +90,14 @@ public void Receive(object message, IActorRef sender = null)
8990
cell.UseThreadContext(() => cell.ReceiveMessageForTest(envelope));
9091
}
9192

93+
public Task ReceiveAsync(object message, IActorRef sender = null)
94+
{
95+
var cell = (TestActorCell)Cell;
96+
sender = sender.IsNobody() ? cell.System.DeadLetters : sender;
97+
var envelope = new Envelope(message, sender);
98+
return cell.UseThreadContextAsync(() => cell.ReceiveMessageForTestAsync(envelope));
99+
}
100+
92101
/// <summary>
93102
/// TBD
94103
/// </summary>
@@ -245,25 +254,73 @@ public override ActorTaskScheduler TaskScheduler
245254
if (taskScheduler != null)
246255
return taskScheduler;
247256

248-
taskScheduler = new TestActorTaskScheduler(this);
257+
taskScheduler = new TestActorTaskScheduler(this, TaskFailureHook);
249258
return Interlocked.CompareExchange(ref _taskScheduler, taskScheduler, null) ?? taskScheduler;
250259
}
251260
}
252261

262+
263+
private readonly Dictionary<object, TaskCompletionSource<Done>> _testActorTasks =
264+
new Dictionary<object, TaskCompletionSource<Done>>();
265+
266+
/// <summary>
267+
/// This is only intended to be called from TestKit's TestActorRef
268+
/// </summary>
269+
/// <param name="envelope">TBD</param>
270+
public Task ReceiveMessageForTestAsync(Envelope envelope)
271+
{
272+
var tcs = new TaskCompletionSource<Done>();
273+
_testActorTasks[envelope.Message] = tcs;
274+
ReceiveMessageForTest(envelope);
275+
return tcs.Task;
276+
}
277+
278+
/// <summary>
279+
/// TBD
280+
/// </summary>
281+
/// <param name="actionAsync">TBD</param>
282+
public Task UseThreadContextAsync(Func<Task> actionAsync)
283+
{
284+
var tmp = InternalCurrentActorCellKeeper.Current;
285+
InternalCurrentActorCellKeeper.Current = this;
286+
try
287+
{
288+
return actionAsync();
289+
}
290+
finally
291+
{
292+
//ensure we set back the old context
293+
InternalCurrentActorCellKeeper.Current = tmp;
294+
}
295+
}
296+
297+
private void TaskFailureHook(object message, Exception exception)
298+
{
299+
if (!_testActorTasks.TryGetValue(message, out var tcs))
300+
return;
301+
if (!(exception is null))
302+
tcs.TrySetException(exception);
303+
else
304+
tcs.TrySetResult(Done.Instance);
305+
_testActorTasks.Remove(message);
306+
}
307+
253308
/// <summary>
254309
/// TBD
255310
/// </summary>
256311
public new object Actor { get { return base.Actor; } }
257312
}
258313

259-
internal class TestActorTaskScheduler : ActorTaskScheduler
314+
internal class TestActorTaskScheduler : ActorTaskScheduler, IAsyncResultInterceptor
260315
{
261-
private readonly ActorCell _testActorCell;
316+
private readonly TestActorCell _testActorCell;
317+
private readonly Action<object, Exception> _taskCallback;
262318

263319
/// <inheritdoc />
264-
internal TestActorTaskScheduler(ActorCell testActorCell) : base(testActorCell)
320+
internal TestActorTaskScheduler(ActorCell testActorCell, Action<object, Exception> taskCallback) : base(testActorCell)
265321
{
266-
_testActorCell = testActorCell;
322+
_taskCallback = taskCallback;
323+
_testActorCell = (TestActorCell) testActorCell;
267324
}
268325

269326
/// <inheritdoc />
@@ -277,6 +334,11 @@ protected override void OnAfterTaskCompleted()
277334
{
278335
ActorCellKeepingSynchronizationContext.AsyncCache = null;
279336
}
337+
338+
public void OnTaskCompleted(object message, Exception exception)
339+
{
340+
_taskCallback(message, exception);
341+
}
280342
}
281343

282344
/// <summary>

src/core/Akka.TestKit/TestActorRefBase.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using System.Collections.Generic;
10+
using System.Threading.Tasks;
1011
using Akka.Actor;
1112
using Akka.Dispatch;
1213
using Akka.Dispatch.SysMsg;
@@ -51,6 +52,22 @@ public void Receive(object message, IActorRef sender = null)
5152
_internalRef.Receive(message, sender);
5253
}
5354

55+
/// <summary>
56+
/// Directly inject messages into actor ReceiveAsync behavior. Any exceptions
57+
/// thrown will be available to you, while still being able to use
58+
/// become/unbecome.
59+
/// Note: This method violates the actor model and could cause unpredictable
60+
/// behavior. For example, a Receive call to an actor could run simultaneously
61+
/// (2 simultaneous threads running inside the actor) with the actor's handling
62+
/// of a previous Tell call.
63+
/// </summary>
64+
/// <param name="message">The message.</param>
65+
/// <param name="sender">The sender.</param>
66+
public Task ReceiveAsync(object message, IActorRef sender = null)
67+
{
68+
return _internalRef.ReceiveAsync(message, sender);
69+
}
70+
5471
/// <summary>
5572
/// TBD
5673
/// </summary>

src/core/Akka/Dispatch/ActorTaskScheduler.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public static void RunTask(Func<Task> asyncAction)
149149
//suspend the mailbox
150150
dispatcher.Suspend(context);
151151

152-
ActorTaskScheduler actorScheduler = context.TaskScheduler;
152+
var actorScheduler = context.TaskScheduler;
153153
actorScheduler.CurrentMessage = context.CurrentMessage;
154154

155155
actorScheduler.OnBeforeTaskStarted();
@@ -158,18 +158,21 @@ public static void RunTask(Func<Task> asyncAction)
158158
.Unwrap()
159159
.ContinueWith(parent =>
160160
{
161-
Exception exception = GetTaskException(parent);
162-
161+
var exception = GetTaskException(parent);
163162
if (exception == null)
164163
{
165164
dispatcher.Resume(context);
166-
167165
context.CheckReceiveTimeout();
168166
}
169167
else
170168
{
171169
context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
172170
}
171+
172+
// Used by TestActorRef to intercept async execution result
173+
if(actorScheduler is IAsyncResultInterceptor interceptor)
174+
interceptor.OnTaskCompleted(actorScheduler.CurrentMessage, exception);
175+
173176
//clear the current message field of the scheduler
174177
actorScheduler.CurrentMessage = null;
175178
actorScheduler.OnAfterTaskCompleted();
@@ -203,3 +206,7 @@ private static Exception TryUnwrapAggregateException(AggregateException aggregat
203206
}
204207
}
205208

209+
internal interface IAsyncResultInterceptor
210+
{
211+
void OnTaskCompleted(object message, Exception exception);
212+
}

0 commit comments

Comments
 (0)