Skip to content

Commit 29c012a

Browse files
committed
[WIP] Chasing test suite issues
1 parent fee2e6d commit 29c012a

File tree

4 files changed

+83
-34
lines changed

4 files changed

+83
-34
lines changed

src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs

+62-13
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, CancellationToken ca
3838
AwaitConditionAsync(async () => conditionIsFulfilled(), cancellationToken)
3939
.WaitAndUnwrapException();
4040
}
41-
41+
4242
public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, CancellationToken cancellationToken = default)
4343
{
4444
var maxDur = RemainingOrDefault;
@@ -71,15 +71,15 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, Cance
7171
AwaitConditionAsync(async () => conditionIsFulfilled(), max, cancellationToken)
7272
.WaitAndUnwrapException(cancellationToken);
7373
}
74-
74+
7575
public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, CancellationToken cancellationToken = default)
7676
{
7777
var maxDur = RemainingOrDilated(max);
7878
var interval = new TimeSpan(maxDur.Ticks / 10);
7979
var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null;
8080
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval, (format, args) => _assertions.Fail(format, args), logger, cancellationToken);
8181
}
82-
82+
8383
/// <summary>
8484
/// <para>Await until the given condition evaluates to <c>true</c> or the timeout
8585
/// expires, whichever comes first.</para>
@@ -105,7 +105,7 @@ public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, strin
105105
AwaitConditionAsync(async () => conditionIsFulfilled(), max, message, cancellationToken)
106106
.WaitAndUnwrapException();
107107
}
108-
108+
109109
public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, string message, CancellationToken cancellationToken = default)
110110
{
111111
var maxDur = RemainingOrDilated(max);
@@ -143,16 +143,16 @@ public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, Tim
143143
/// <param name="message">The message used if the timeout expires.</param>
144144
/// <param name="cancellationToken"></param>
145145
public void AwaitCondition(Func<bool> conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default)
146-
{
146+
{
147147
AwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, message, cancellationToken)
148148
.WaitAndUnwrapException(cancellationToken);
149149
}
150-
150+
151151
public async Task AwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default)
152152
{
153153
var maxDur = RemainingOrDilated(max);
154154
var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null;
155-
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval,
155+
await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval,
156156
(format, args) => AssertionsFail(format, args, message), logger, cancellationToken);
157157
}
158158

@@ -179,7 +179,7 @@ public bool AwaitConditionNoThrow(Func<bool> conditionIsFulfilled, TimeSpan max,
179179
return AwaitConditionNoThrowAsync(async () => conditionIsFulfilled(), max, interval, cancellationToken)
180180
.WaitAndUnwrapException(cancellationToken);
181181
}
182-
182+
183183
public Task<bool> AwaitConditionNoThrowAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval = null, CancellationToken cancellationToken = default)
184184
{
185185
var intervalDur = interval.GetValueOrDefault(TimeSpan.FromMilliseconds(100));
@@ -218,7 +218,7 @@ protected static bool InternalAwaitCondition(Func<bool> conditionIsFulfilled, Ti
218218
{
219219
return InternalAwaitCondition(conditionIsFulfilled, max, interval, fail, null, cancellationToken);
220220
}
221-
221+
222222
protected static Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action<string, object[]> fail
223223
, CancellationToken cancellationToken = default)
224224
{
@@ -258,7 +258,7 @@ protected static bool InternalAwaitCondition(Func<bool> conditionIsFulfilled, Ti
258258
{
259259
return InternalAwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, fail, logger, cancellationToken)
260260
.WaitAndUnwrapException(cancellationToken);
261-
261+
262262
}
263263

264264
protected static async Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action<string, object[]> fail, ILoggingAdapter logger, CancellationToken cancellationToken = default)
@@ -293,10 +293,59 @@ protected static async Task<bool> InternalAwaitConditionAsync(Func<Task<bool>> c
293293
return true;
294294
}
295295

296-
private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args)
296+
protected void AwaitCond(Func<bool> p, TimeSpan? max = null, TimeSpan? interval = null, string message = "")
297297
{
298-
if (logger != null)
299-
logger.Debug(format, args);
298+
if (interval == null) interval = TimeSpan.FromMilliseconds(100);
299+
300+
var dilatedMax = RemainingOrDilated(max);
301+
var stop = Now + dilatedMax;
302+
303+
void Poll(TimeSpan t)
304+
{
305+
if (!p())
306+
{
307+
_assertions.AssertTrue(Now < stop, $"timeout {dilatedMax} expired: {message}");
308+
Thread.Sleep(t);
309+
Poll((stop - Now).Min(interval.Value));
310+
}
311+
}
312+
313+
Poll(dilatedMax.Min(interval.Value));
300314
}
315+
316+
protected void Within(TimeSpan max, Action f) =>
317+
Within(TimeSpan.Zero, max, f);
318+
319+
protected void Within(TimeSpan min, TimeSpan max, Action f)
320+
{
321+
var dilatedMax = Dilated(max);
322+
var start = Now;
323+
var rem = _testState.End.HasValue ? _testState.End.Value - start : Timeout.InfiniteTimeSpan;
324+
_assertions.AssertTrue(rem.IsInfiniteTimeout() || rem >= min, "Required min time {0} not possible, only {1} left.", min, rem);
325+
326+
_testState.LastWasNoMsg = false;
327+
328+
var maxDiff = dilatedMax.Min(rem);
329+
var prevEnd = _testState.End;
330+
_testState.End = start + maxDiff;
331+
332+
try
333+
{
334+
f();
335+
}
336+
finally
337+
{
338+
_testState.End = prevEnd;
339+
}
340+
341+
var diff = Now - start;
342+
_assertions.AssertTrue(min <= diff, $"block took {diff}, should at least have been {min}");
343+
if (!_testState.LastWasNoMsg)
344+
{
345+
_assertions.AssertTrue(diff <= maxDiff, $"block took {diff}, exceeding {maxDiff}");
346+
}
347+
}
348+
349+
private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args) => logger?.Info(format, args);
301350
}
302351
}

src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs

+13-13
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void Must_increment_failure_count_on_callTimeout_before_call_finishes()
8484
var breaker = ShortCallTimeoutCb();
8585
Task.Run(() => breaker.Instance.WithSyncCircuitBreaker(() => Thread.Sleep(Dilated(TimeSpan.FromSeconds(1)))));
8686
Within(TimeSpan.FromMilliseconds(900),
87-
() => AwaitCondition(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100))));
87+
() => AwaitCond(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100))));
8888
}
8989
}
9090

@@ -194,7 +194,7 @@ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
194194
public async Task Must_allow_calls_through()
195195
{
196196
var breaker = LongCallTimeoutCb();
197-
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
197+
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout);
198198
Assert.Equal("hi", result);
199199
}
200200

@@ -203,7 +203,7 @@ public async Task Must_increment_failure_count_on_exception()
203203
{
204204
var breaker = LongCallTimeoutCb();
205205
await InterceptException<TestException>(() =>
206-
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
206+
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout));
207207
Assert.True(CheckLatch(breaker.OpenLatch));
208208
breaker.Instance.CurrentFailureCount.ShouldBe(1);
209209
}
@@ -244,7 +244,7 @@ public async Task Must_increment_failure_count_on_callTimeout()
244244

245245
// Since the timeout should have happened before the inner code finishes
246246
// we expect a timeout, not TestException
247-
await InterceptException<TimeoutException>(() => future.WaitAsync(AwaitTimeout));
247+
await InterceptException<TimeoutException>(() => future.WaitAsync1(AwaitTimeout));
248248
}
249249

250250
[Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")]
@@ -264,7 +264,7 @@ public async Task Must_pass_through_next_call_and_close_on_success()
264264
var breaker = ShortResetTimeoutCb();
265265
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
266266
Assert.True(CheckLatch(breaker.HalfOpenLatch));
267-
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout);
267+
var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout);
268268
Assert.Equal("hi", result);
269269
Assert.True(CheckLatch(breaker.ClosedLatch));
270270
}
@@ -277,7 +277,7 @@ public async Task Must_reopen_on_exception_in_call()
277277
Assert.True(CheckLatch(breaker.HalfOpenLatch));
278278
breaker.OpenLatch.Reset();
279279
await InterceptException<TestException>(() =>
280-
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout));
280+
breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout));
281281
Assert.True(CheckLatch(breaker.OpenLatch));
282282
}
283283

@@ -305,7 +305,7 @@ public async Task Must_throw_exceptions_when_called_before_reset_timeout()
305305
Assert.True(CheckLatch(breaker.OpenLatch));
306306

307307
await InterceptException<OpenCircuitException>(
308-
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout));
308+
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout));
309309
}
310310

311311
[Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")]
@@ -323,7 +323,7 @@ public async Task Must_increase_reset_timeout_after_it_transits_to_open_again()
323323
_ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException));
324324
Assert.True(CheckLatch(breaker.OpenLatch));
325325

326-
var e1 = await InterceptException<OpenCircuitException>(
326+
var e1 = await InterceptException<OpenCircuitException>(
327327
() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)));
328328
var shortRemainingDuration = e1.RemainingDuration;
329329

@@ -379,23 +379,23 @@ protected static async Task<T> InterceptException<T>(Func<Task> actionThatThrows
379379
throw new ThrowsException(typeof(T));
380380
}
381381

382-
public TestBreaker ShortCallTimeoutCb() =>
382+
public TestBreaker ShortCallTimeoutCb() =>
383383
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(50)), Dilated(TimeSpan.FromMilliseconds(500))));
384384

385-
public TestBreaker ShortResetTimeoutCb() =>
385+
public TestBreaker ShortResetTimeoutCb() =>
386386
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromMilliseconds(50))));
387387

388-
public TestBreaker LongCallTimeoutCb() =>
388+
public TestBreaker LongCallTimeoutCb() =>
389389
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromSeconds(5), Dilated(TimeSpan.FromMilliseconds(500))));
390390

391391
public TimeSpan LongResetTimeout = TimeSpan.FromSeconds(5);
392392
public TestBreaker LongResetTimeoutCb() =>
393393
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(100)), Dilated(LongResetTimeout)));
394394

395-
public TestBreaker MultiFailureCb() =>
395+
public TestBreaker MultiFailureCb() =>
396396
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 5, Dilated(TimeSpan.FromMilliseconds(200)), Dilated(TimeSpan.FromMilliseconds(500))));
397397

398-
public TestBreaker NonOneFactorCb() =>
398+
public TestBreaker NonOneFactorCb() =>
399399
new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(2000)), Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromDays(1)), 5, 0));
400400
}
401401

src/core/Akka/Util/Extensions/TaskExtensions.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ namespace System.Threading.Tasks
99
{
1010
internal static class TaskExtensions
1111
{
12-
#if NETSTANDARD
13-
public static async Task WaitAsync(this Task task, TimeSpan timeout)
12+
//#if NETSTANDARD
13+
public static async Task WaitAsync1(this Task task, TimeSpan timeout)
1414
{
1515
var cts = new CancellationTokenSource();
1616
try
@@ -29,7 +29,7 @@ public static async Task WaitAsync(this Task task, TimeSpan timeout)
2929
}
3030
}
3131

32-
public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, TimeSpan timeout)
32+
public static async Task<TResult> WaitAsync1<TResult>(this Task<TResult> task, TimeSpan timeout)
3333
{
3434
var cts = new CancellationTokenSource();
3535
try
@@ -46,6 +46,6 @@ public static async Task<TResult> WaitAsync<TResult>(this Task<TResult> task, Ti
4646
cts.Dispose();
4747
}
4848
}
49-
#endif
49+
//#endif
5050
}
5151
}

src/core/Akka/Util/Internal/AtomicState.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
8181
var result = default(T);
8282
try
8383
{
84-
result = await task().WaitAsync(_callTimeout).ConfigureAwait(false);
84+
result = await task().WaitAsync1(_callTimeout).ConfigureAwait(false);
8585
CallSucceeds();
8686
}
8787
catch (Exception ex)
@@ -99,7 +99,7 @@ public async Task<T> CallThrough<T, TState>(TState state, Func<TState, Task<T>>
9999
var result = default(T);
100100
try
101101
{
102-
result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
102+
result = await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false);
103103
CallSucceeds();
104104
}
105105
catch (Exception ex)
@@ -122,7 +122,7 @@ public async Task CallThrough(Func<Task> task)
122122
{
123123
try
124124
{
125-
await task().WaitAsync(_callTimeout).ConfigureAwait(false);
125+
await task().WaitAsync1(_callTimeout).ConfigureAwait(false);
126126
CallSucceeds();
127127
}
128128
catch (Exception ex)
@@ -137,7 +137,7 @@ public async Task CallThrough<TState>(TState state, Func<TState, Task> task)
137137
{
138138
try
139139
{
140-
await task(state).WaitAsync(_callTimeout).ConfigureAwait(false);
140+
await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false);
141141
CallSucceeds();
142142
}
143143
catch (Exception ex)

0 commit comments

Comments
 (0)