Skip to content

Commit e74696f

Browse files
committed
Alternative implementation of AtomicState leveraging WaitAsync
1 parent b0529e9 commit e74696f

File tree

3 files changed

+215
-99
lines changed

3 files changed

+215
-99
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="CircuitBreakerStressSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Linq;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
using Akka.Actor;
13+
using Akka.Pattern;
14+
using Akka.TestKit;
15+
using Xunit;
16+
using Xunit.Abstractions;
17+
18+
namespace Akka.Tests.Pattern
19+
{
20+
public class CircuitBreakerStressSpec : AkkaSpec
21+
{
22+
internal class RequestJob
23+
{
24+
public static RequestJob Instance => new RequestJob();
25+
private RequestJob() { }
26+
}
27+
28+
internal class JobDone
29+
{
30+
public static JobDone Instance => new JobDone();
31+
private JobDone() { }
32+
}
33+
34+
internal class GetResult
35+
{
36+
public static GetResult Instance => new GetResult();
37+
private GetResult() { }
38+
}
39+
40+
internal class Result
41+
{
42+
public int DoneCount { get; }
43+
public int TimeoutCount { get; }
44+
public int FailCount { get; }
45+
public int CircCount { get; }
46+
47+
public Result(int doneCount, int timeoutCount, int failCount, int circCount)
48+
{
49+
DoneCount = doneCount;
50+
TimeoutCount = timeoutCount;
51+
FailCount = failCount;
52+
CircCount = circCount;
53+
}
54+
}
55+
56+
internal class StressActor : UntypedActor
57+
{
58+
private readonly CircuitBreaker _breaker;
59+
private int _doneCount;
60+
private int _timeoutCount;
61+
private int _failCount;
62+
private int _circCount;
63+
64+
public StressActor(CircuitBreaker breaker) => _breaker = breaker;
65+
66+
protected override void OnReceive(object message)
67+
{
68+
switch (message)
69+
{
70+
case RequestJob _:
71+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
72+
break;
73+
case JobDone _:
74+
_doneCount++;
75+
break;
76+
case Status.Failure failure when failure.Cause is OpenCircuitException:
77+
_circCount++;
78+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
79+
break;
80+
case Status.Failure failure when failure.Cause is TimeoutException:
81+
_timeoutCount++;
82+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
83+
break;
84+
case Status.Failure _:
85+
_failCount++;
86+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
87+
break;
88+
case GetResult _:
89+
Sender.Tell(new Result(_doneCount, _timeoutCount, _failCount, _circCount));
90+
break;
91+
default:
92+
base.Unhandled(message);
93+
break;
94+
}
95+
}
96+
97+
private async Task<JobDone> Job()
98+
{
99+
await Task.Delay(TimeSpan.FromMilliseconds(300));
100+
return JobDone.Instance;
101+
}
102+
}
103+
104+
public CircuitBreakerStressSpec(ITestOutputHelper output)
105+
: base(output)
106+
{ }
107+
108+
[Fact]
109+
public void A_CircuitBreaker_stress_test()
110+
{
111+
var breaker = new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(200));
112+
var stressActors = Enumerable.Range(0, 3).Select(i => Sys.ActorOf(Props.Create<StressActor>(breaker))).ToList();
113+
114+
for (var i = 0; i < 1000; i++)
115+
foreach (var stressActor in stressActors)
116+
{
117+
stressActor.Tell(RequestJob.Instance);
118+
}
119+
120+
// let them work for a while
121+
Thread.Sleep(3000);
122+
123+
foreach (var stressActor in stressActors)
124+
{
125+
stressActor.Tell(GetResult.Instance);
126+
var result = ExpectMsg<Result>();
127+
result.DoneCount.ShouldBe(0);
128+
129+
Output.WriteLine("FailCount:{0}, DoneCount:{1}, CircCount:{2}, TimeoutCount:{3}",
130+
result.FailCount, result.DoneCount, result.CircCount, result.TimeoutCount);
131+
}
132+
}
133+
}
134+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="TaskExtensions.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace Akka.Util.Extensions
13+
{
14+
internal static class TaskExtensions
15+
{
16+
#if NETSTANDARD
17+
public static async Task WaitAsync(this Task task, TimeSpan timeout)
18+
{
19+
var cts = new CancellationTokenSource();
20+
try
21+
{
22+
var delayTask = Task.Delay(timeout, cts.Token);
23+
var completedTask = await Task.WhenAny(task, delayTask);
24+
if (completedTask == delayTask)
25+
throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms");
26+
27+
await task;
28+
}
29+
finally
30+
{
31+
cts.Cancel();
32+
cts.Dispose();
33+
}
34+
}
35+
36+
public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, TimeSpan timeout)
37+
{
38+
var cts = new CancellationTokenSource();
39+
try
40+
{
41+
var delayTask = Task.Delay(timeout, cts.Token);
42+
var completedTask = await Task.WhenAny(task, delayTask);
43+
return completedTask == delayTask
44+
? throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms")
45+
: await task;
46+
}
47+
finally
48+
{
49+
cts.Cancel();
50+
cts.Dispose();
51+
}
52+
}
53+
#endif
54+
}
55+
}

src/core/Akka/Util/Internal/AtomicState.cs

+26-99
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Collections.Concurrent;
1010
using System.Runtime.ExceptionServices;
1111
using System.Threading.Tasks;
12+
using Akka.Util.Extensions;
1213

1314
namespace Akka.Util.Internal
1415
{
@@ -73,149 +74,78 @@ await Task
7374
/// <summary>
7475
/// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
7576
/// call timeout is counted as a failed call, otherwise a successful call
76-
///
77-
/// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this
78-
/// as a failure.
79-
///
80-
/// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx
8177
/// </summary>
82-
/// <typeparam name="T">TBD</typeparam>
83-
/// <param name="task">Implementation of the call</param>
84-
/// <returns>result of the call</returns>
78+
/// <param name="task"><see cref="Task"/> Implementation of the call</param>
79+
/// <returns><see cref="Task"/> containing the result of the call</returns>
8580
public async Task<T> CallThrough<T>(Func<Task<T>> task)
8681
{
87-
var deadline = DateTime.UtcNow.Add(_callTimeout);
88-
ExceptionDispatchInfo capturedException = null;
89-
T result = default(T);
82+
var result = default(T);
9083
try
9184
{
92-
result = await task().ConfigureAwait(false);
85+
result = await task().WaitAsync(_callTimeout).ConfigureAwait(false);
86+
CallSucceeds();
9387
}
9488
catch (Exception ex)
9589
{
96-
capturedException = ExceptionDispatchInfo.Capture(ex);
97-
}
98-
99-
// Need to make sure that timeouts are reported as timeouts
100-
if (capturedException != null)
101-
{
90+
var capturedException = ExceptionDispatchInfo.Capture(ex);
10291
CallFails(capturedException.SourceException);
10392
capturedException.Throw();
10493
}
105-
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
106-
{
107-
CallFails(new TimeoutException(
108-
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
109-
}
110-
else
111-
{
112-
CallSucceeds();
113-
}
94+
11495
return result;
11596
}
116-
117-
public async Task<T> CallThrough<T,TState>(TState state, Func<TState,Task<T>> task)
97+
98+
public async Task<T> CallThrough<T, TState>(TState state, Func<TState, Task<T>> task)
11899
{
119-
var deadline = DateTime.UtcNow.Add(_callTimeout);
120-
ExceptionDispatchInfo capturedException = null;
121-
T result = default(T);
100+
var result = default(T);
122101
try
123102
{
124-
result = await task(state).ConfigureAwait(false);
103+
result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
104+
CallSucceeds();
125105
}
126106
catch (Exception ex)
127107
{
128-
capturedException = ExceptionDispatchInfo.Capture(ex);
129-
}
130-
131-
// Need to make sure that timeouts are reported as timeouts
132-
if (capturedException != null)
133-
{
108+
var capturedException = ExceptionDispatchInfo.Capture(ex);
134109
CallFails(capturedException.SourceException);
135110
capturedException.Throw();
136111
}
137-
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
138-
{
139-
CallFails(new TimeoutException(
140-
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
141-
}
142-
else
143-
{
144-
CallSucceeds();
145-
}
112+
146113
return result;
147114
}
148115

149116
/// <summary>
150117
/// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
151118
/// call timeout is counted as a failed call, otherwise a successful call
152-
///
153-
/// NOTE: In .Net there is no way to cancel an uncancellable task. We are merely cancelling the wait and marking this
154-
/// as a failure.
155-
///
156-
/// see http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx
157119
/// </summary>
158120
/// <param name="task"><see cref="Task"/> Implementation of the call</param>
159-
/// <returns><see cref="Task"/></returns>
121+
/// <returns><see cref="Task"/> containing the result of the call</returns>
160122
public async Task CallThrough(Func<Task> task)
161123
{
162-
var deadline = DateTime.UtcNow.Add(_callTimeout);
163-
ExceptionDispatchInfo capturedException = null;
164-
165124
try
166125
{
167-
await task().ConfigureAwait(false);
126+
await task().WaitAsync(_callTimeout).ConfigureAwait(false);
127+
CallSucceeds();
168128
}
169129
catch (Exception ex)
170130
{
171-
capturedException = ExceptionDispatchInfo.Capture(ex);
172-
}
173-
174-
// Need to make sure that timeouts are reported as timeouts
175-
if (capturedException != null)
176-
{
177-
CallFails(capturedException?.SourceException);
131+
var capturedException = ExceptionDispatchInfo.Capture(ex);
132+
CallFails(capturedException.SourceException);
178133
capturedException.Throw();
179-
}
180-
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
181-
{
182-
CallFails(new TimeoutException(
183-
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
184-
}
185-
else
186-
{
187-
CallSucceeds();
188134
}
189135
}
190-
136+
191137
public async Task CallThrough<TState>(TState state, Func<TState, Task> task)
192138
{
193-
var deadline = DateTime.UtcNow.Add(_callTimeout);
194-
ExceptionDispatchInfo capturedException = null;
195-
196139
try
197140
{
198-
await task(state).ConfigureAwait(false);
141+
await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
142+
CallSucceeds();
199143
}
200144
catch (Exception ex)
201145
{
202-
capturedException = ExceptionDispatchInfo.Capture(ex);
203-
}
204-
205-
// Need to make sure that timeouts are reported as timeouts
206-
if (capturedException != null)
207-
{
208-
CallFails(capturedException?.SourceException);
146+
var capturedException = ExceptionDispatchInfo.Capture(ex);
147+
CallFails(capturedException.SourceException);
209148
capturedException.Throw();
210-
}
211-
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
212-
{
213-
CallFails(new TimeoutException(
214-
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
215-
}
216-
else
217-
{
218-
CallSucceeds();
219149
}
220150
}
221151

@@ -237,9 +167,7 @@ public abstract Task<T> InvokeState<T, TState>(TState state,
237167
/// <returns><see cref="Task"/> containing result of protected call</returns>
238168
public abstract Task Invoke(Func<Task> body);
239169

240-
public abstract Task InvokeState<TState>(TState state,
241-
Func<TState, Task> body);
242-
170+
public abstract Task InvokeState<TState>(TState state, Func<TState, Task> body);
243171

244172
/// <summary>
245173
/// Invoked when call fails
@@ -265,7 +193,6 @@ public void Enter()
265193
EnterInternal();
266194
NotifyTransitionListeners();
267195
}
268-
269196
}
270197

271198
/// <summary>

0 commit comments

Comments
 (0)