Skip to content

Commit 4d28ef7

Browse files
committed
Alternative implementation of AtomicState leveraging WaitAsync
1 parent b0529e9 commit 4d28ef7

File tree

4 files changed

+223
-120
lines changed

4 files changed

+223
-120
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="CircuitBreakerStressSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Linq;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
using Akka.Actor;
13+
using Akka.Pattern;
14+
using Akka.TestKit;
15+
using Xunit;
16+
using Xunit.Abstractions;
17+
18+
namespace Akka.Tests.Pattern
19+
{
20+
public class CircuitBreakerStressSpec : AkkaSpec
21+
{
22+
internal class RequestJob
23+
{
24+
public static RequestJob Instance => new RequestJob();
25+
private RequestJob() { }
26+
}
27+
28+
internal class JobDone
29+
{
30+
public static JobDone Instance => new JobDone();
31+
private JobDone() { }
32+
}
33+
34+
internal class GetResult
35+
{
36+
public static GetResult Instance => new GetResult();
37+
private GetResult() { }
38+
}
39+
40+
internal class Result
41+
{
42+
public int DoneCount { get; }
43+
public int TimeoutCount { get; }
44+
public int FailCount { get; }
45+
public int CircCount { get; }
46+
47+
public Result(int doneCount, int timeoutCount, int failCount, int circCount)
48+
{
49+
DoneCount = doneCount;
50+
TimeoutCount = timeoutCount;
51+
FailCount = failCount;
52+
CircCount = circCount;
53+
}
54+
}
55+
56+
internal class StressActor : UntypedActor
57+
{
58+
private readonly CircuitBreaker _breaker;
59+
private int _doneCount;
60+
private int _timeoutCount;
61+
private int _failCount;
62+
private int _circCount;
63+
64+
public StressActor(CircuitBreaker breaker) => _breaker = breaker;
65+
66+
protected override void OnReceive(object message)
67+
{
68+
switch (message)
69+
{
70+
case RequestJob _:
71+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
72+
break;
73+
case JobDone _:
74+
_doneCount++;
75+
break;
76+
case Status.Failure failure when failure.Cause is OpenCircuitException:
77+
_circCount++;
78+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
79+
break;
80+
case Status.Failure failure when failure.Cause is TimeoutException:
81+
_timeoutCount++;
82+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
83+
break;
84+
case Status.Failure _:
85+
_failCount++;
86+
_breaker.WithCircuitBreaker(Job).PipeTo(Self);
87+
break;
88+
case GetResult _:
89+
Sender.Tell(new Result(_doneCount, _timeoutCount, _failCount, _circCount));
90+
break;
91+
default:
92+
base.Unhandled(message);
93+
break;
94+
}
95+
}
96+
97+
private async Task<JobDone> Job()
98+
{
99+
await Task.Delay(TimeSpan.FromMilliseconds(300));
100+
return JobDone.Instance;
101+
}
102+
}
103+
104+
public CircuitBreakerStressSpec(ITestOutputHelper output)
105+
: base(output)
106+
{ }
107+
108+
[Fact]
109+
public void A_CircuitBreaker_stress_test()
110+
{
111+
var breaker = new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(200));
112+
var stressActors = Enumerable.Range(0, 3).Select(i => Sys.ActorOf(Props.Create<StressActor>(breaker))).ToList();
113+
114+
for (var i = 0; i < 1000; i++)
115+
foreach (var stressActor in stressActors)
116+
{
117+
stressActor.Tell(RequestJob.Instance);
118+
}
119+
120+
// let them work for a while
121+
Thread.Sleep(3000);
122+
123+
foreach (var stressActor in stressActors)
124+
{
125+
stressActor.Tell(GetResult.Instance);
126+
var result = ExpectMsg<Result>();
127+
result.DoneCount.ShouldBe(0);
128+
129+
Output.WriteLine("FailCount:{0}, DoneCount:{1}, CircCount:{2}, TimeoutCount:{3}",
130+
result.FailCount, result.DoneCount, result.CircCount, result.TimeoutCount);
131+
}
132+
}
133+
}
134+
}

src/core/Akka/Pattern/CircuitBreaker.cs

+8-21
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,7 @@ public CircuitBreaker(IScheduler scheduler, int maxFailures, TimeSpan callTimeou
214214
/// <summary>
215215
/// Retrieves current failure count.
216216
/// </summary>
217-
public long CurrentFailureCount
218-
{
219-
get { return Closed.Current; }
220-
}
217+
public long CurrentFailureCount => Closed.Current;
221218

222219
public Exception LastCaughtException { get; private set; }
223220

@@ -227,30 +224,20 @@ public long CurrentFailureCount
227224
/// <typeparam name="T">TBD</typeparam>
228225
/// <param name="body">Call needing protected</param>
229226
/// <returns><see cref="Task"/> containing the call result</returns>
230-
public Task<T> WithCircuitBreaker<T>(Func<Task<T>> body)
231-
{
232-
return CurrentState.Invoke(body);
233-
}
227+
public Task<T> WithCircuitBreaker<T>(Func<Task<T>> body) => CurrentState.Invoke(body);
234228

235-
public Task<T> WithCircuitBreaker<T, TState>(TState state,
236-
Func<TState, Task<T>> body)
237-
{
238-
return CurrentState.InvokeState(state, body);
239-
}
229+
public Task<T> WithCircuitBreaker<T, TState>(TState state, Func<TState, Task<T>> body) =>
230+
CurrentState.InvokeState(state, body);
240231

241232
/// <summary>
242233
/// Wraps invocation of asynchronous calls that need to be protected
243234
/// </summary>
244235
/// <param name="body">Call needing protected</param>
245236
/// <returns><see cref="Task"/></returns>
246-
public Task WithCircuitBreaker(Func<Task> body)
247-
{
248-
return CurrentState.Invoke(body);
249-
}
250-
public Task WithCircuitBreaker<TState>(TState state, Func<TState, Task> body)
251-
{
252-
return CurrentState.InvokeState(state, body);
253-
}
237+
public Task WithCircuitBreaker(Func<Task> body) => CurrentState.Invoke(body);
238+
239+
public Task WithCircuitBreaker<TState>(TState state, Func<TState, Task> body) =>
240+
CurrentState.InvokeState(state, body);
254241

255242
/// <summary>
256243
/// The failure will be recorded farther down.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="TaskExtensions.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace Akka.Util.Extensions
13+
{
14+
internal static class TaskExtensions
15+
{
16+
#if NETSTANDARD
17+
public static async Task WaitAsync(this Task task, TimeSpan timeout)
18+
{
19+
var cts = new CancellationTokenSource();
20+
try
21+
{
22+
var delayTask = Task.Delay(timeout, cts.Token);
23+
var completedTask = await Task.WhenAny(task, delayTask);
24+
if (completedTask == delayTask)
25+
throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms");
26+
27+
await task;
28+
}
29+
finally
30+
{
31+
cts.Cancel();
32+
cts.Dispose();
33+
}
34+
}
35+
36+
public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, TimeSpan timeout)
37+
{
38+
var cts = new CancellationTokenSource();
39+
try
40+
{
41+
var delayTask = Task.Delay(timeout, cts.Token);
42+
var completedTask = await Task.WhenAny(task, delayTask);
43+
return completedTask == delayTask
44+
? throw new TimeoutException($"Execution did not complete within the time allotted {timeout.TotalMilliseconds} ms")
45+
: await task;
46+
}
47+
finally
48+
{
49+
cts.Cancel();
50+
cts.Dispose();
51+
}
52+
}
53+
#endif
54+
}
55+
}

0 commit comments

Comments
 (0)